With insert.mode=insert, the connector performs standard row insertions. When configured with insert.mode=upsert, it applies upsert semantics: inserting a new row, or updating an existing one if a primary key conflict is detected. This ensures idempotent data delivery, a critical feature for exactly-once or retry-safe pipelines.
With auto.create=true and auto.evolve=true, the connector can automatically create missing tables and columns in ClickHouse. Table names are dynamically derived from the Kafka topic names, simplifying deployment and reducing operational overhead.
With insert.mode=insert and pk.mode=none, the connector can ingest raw data directly into ClickHouse without requiring primary key constraints. This is ideal for use cases where denormalized or append-only data models are preferred (see the configuration sketch after the requirements list below).
To deploy and operate the Onibex Clickhouse Sink Connector, the following requirements must be met:
- Confluent Platform or Confluent Cloud with support for custom connectors (bring-your-own-connector capability).
- Schema Registry integration with active Avro schema registration for keys and values.
- Access to a ClickHouse datastore, either self-managed or cloud-hosted.
- Valid ClickHouse credentials, with appropriate privileges to create and write to tables.
- A valid Onibex license, provisioned and activated for the connector runtime.
- Onibex Clickhouse Sink Connector installed on your Confluent environment.
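A minimal sketch of the append-only pattern described above, in Kafka Connect properties format. Host, credential, topic, and connector-name values are placeholders, and property names follow the parameter reference tables later in this document:
# Append-only ingestion sketch: no primary key, tables auto-created from topic names.
name=<connector_name>
connector.class=com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector
tasks.max=1
topics=<Topic_Name>
connection.url=jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
connection.user=<Clickhouse_Username>
connection.password=<Clickhouse_Password>
insert.mode=insert
pk.mode=none
auto.create=true
auto.evolve=true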
- Connection URL: jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
6. Write settings:
- Insert mode: Insert or Upsert.
- Enable deletions: True or False.
Important notes:
- The upsert mode depends on the Primary key mode configuration (record_key or record_value) to accurately identify records.
- In insert mode, keys are not used to check for duplicates.
- The Enable deletions parameter requires the Primary key mode to be set to record_key.
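A minimal sketch of these write settings as connector properties. The property name delete.enabled is an assumption based on standard Kafka Connect JDBC sink naming; the UI label above is "Enable deletions":
# Upsert with deletions enabled: keys are required to identify conflicting rows.
insert.mode=upsert
pk.mode=record_key
# Assumed property name for the "Enable deletions" option (standard JDBC sink convention).
delete.enabled=true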
7. Table and schema settings:
- Table name format: use ${topic} as a placeholder to insert the Kafka topic name. Example: my_table_${topic} with topic product maps to table my_table_product.
- Primary key mode: record_key, none, or record_value.
- Auto create: True or False.
- Auto evolve: True or False.
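The same table and schema settings expressed as properties; the table name prefix is illustrative:
# Topic "product" is written to table "my_table_product".
table.name.format=my_table_${topic}
pk.mode=record_key
auto.create=true
auto.evolve=true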
8. Converter settings:
- Key converter Schema Registry URL: http://<ip_schema_registry>:<port>
- Value converter Schema Registry URL: http://<ip_schema_registry>:<port>
9. Transform settings:
- transforms.ExtractTimestamp.type: adds a timestamp to the records.
- transforms.ExtractTimestamp.timestamp.field: field where the event timestamp will be inserted. Example: timestamp
- transforms.InsertTimezone.type: adds a time zone field to the records.
- transforms.InsertTimezone.static.field: static field for the time zone. Example: timezone
- transforms.InsertTimezone.static.value: time zone field value. Example: America/Mexico_City
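A property-form sketch of the converter and transform settings above; the Schema Registry address is a placeholder:
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<ip_schema_registry>:<port>
value.converter.schema.registry.url=http://<ip_schema_registry>:<port>
transforms=ExtractTimestamp,InsertTimezone
# Inserts the record timestamp into a "timestamp" field of the value.
transforms.ExtractTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field=timestamp
# Inserts a static "timezone" field with a fixed value.
transforms.InsertTimezone.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field=timezone
transforms.InsertTimezone.static.value=America/Mexico_City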
When using the Onibex Clickhouse Sink Connector, please consider the following constraints, primarily due to ClickHouse JDBC driver behavior and schema handling:
No Support for GENERATED ALWAYS AS Columns
Auto-created columns do not support expressions using GENERATED ALWAYS AS. For nullable fields, the default value is NULL.
Restricted Engine Support for Table Creation
Automatic table creation is limited to ENGINE = ReplacingMergeTree(_version). Other table engines must be created manually.
Schema Registry Requirements
Both key and value schemas must be registered in the Schema Registry to support correct insert and delete operations.
The ClickHouse Kafka Sink Connector has specific schema requirements that must be understood before implementation:
Flat Structure Requirement: The connector only supports schemas with a flat, non-nested structure. Complex or nested data structures are not supported because the connector maps each field directly to a table column with primitive data types in ClickHouse.
No String Serialization Support: Topics using plain string serialization are not supported because they lack the required schema definition needed for proper field mapping.
The connector requires a structured object (Struct) as the message value. Each message value must be a flat structure composed of primitive-typed fields, as in the examples below:
// ✅ Supported - Flat structure with primitive types
{
  "user_id": 12345,
  "username": "john_doe",
  "email": "john@example.com",
  "created_at": "2025-05-28T10:30:00Z",
  "is_active": true
}
// ❌ Not Supported - Nested structure
{
  "user_id": 12345,
  "profile": {
    "username": "john_doe",
    "contact": {
      "email": "john@example.com"
    }
  }
}
This limitation ensures that each schema field maps directly to a corresponding column in the target ClickHouse table with appropriate primitive data types.
Avro Schema Example
Key Schema (Avro)
The Key Schema represents the Kafka message key and can be composed of one or more fields, typically used for partitioning, deduplication, or uniquely identifying the message.
Example with a single key (simple key):
{
"type": "record",
"name": "KeySchema",
"namespace": "com.example.kafka",
"fields": [
{ "name": "id", "type": "string" }
]
}
{"name": "Key","type": "record","fields": [{ "name": "id", "type": "string" },{ "name": "user_id", "type": "string" },{ "name": "contry", "type": "string"}]}
Value Schema (Avro)
The value schema contains the payload of the Kafka message, which is mapped directly to the columns of the ClickHouse table. It must be consistent in both field names and data types.
Example:
{
  "name": "UserEvent",
  "type": "record",
  "fields": [
    { "name": "user_id", "type": "string" },
    { "name": "country", "type": "string" },
    { "name": "event_type", "type": "string" },
    { "name": "event_time", "type": "float" },
    { "name": "metadata", "type": ["null", "string"], "default": null }
  ]
}
{"$schema": "https://json-schema.org/draft/2020-12/schema","title": "Key","type": "object","properties": {"id": { "type": "string" }},"required": ["id"]}
{"$schema": "https://json-schema.org/draft/2020-12/schema","title": "Key","type": "object","properties": {"id": { "type": "string" },"user_id": { "type": "string" }},"required": ["id"," user_id"]}
Value Schema (JSON Schema)
The value schema contains the payload of the Kafka message, which is mapped directly to the columns of the ClickHouse table. It must be consistent in both field names and data types.
Example:
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Value",
  "type": "object",
  "properties": {
    "id": { "type": "string" },
    "user_id": { "type": "string" },
    "num": { "type": "integer" },
    "created_at": { "type": "string" }
  },
  "required": ["id", "user_id"]
}
Schema Type | Data Type |
--- | --- |
Int8 | Smallint |
Int16 | Smallint |
Int32 | Integer |
Int64 | Bigint |
Float32 | Float |
Float64 | Double |
Boolean | Boolean |
Bytes | Binary |
Null | Null |
Name | Description | Values |
--- | --- | --- |
name | Name of the connector | <connector_name> |
connector.class | Specifies the connector class that handles the integration with Clickhouse | com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector |
tasks.max | Defines the maximum number of tasks the connector will run | Positive integer value >= 1 |
topics | List of topics to be consumed by this connector | <Topic_Name> |
Name | Description | Values |
--- | --- | --- |
connection.url | JDBC URL to connect to the Clickhouse database, specifying the port and database. | jdbc:clickhouse://<host/ip>:<port>/<database> or jdbc:clickhouse://<cloud_host>:<port>/<database>?ssl=true |
connection.user | Clickhouse user | <Clickhouse_Username> |
connection.password | Clickhouse password | <Clickhouse_Password> |
Name | Description | Values |
--- | --- | --- |
table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the name of the originating topic. | ${topic} |
pk.mode | Specifies where to find the primary key for the records that are inserted. | record_key / none / record_value |
Name | Description | Values |
--- | --- | --- |
auto.create | Allows automatic creation of tables if they do not exist | true / false |
auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |
Name | Description | Values |
--- | --- | --- |
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
key.converter.schema.registry.url | Schema Registry URL for keys | http://<schema_registry_ip>:<schema_registry_port> |
value.converter.schema.registry.url | Schema Registry URL for values | http://<schema_registry_ip>:<schema_registry_port> |
Name | Description | Values |
--- | --- | --- |
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
transforms.ExtractTimestamp.type | Type of transformation that adds the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
transforms.InsertTimezone.type | Type of transformation that adds the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |
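For reference, a complete configuration assembled from the parameter tables above might look like the following sketch, in Kafka Connect properties format. All bracketed values are placeholders to be replaced for your environment:
# Example configuration assembled from the parameter tables above (placeholder values).
name=<connector_name>
connector.class=com.onibex.connect.clickhouse.jdbc.OnibexClickhouseSinkConnector
tasks.max=1
topics=<Topic_Name>
connection.url=jdbc:clickhouse://<host/ip>:<port>/<database>
connection.user=<Clickhouse_Username>
connection.password=<Clickhouse_Password>
insert.mode=upsert
pk.mode=record_key
table.name.format=${topic}
auto.create=true
auto.evolve=true
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<schema_registry_ip>:<schema_registry_port>
value.converter.schema.registry.url=http://<schema_registry_ip>:<schema_registry_port>
transforms=ExtractTimestamp,InsertTimezone
transforms.ExtractTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field=timestamp
transforms.InsertTimezone.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field=timezone
transforms.InsertTimezone.static.value=America/Mexico_City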