Onibex Clickhouse Sink Connector

The Onibex Clickhouse JDBC connector sends real-time data from Kafka and writes it to Clickhouse tables based on topic subscriptions. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported using the Schema Registry.

Features

  1. Idempotent writes. By default, the connector uses insert.mode=insert, performing standard row insertions. When configured with insert.mode=upsert, it applies upsert semantics: inserting a new row or updating an existing one if a primary key conflict is detected. This ensures idempotent data delivery, a critical feature for exactly-once or retry-safe pipelines (see the configuration sketch after this list).
  2. Schema Support. The connector supports Avro format for both key and value schemas. It is compatible with Confluent Schema Registry and provides robust schema logging capabilities, allowing data pipelines to evolve safely and trace schema changes over time.
  3. Automatic Table and Column Creation. With auto.create=true and auto.evolve=true, the connector can automatically create missing tables and columns in ClickHouse. Table names are dynamically derived from the Kafka topic names, simplifying deployment and reducing operational overhead.
  4. Raw Data Ingestion. When using insert.mode=insert and pk.mode=none, the connector can ingest raw data directly into ClickHouse without requiring primary key constraints. This is ideal for use cases where denormalized or append-only data models are preferred.
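
For reference, a minimal configuration sketch contrasting the two write modes described in item 1. Property values are placeholders; the full configuration examples later in this article show complete setups.

Append-only ingestion:

  {
    "insert.mode": "insert",
    "pk.mode": "none"
  }

Idempotent upserts keyed on the record key:

  {
    "insert.mode": "upsert",
    "pk.mode": "record_key"
  }

In upsert mode, the primary key extracted according to pk.mode determines whether an incoming record inserts a new row or replaces an existing one.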



Prerequisites

To deploy and operate the Onibex Clickhouse Sink Connector, the following requirements must be met:

  • Confluent Platform or Confluent Cloud with support for custom connectors (bring-your-own-connector capability).

  • Schema Registry integration with active Avro schema registration for keys and values.

  • Access to a ClickHouse datastore, either self-managed or cloud-hosted.

  • Valid ClickHouse credentials, with appropriate privileges to create and write to tables.

  • A valid Onibex license, provisioned and activated for the connector runtime.

  • Onibex Clickhouse Sink Connector installed on your Confluent environment.




Quick Start Guide (Confluent Platform)


1. Installation:

Contact Onibex for more details on installing this component.

2. Choose the topic you want to sink as a table in Clickhouse.



3. Specify the connector class that manages the integration with Clickhouse:

com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector




4. Connector configuration details:
  • Name: The name of the connector.
  • Task Max: Maximum number of tasks performed by the connector.
  • Key converter class: Key converter format.
  • Value converter class: Value converter format.
    • Key (Avro): io.confluent.connect.avro.AvroConverter
    • Value (Avro): io.confluent.connect.avro.AvroConverter
    • Key (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Value (JSON): io.confluent.connect.json.JsonSchemaConverter
  • Transformation: Transformations applied to data.
    • ExtractTimestamp
    • InsertTimezone

5. JDBC Connection:

  • JDBC URL: The URL used to connect to the Clickhouse database, specifying the host, port, and database.
    • Clickhouse On-premise example:
      jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
    • Clickhouse Cloud example:
      jdbc:clickhouse://<cloud_host>:<clickhouse_port>/<database>?ssl=true
  • Connection user:
    • JDBC user: Clickhouse username.
  • Connection password:
    • JDBC password: Clickhouse password.
  (These settings are combined in the fragment below.)
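
Combined, the connection settings from this step correspond to the following properties (placeholders in angle brackets must be replaced with your own host, port, database, and credentials):

  {
    "connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
    "connection.user": "<clickhouse_username>",
    "connection.password": "<clickhouse_password>"
  }

For Clickhouse Cloud, use the ?ssl=true variant of the URL shown above.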

6. Write Settings:

  • Insert mode: Record insertion mode. Options: insert or upsert (see Features).
  • Batch size: Number of records grouped for SQL transactions.
  • Enable Delete: Allows the deletion of records in the target database. Options: True or False.


Important note:
  • The insert mode depends on the Primary key mode configuration (record_key or record_value) to accurately identify records.
  • In insert mode, the connector does not use keys to check for duplicates.
  • The Enable Delete parameter requires the Primary key mode to be set to record_key (see the configuration sketch below).
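
A minimal sketch, consistent with the note above, of a delete-capable configuration; delete.enabled is paired with pk.mode set to record_key so that records to delete can be identified by their key:

  {
    "insert.mode": "insert",
    "delete.enabled": "true",
    "pk.mode": "record_key"
  }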


7. Table and schema settings:

  • Table name format: Defines the destination table name. Use ${topic} as a placeholder to insert the Kafka topic name. Example: my_table_${topic} with topic product maps to table my_table_product.
  • Primary key mode: Defines how the primary key is extracted. Options: record_key, none, record_value.
  • Primary key fields: Specifies the primary key field(s) for the topic.
  • Create a table: Creates tables automatically if they do not exist. Options: True or False.
  • Automatic evolution: Automatically updates table schemas if changes occur. Options: True or False. (These settings are combined in the sketch below.)
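
Combined, the table and schema settings from this step correspond to the following fragment; with the topic product, my_table_${topic} resolves to the table my_table_product:

  {
    "table.name.format": "my_table_${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }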


8. Configuring the Schema Registry (Confluent Platform):
  • Key converter schema registry URL: URL of the Confluent Schema Registry for keys.
    Example: http://<ip_schema_registry>:<port>
  • Value converter schema registry URL: URL of the Confluent Schema Registry for values.
    Example: http://<ip_schema_registry>:<port>
  • Transformations (combined in the fragment below):
    • transforms.ExtractTimestamp.type: Adds a timestamp to the records.
      Example: org.apache.kafka.connect.transforms.InsertField$Value
    • transforms.ExtractTimestamp.timestamp.field: Field where the timestamp of the event will be inserted.
      Example: timestamp
    • transforms.InsertTimezone.type: Adds a time zone field to the records.
      Example: org.apache.kafka.connect.transforms.InsertField$Value
    • transforms.InsertTimezone.static.field: Static field for the time zone.
      Example: timezone
    • transforms.InsertTimezone.static.value: Time zone field value.
      Example: America/Mexico_City
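
Taken together, the Schema Registry and transformation settings from this step correspond to the following fragment (the registry host and port are placeholders; both transforms use the standard Kafka Connect InsertField single message transform listed above):

  {
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "America/Mexico_City"
  }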


Limitations

When using the Onibex Clickhouse Sink Connector, please consider the following constraints, primarily due to ClickHouse JDBC driver behavior and schema handling:

  1. No Support for GENERATED ALWAYS AS Columns
     Auto-created columns do not support expressions using GENERATED ALWAYS AS. For nullable fields, the default value is NULL.

  2. Restricted Engine Support for Table Creation
     Automatic table creation is limited to ENGINE = ReplacingMergeTree(_version). Other table engines must be created manually.

  3. Schema Registry Requirements
     Both key and value schemas must be registered in the Schema Registry to support correct insert and delete operations.

  4. Schema Format Limitations
     The ClickHouse Kafka Sink Connector has specific schema requirements that must be understood before implementation.

     Supported Schema Types
       • JSON Schema with a flat structure
       • Avro Schema with a flat structure
Critical Limitations

Flat Structure Requirement: The connector only supports schemas with a flat, non-nested structure. Complex or nested data structures are not supported because the connector maps each field directly to a table column with primitive data types in ClickHouse.

No String Serialization Support: Topics using plain string serialization are not supported because they lack the required schema definition needed for proper field mapping.

Required Message Structure

Each Kafka message must contain:

  • Key Schema: Used for partitioning or deduplication purposes
  • Value Schema: Contains the structured payload that maps to ClickHouse table columns

Value Schema Requirements

The connector requires structured objects (Struct) for the message value. Each message value must:

  1. Have a defined schema (JSON Schema or Avro)
  2. Contain only primitive data types (string, integer, boolean, etc.)
  3. Use a flat structure without nested objects or arrays
  4. Be registered with a Schema Registry (when applicable)

Unsupported Configurations
  • Plain string topics without schema
  • Nested JSON objects
  • Complex data structures (arrays of objects, nested structs)
  • Schema-less message formats
Example of Supported Structure

// ✅ Supported - Flat structure with primitive types
{
  "user_id": 12345,
  "username": "john_doe",
  "email": "john@example.com",
  "created_at": "2025-05-28T10:30:00Z",
  "is_active": true
}

// ❌ Not Supported - Nested structure
{
  "user_id": 12345,
  "profile": {
    "username": "john_doe",
    "contact": {
      "email": "john@example.com"
    }
  }
}

This limitation ensures that each schema field maps directly to a corresponding column in the target ClickHouse table with appropriate primitive data types.
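
If your source data is nested, one possible workaround, not covered by this connector's documentation and therefore an assumption to validate in your own environment, is to flatten the payload upstream with the standard Kafka Connect Flatten single message transform before it reaches the sink:

  {
    "transforms": "FlattenValue",
    "transforms.FlattenValue.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.FlattenValue.delimiter": "_"
  }

With this transform, a nested field such as profile.contact.email would reach the connector as a flat field named profile_contact_email.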

Avro Schema Example

🔑 Key Schema (Avro)
The Key Schema represents the Kafka message key and can be composed of one or more fields, typically used for partitioning, deduplication, or uniquely identifying the message.

Example with a single key (simple key):

  {
    "type": "record",
    "name": "KeySchema",
    "namespace": "com.example.kafka",
    "fields": [
      { "name": "id", "type": "string" }
    ]
  }

Example with multiple keys (composite key):

  {
    "name": "Key",
    "type": "record",
    "fields": [
      { "name": "id", "type": "string" },
      { "name": "user_id", "type": "string" },
      { "name": "country", "type": "string" }
    ]
  }

✅ Value Schema (Avro)

The value schema contains the payload of the Kafka message, which is mapped directly to the columns of the ClickHouse table. It must be consistent in both field names and data types.

Example:

  {
    "name": "UserEvent",
    "type": "record",
    "fields": [
      { "name": "user_id", "type": "string" },
      { "name": "country", "type": "string" },
      { "name": "event_type", "type": "string" },
      { "name": "event_time", "type": "float" },
      { "name": "metadata", "type": ["null", "string"], "default": null }
    ]
  }

JSON Schema Example

🔑 Key Schema (JSON)

The Key Schema represents the Kafka message key and can be composed of one or more fields, typically used for partitioning, deduplication, or uniquely identifying the message.

Example with a single key (simple key):

  {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Key",
    "type": "object",
    "properties": {
      "id": { "type": "string" }
    },
    "required": ["id"]
  }

Example with multiple keys (composite key):

  {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Key",
    "type": "object",
    "properties": {
      "id": { "type": "string" },
      "user_id": { "type": "string" }
    },
    "required": ["id", "user_id"]
  }

✅ Value Schema (JSON)

The value schema contains the payload of the Kafka message, which is mapped directly to the columns of the ClickHouse table. It must be consistent in both field names and data types.

Example:

  {
    "$schema": "https://json-schema.org/draft/2020-12/schema",
    "title": "Value",
    "type": "object",
    "properties": {
      "id": { "type": "string" },
      "user_id": { "type": "string" },
      "num": { "type": "integer" },
      "created_at": { "type": "string" }
    },
    "required": ["id", "user_id"]
  }





            

Data types supported for the connector

  Kafka Connect Type      Database Data Type
  Int8                    Smallint
  Int16                   Smallint
  Int32                   Integer
  Int64                   Bigint
  Float32                 Float
  Float64                 Double
  Boolean                 Boolean
  Bytes                   Binary
  Null                    Null



Configuration properties

Common

  • name: Name of the connector. Value: <connector_name>
  • connector.class: Specifies the connector class that handles the integration with Clickhouse. Value: com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector
  • tasks.max: Defines the maximum number of tasks that the connector will run. Value: positive integer (1 or greater)
  • topics: List of topics to be consumed by this connector. Value: <Topic_Name>

Connection

  • connection.url: JDBC URL to connect to the Clickhouse database, specifying the port and database. Values:
    jdbc:clickhouse://<host/ip>:<port>/<database>
    jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true
  • connection.user: Clickhouse username. Value: <Clickhouse_Username>
  • connection.password: Clickhouse password. Value: <Clickhouse_Password>

Mapping tables

  • table.name.format: Format string used to define the name of the target table. Includes ${topic} as a placeholder for the name of the originating topic. Value: ${topic}
  • pk.mode: Specifies where to find the primary key for the records that are inserted. Values: record_key / none / record_value

Schema evolution support

  • auto.create: Allows automatic table creation if tables do not exist. Values: true / false
  • auto.evolve: Allows automatic evolution of tables if the schema changes. Values: true / false

Converters

  • key.converter: Key converter in Avro format. Value: io.confluent.connect.avro.AvroConverter
  • value.converter: Value converter in Avro format. Value: io.confluent.connect.avro.AvroConverter
  • key.converter.schema.registry.url: Confluent Platform Schema Registry URL for keys. Value: http://<schema_registry_ip>:<schema_registry_port>
  • value.converter.schema.registry.url: Confluent Platform Schema Registry URL for values. Value: http://<schema_registry_ip>:<schema_registry_port>

Transformations

  • transforms: Transformations applied to data. Values: ExtractTimestamp, InsertTimezone
  • transforms.ExtractTimestamp.type: Type of transformation to add the timestamp to records. Value: org.apache.kafka.connect.transforms.InsertField$Value
  • transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted. Value: timestamp
  • transforms.InsertTimezone.type: Type of transformation to add the time zone. Value: org.apache.kafka.connect.transforms.InsertField$Value
  • transforms.InsertTimezone.static.field: Static field where the time zone will be inserted. Value: timezone
  • transforms.InsertTimezone.static.value: Time zone field value. Value: America/Mexico_City


Examples Clickhouse On-premise

Example setup with Avro schema and password authentication (Confluent Platform)

{
  "value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "name": "<connector_name>",
  "connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": "ExtractTimestamp",
  "topics": "<topic_name>",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with JSON schema and password authentication (Confluent Platform)

{
  "value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "name": "<connector_name>",
  "connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "transforms": "ExtractTimestamp",
  "topics": "<topic_name>",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with Avro schema and password authentication (Confluent Cloud)

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "confluent.custom.schema.registry.auto": "true",
  "topics": "<topic_name>",
  "transforms": "ExtractTimestamp",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with JSON schema and password authentication (Confluent Cloud)

{
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "confluent.custom.schema.registry.auto": "true",
  "topics": "<topic_name>",
  "transforms": "ExtractTimestamp",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Examples Clickhouse Cloud

Example setup with Avro schema and password authentication (Confluent Platform)

{
  "value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "name": "<connector_name>",
  "connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": "ExtractTimestamp",
  "topics": "<topic_name>",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with JSON schema and password authentication (Confluent Platform)

{
  "value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
  "name": "<connector_name>",
  "connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
  "tasks.max": "1",
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "transforms": "ExtractTimestamp",
  "topics": "<topic_name>",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with Avro schema and password authentication (Confluent Cloud)

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "confluent.custom.schema.registry.auto": "true",
  "topics": "<topic_name>",
  "transforms": "ExtractTimestamp",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}

Example setup with JSON schema and password authentication (Confluent Cloud)

{
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "confluent.custom.schema.registry.auto": "true",
  "topics": "<topic_name>",
  "transforms": "ExtractTimestamp",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
  "connection.user": "<username_clickhouse>",
  "connection.password": "<password_clickhouse>",
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true",
  "onibex.license": "<valid_onibex_license>"
}
Related Articles

  • Onibex Snowflake Sink Connector Benefits
  • Performance Between Snowflake Connector vs Onibex Connector
  • Onibex Snowflake Sink Connector for Confluent Platform and Cloud
  • Onibex Databricks JDBC Connector for Confluent Cloud
  • OneConnect Deployment and Configuration