Onibex Clickhouse Sink Connector
The Onibex ClickHouse JDBC connector sends real-time data from Kafka and writes it to ClickHouse tables based on the topic subscription. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported through the Schema Registry.
Features
- Idempotent writes: The default insert.mode is insert. If it is configured as upsert, the connector uses upsert semantics instead of plain insert statements. Upsert semantics means atomically adding a new row or updating the existing row when a primary key constraint violation occurs, which provides idempotence (see the example fragment after this list).
- Schemas: The connector supports the Avro input format for both key and value. Schema Registry must be enabled in order to use a schema-based format.
- Auto-creation of tables and columns: auto.create and auto.evolve are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
- Raw data: The connector supports sinking raw data into ClickHouse when insert.mode is insert and pk.mode is none.
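A minimal illustrative fragment of the upsert path described above (not a complete configuration; the key field name is a placeholder in the document's <...> style):
{
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "<key_field>"
}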
Prerequisites
- Confluent Platform/Cloud with support for Confluent custom connectors.
- A running Confluent Schema Registry.
- Access to a ClickHouse data store.
- Valid ClickHouse credentials.
- Onibex connector installed on Confluent Platform/Cloud.
1. Installation:
Contact Onibex for more details on installing this component.
2. Select the connector to configure it.
3. Choose the topic you want to sink as a table into ClickHouse.
4. Specify the connector class that manages the integration with ClickHouse:
io.confluent.connect.jdbc.JdbcSinkConnector
5. Connector configuration details:
6. JDBC Connection:
- JDBC URL: The URL used to connect to the ClickHouse database, specifying the port and database.
Example:
- jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
- Connection user:
- JDBC user: ClickHouse username.
- Connection password:
- JDBC password: ClickHouse password.
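As a reference, the connection settings above correspond to a configuration fragment such as the following (host, port, database, and credentials are placeholders):
{
"connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
"connection.user": "<clickhouse_username>",
"connection.password": "<clickhouse_password>"
}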
7. Write Settings:
- Insert mode: Record insertion mode. Options: insert.
- Batch size: Number of records grouped into a single SQL transaction.
- Enable delete: Allows deletion of records in the target database. Options: true or false.

Important note:
The insert mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records.
In insert mode, the connector does not use keys to check for duplicates.
The Enable delete parameter requires the primary key mode to be set to record_key.
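An illustrative fragment of the write settings, consistent with the note above (deletes enabled, so the primary key mode is record_key; the batch size shown is only an example value):
{
"insert.mode": "INSERT",
"batch.size": "3000",
"delete.enabled": "true",
"pk.mode": "record_key"
}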
8. Table and schema settings:
- Table name format: Specifies the naming format for destination tables.
- Primary key mode: Defines how the primary key is extracted. Options: record_key, none, record_value.
- Primary key fields: Specifies the primary key fields for the topic.
- Auto create: Creates tables automatically if they do not exist. Options: true or false.
- Auto evolve: Automatically updates table schemas when changes occur. Options: true or false.
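The table and schema settings above map to a fragment such as the following (the primary key field is a placeholder):
{
"table.name.format": "${topic}",
"pk.mode": "record_key",
"pk.fields": "<key_field>",
"auto.create": "true",
"auto.evolve": "true"
}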
9. Schema Registry configuration:
- Key converter Schema Registry URL: URL of the Confluent Schema Registry for keys.
Example:
http://<ip_schema_registry>:<port>
- Value converter Schema Registry URL: URL of the Confluent Schema Registry for values.
Example:
http://<ip_schema_registry>:<port>
- Transformations:
transforms.ExtractTimestamp.type: Adds a timestamp to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted.
Example: timestamp
transforms.InsertTimezone.type: Adds a time zone field to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field: Static field for the time zone.
Example: timezone
transforms.InsertTimezone.static.value: Time zone field value.
Example: America/Mexico_City
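Taken together, the Schema Registry and transformation settings from this step look roughly like the fragment below; the Schema Registry host/port and the time zone value are placeholders, and both transforms use the standard Kafka Connect InsertField$Value transformation:
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
"value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
"transforms": "ExtractTimestamp,InsertTimezone",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTimezone.static.field": "timezone",
"transforms.InsertTimezone.static.value": "America/Mexico_City"
}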
Limitations
Review the limitations and capabilities of the ClickHouse JDBC driver.
- Table auto-creation does not support the PARTITION BY and PRIMARY KEY clauses when creating tables. If partitions and a primary key are needed to improve table performance, ALTER TABLE must be executed manually.
- Column auto-creation does not support the GENERATED ALWAYS AS expression. The default value for nullable columns is NULL.
- Automatic table creation only supports ENGINE = ReplacingMergeTree(_version).
Data supported by the connector
Configuration properties
Common
Name | Description | Values |
name | Name of connector | <name_connector> |
connector.class | Specifies the connector class that handles the integration with ClickHouse | io.confluent.connect.jdbc.JdbcSinkConnector |
tasks.max | Defines the maximum number of tasks the connector will run | Positive integer value ≥ 1 |
topics | List of topics to be consumed by this connector | <Topic_Name> |
Connection
Name | Description | Values |
connection.url | JDBC URL to connect to the ClickHouse database, specifying the port and database. | jdbc:clickhouse://<host/ip>:<port>/<database> |
connection.user | ClickHouse user | <Clickhouse_Username> |
connection.password | ClickHouse password | <Clickhouse_Password> |
Table mapping
Name | Description | Values |
table.name.format | Format string used to define the name of the destination table. Use ${topic} as a placeholder for the originating topic name. | ${topic} |
pk.mode | Specifies where to find the primary key for the records being inserted. | record_key |
Schema evolution support
Name | Description | Values |
auto.create | Allows automatic creation of tables if they do not exist | true / false |
auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |
Converters
Name | Description | Values |
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<schema_registry_port> |
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<schema_registry_port> |
Transformations
Name | Description | Values |
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
transforms.ExtractTimestamp.type | Type of transformation used to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
transforms.InsertTimezone.type | Type of transformation to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |
Examples
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true"
}
Example of a configuration with Avro schema and Schema Registry credentials (Confluent Cloud)
{
"auto.create": "true",
"auto.evolve": "true",
"confluent.custom.schema.registry.auto": "true",
"connection.password": "<clickhouse_password>",
"connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
"connection.user": "<clickhouse_username>",
"delete.enabled": "true",
"insert.mode": "INSERT",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"pk.mode": "record_key",
"table.name.format": "${topic}",
"topics": "<topic>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"value.converter": "io.confluent.connect.avro.AvroConverter"
}