Onibex Clickhouse Sink Connector
The Onibex ClickHouse JDBC connector sends real-time data from Kafka and writes it to tables based on the topic subscription. Idempotent writes can be achieved with upserts. Auto-creation of tables and auto-evolution are supported using the Schema Registry.
Features
- Idempotent writes: The default insert.mode is insert. If it is configured as upsert, the connector uses upsert semantics instead of plain insert statements. Upsert semantics means atomically adding a new row, or updating the existing row when a primary key constraint violation occurs, which provides idempotence (see the sketch after this list).
- Schemas: The connector supports the Avro input format for both key and value. The Schema Registry must be available in order to use a schema-based format.
- Auto-creation of tables and columns: auto.create and auto.evolve are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
- Raw data: The connector supports sinking raw data into ClickHouse when insert.mode is insert and pk.mode is none.
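For example, a minimal sketch of the write-related properties for idempotent upserts, using only properties documented below (the topic name orders is a hypothetical placeholder):

{
  "topics": "orders",
  "insert.mode": "UPSERT",
  "pk.mode": "record_key",
  "table.name.format": "${topic}"
}

With this configuration, a re-delivered record with the same key updates the existing row instead of creating a duplicate.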
Prerequisites
- Confluent Platform/Cloud with support for Confluent custom connectors.
- A running Confluent Schema Registry.
- Access to a ClickHouse data store.
- Valid ClickHouse credentials.
- A valid Onibex license.
- The Onibex connector installed on Confluent Platform/Cloud.
1. Installation:
Contact Onibex for more details on installing this component.
2. Choose the topic you want to insert as a table in ClickHouse.
3. Specify the connector class that manages the integration with ClickHouse:
com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector
4. Connector configuration details:
5. JDBC Connection:
- JDBC URL:
- The URL to connect to the ClickHouse On-premise database, specifying the port and database.
Example:
- jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
- The URL to connect to the ClickHouse Cloud database, specifying the port and database.
Example:
- jdbc:clickhouse://<cloud_host>:<clickhouse_port>/<database>?ssl=true
- Connection user:
- JDBC user: ClickHouse username.
- Connection password:
- JDBC password: ClickHouse password.
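Putting these together, the connection portion of a connector configuration looks like the following (host, port, database, and credentials are placeholders):

{
  "connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
  "connection.user": "<clickhouse_username>",
  "connection.password": "<clickhouse_password>"
}

For ClickHouse Cloud, append ?ssl=true to the URL as shown in the example above.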
6. Write Settings:
- Insert mode: Record insertion mode. Options: insert or upsert (see Features).
- Batch size: Number of records grouped into a single SQL transaction.
- Enable delete: Allows the deletion of records in the target database. Options: true or false.

Important note:
The upsert insert mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records.
With the primary key mode set to none, insert does not use keys to check for duplicates.
The enable delete parameter requires the primary key mode to be set to record_key.
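As a sketch, a delete-capable configuration therefore combines these settings (property names match the full examples later in this article):

{
  "insert.mode": "INSERT",
  "delete.enabled": "true",
  "pk.mode": "record_key"
}

In JDBC sink connectors, deletes are typically triggered by tombstone records (a record key with a null value); this article does not spell out the trigger, so treat that as an assumption to verify.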
7. Table and Schema Settings:
- Table name format: Specifies the naming format of the target tables (see the sketch after this list).
- Primary key mode: Defines how the primary key is extracted. Options: record_key, none, record_value.
- Primary key fields: Specifies the primary key fields for the topic.
- Auto create: Creates tables automatically if they do not exist. Options: true or false.
- Auto evolve: Automatically updates table schemas if changes occur. Options: true or false.
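For instance, with the settings below, records from a hypothetical topic named customers are written to a table named customers, which is created automatically on first use and evolved when the schema changes:

{
  "table.name.format": "${topic}",
  "pk.mode": "record_key",
  "auto.create": "true",
  "auto.evolve": "true"
}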
8. Schema Registry configuration (Confluent Platform):
- Key converter schema registry URL: URL of the Confluent Schema Registry for keys.
Example:
http://<ip_schema_registry>:<port>
- Value converter schema registry URL: URL of the Confluent Schema Registry for values.
Example:
http://<ip_schema_registry>:<port>
- Transformations (a combined example follows this list):
transforms.ExtractTimestamp.type:
Adds a timestamp to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field:
Field where the timestamp of the event will be inserted.
Example: timestamp
transforms.InsertTimezone.type:
Adds a time zone field to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field:
Static field for the time zone.
Example: timezone
transforms.InsertTimezone.static.value:
Value of the time zone field.
Example: America/Mexico_City
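The full examples below wire up only ExtractTimestamp; a configuration applying both transforms would chain them as follows (both use Kafka Connect's standard InsertField SMT):

"transforms": "ExtractTimestamp,InsertTimezone",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimezone.static.field": "timezone",
"transforms.InsertTimezone.static.value": "America/Mexico_City"

ExtractTimestamp copies each record's event timestamp into a timestamp field, and InsertTimezone stamps every record with the static value America/Mexico_City.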
Limitations
Review the limitations and capabilities of the ClickHouse JDBC driver.
- Column auto-creation does not support GENERATED ALWAYS AS expressions. The default value for nullable columns is NULL.
- Automatic table creation only supports ENGINE = ReplacingMergeTree(_version).
- The Schema Registry must hold both a key and a value schema to perform insertions and deletions correctly.
- The connector does not support String-type topics; the value of each message must be a structure (Struct), that is, it must have a schema with defined fields.
Data supported by the connector
Configuration properties
Common
| Name | Description | Values |
| --- | --- | --- |
| name | Name of the connector | <connector_name> |
| connector.class | Specifies the connector class that handles the integration with ClickHouse | com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector |
| tasks.max | Defines the maximum number of tasks that the connector will run | Positive integer value >= 1 |
| topics | List of topics to be consumed by this connector | <Topic_Name> |
Connection
| Name | Description | Values |
| --- | --- | --- |
| connection.url | JDBC URL to connect to the ClickHouse database, specifying the port and database | jdbc:clickhouse://<host/ip>:<port>/<database> or jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true |
| connection.user | ClickHouse user | <Clickhouse_Username> |
| connection.password | ClickHouse password | <Clickhouse_Password> |
Table mapping
| Name | Description | Values |
| --- | --- | --- |
| table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the originating topic name | ${topic} |
| pk.mode | Specifies where to find the primary key for the records that are inserted | record_key / none / record_value |
Schema evolution support
| Name | Description | Values |
| --- | --- | --- |
| auto.create | Allows automatic creation of tables if they do not exist | true / false |
| auto.evolve | Allows automatic evolution of tables when the schema changes | true / false |
Converters
| Name | Description | Values |
| --- | --- | --- |
| key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
| value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
| key.converter.schema.registry.url | Confluent Platform Schema Registry URL for keys | http://<schema_registry_ip>:<schema_registry_port> |
| value.converter.schema.registry.url | Confluent Platform Schema Registry URL for values | http://<schema_registry_ip>:<schema_registry_port> |
Transforms
| Name | Description | Values |
| --- | --- | --- |
| transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
| transforms.ExtractTimestamp.type | Transformation type that adds the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
| transforms.InsertTimezone.type | Transformation type that adds the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
| transforms.InsertTimezone.static.value | Value of the time zone field | America/Mexico_City |
Examples: ClickHouse On-premise
Example of setup with Avro schema and an explicit Schema Registry URL:
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with JSON schema and an explicit Schema Registry URL:
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with Avro schema and automatic Schema Registry configuration (Confluent Cloud)
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"confluent.custom.schema.registry.auto": "true",
"topics": "<topic_name>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with JSON schema and automatic Schema Registry configuration (Confluent Cloud)
{
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"confluent.custom.schema.registry.auto": "true",
"topics": "<topic_name>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Examples: ClickHouse Cloud
Example of setup with Avro schema and an explicit Schema Registry URL:
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with JSON schema and an explicit Schema Registry URL:
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with Avro schema and automatic Schema Registry configuration (Confluent Cloud)
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"confluent.custom.schema.registry.auto": "true",
"topics": "<topic_name>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}
Example of setup with JSON schema and automatic Schema Registry configuration (Confluent Cloud)
{
"key.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"confluent.custom.schema.registry.auto": "true",
"topics": "<topic_name>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
"onibex.license": "<valid_onibex_license>"
}