Onibex Clickhouse Sink Connector
The Onibex Clickhouse JDBC connector sends real-time data from Kafka and writes it to Clickhouse tables based on the subscribed topics. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported through the Schema Registry.
Features
- Idempotent writes: The default insert.mode is insert. If it is configured as upsert, the connector uses upsert semantics instead of plain insert statements. Upsert semantics means atomically adding a new row, or updating the existing row when a primary key constraint violation occurs, which provides idempotence (a minimal configuration sketch follows this list).
- Schemas: The connector supports the Avro input format for keys and values. Schema Registry must be enabled in order to use a schema-based format.
- Table and column auto-creation: auto.create and auto-evolution are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
- Raw data: The connector supports sinking raw data into Clickhouse when insert.mode is insert and pk.mode is none.
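As an illustrative sketch of the upsert path, the fragment below combines the relevant properties described later in this document; the topic name is a placeholder and this is not a complete connector configuration.
{
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<topic_name>"
}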
Prerequisites
- Confluent Platform/Cloud with support for Confluent custom connectors.
- Confluent Schema Registry up and running.
- Access to a Clickhouse data store.
- Valid Clickhouse credentials.
- Onibex connector installed on Confluent Platform/Cloud.
1. Installation:
Contact Onibex for more details on installing this component.
2. Select the connector to configure it.
3. Choose the topic you want to sink as a table into Clickhouse.
4. Specify the connector class that will manage the integration with Clickhouse:
io.confluent.connect.jdbc.JdbcSinkConnector
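For reference, these identification settings usually appear at the top of the connector configuration; the connector name is a placeholder and the task count shown is only an example value.
{
  "name": "<connector_name>",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1"
}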
5. Connector configuration details:
6. JDBC Connection:
- JDBC URL: The URL to connect to the Clickhouse database, specifying the port and database.
Example:
- jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
- Connection user (JDBC user): Clickhouse username.
- Connection password (JDBC password): Clickhouse password.
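A minimal connection fragment, assuming placeholder host, port, database, and credentials, might look like this:
{
  "connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
  "connection.user": "<clickhouse_username>",
  "connection.password": "<clickhouse_password>"
}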
7. Write Settings:
- Insert mode: Record insertion mode. Options: insert or upsert.
- Batch size: Number of records grouped into a single SQL transaction.
- Enable delete: Allows deletion of records in the target database. Options: true or false.

Important note:
The upsert insert mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records.
In insert mode, the connector does not use keys to check for duplicates.
The enable delete parameter requires the primary key mode to be set to record_key. A fragment illustrating these dependencies follows this note.
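A sketch of the write settings under those constraints; the batch size shown is only an illustrative value.
{
  "insert.mode": "upsert",
  "batch.size": "3000",
  "delete.enabled": "true",
  "pk.mode": "record_key"
}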
8. Table and schema settings:
- Table name format: Specifies the naming format for the target tables.
- Primary key mode: Defines how the primary key is extracted. Options: none, record_key, or record_value.
- Primary key fields: Specifies the primary key fields for the topic.
- Auto create: Creates tables automatically if they do not exist. Options: true or false.
- Auto evolve: Automatically updates table schemas when changes occur. Options: true or false.
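A sketch of these settings using the record value as the key source; the primary key field name is a placeholder, and pk.fields follows the standard JDBC sink naming for the Primary key fields setting.
{
  "table.name.format": "${topic}",
  "pk.mode": "record_value",
  "pk.fields": "<primary_key_field>",
  "auto.create": "true",
  "auto.evolve": "true"
}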
9. Configuring the Schema Registry:
- Key converter Schema Registry URL: URL of the Confluent Schema Registry for keys.
Example:
http://<ip_schema_registry>:<port>
- Value converter Schema Registry URL: URL of the Confluent Schema Registry for values.
Example:
http://<ip_schema_registry>:<port>
- Transformations:
- transforms.ExtractTimestamp.type: Adds a timestamp to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
- transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted.
Example: timestamp
- transforms.InsertTimezone.type: Adds a time zone field to the records.
Example: org.apache.kafka.connect.transforms.InsertField$Value
- transforms.InsertTimezone.static.field: Static field for the time zone.
Example: timezone
- transforms.InsertTimezone.static.value: Time zone field value.
Example: America/Mexico_City
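Putting the converter and transformation settings for this step together, a configuration fragment might look like the following; the Schema Registry host and port are placeholders, and the InsertTimezone transform is optional.
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
  "transforms": "ExtractTimestamp,InsertTimezone",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimezone.static.field": "timezone",
  "transforms.InsertTimezone.static.value": "America/Mexico_City"
}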
Limitations
Review the limitations and capabilities of the Clickhouse JDBC driver.
- Table auto-creation doesn’t support the PARTITION BY and PRIMARY KEY clauses for the created table. If partitions and a primary key are necessary to improve table performance, ALTER TABLE must be executed manually.
- Column auto-creation doesn’t support the GENERATED ALWAYS AS expression. The default value for nullable columns is NULL.
- Automatic table creation only supports ENGINE = ReplacingMergeTree(_version).
Data supported by the connector
Configuration properties
Common
Name | Description | Values |
name | Name of the connector | <name_connector> |
connector.class | Specifies the connector class that will handle the integration with Clickhouse | io.confluent.connect.jdbc.JdbcSinkConnector |
tasks.max | Defines the maximum number of tasks that the connector will run | Positive integer value >= 1 |
topics | List of topics to be consumed by this connector | <Topic_Name> |
Connection
Name | Description | Values |
connection.url | JDBC URL to connect to the Clickhouse database, specifying the port and database. | jdbc:clickhouse://<host/ip>:<port>/<database> |
connection.user | Clickhouse username | <Clickhouse_Username> |
connection.password | Clickhouse password | <Clickhouse_Password> |
Mapping tables
Name | Description | Values |
table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the originating topic name. | ${topic} |
pk.mode | Specifies where to find the primary key for the records that are inserted. | record_key |
Schema evolution support
Name | Description | Values |
auto.create | Allows automatic creation of tables if they do not exist | true / false |
auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |
Converters
Name | Description | Values |
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<schema_registry_port> |
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<schema_registry_port> |
Transformations
Name | Description | Values |
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
transforms.ExtractTimestamp.type | Type of transformation to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
transforms.InsertTimezone.type | Type of transformation to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |
Examples
{
"value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
"name": "<connector_name>",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "ExtractTimestamp",
"topics": "<topic_name>",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
"connection.user": "<username_clickhouse>",
"connection.password": "<password_clickhouse>",
"insert.mode": "INSERT",
"delete.enabled": "true",
"table.name.format": "{$topic}",
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true"
}
Example of setup with Avro schema and Schema Registry (Confluent Cloud)
{
"auto.create": "true",
"auto.evolve": "true",
"confluent.custom.schema.registry.auto": "true",
"connection.password": "<clickhouse_password>",
"connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
"connection.user": "<clickhouse_username>",
"delete.enabled": "true",
"insert.mode": "INSERT",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"pk.mode": "record_key",
"table.name.format": "${topic}",
"topics": "<topic>",
"transforms": "ExtractTimestamp",
"transforms.ExtractTimestamp.timestamp.field": "timestamp",
"transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"value.converter": "io.confluent.connect.avro.AvroConverter"
}