Onibex Clickhouse Sink Connector

The Onibex ClickHouse JDBC connector sends real-time data from Kafka and writes it to ClickHouse tables based on the topic subscription. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported through the Schema Registry.

Features

  1. Idempotent writes: The default insert.mode is insert. If it is configured as upsert, the connector uses upsert semantics instead of plain INSERT statements. Upsert semantics means atomically adding a new row or updating the existing row when a primary key constraint would be violated, which provides idempotence (see the configuration sketch after this list).
  2. Schemas: The connector supports the Avro input format for both keys and values. Schema Registry must be enabled in order to use a schema-based format.
  3. Table and column auto-creation: auto.create and auto.evolve are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
  4. Raw data: The connector supports sinking raw data into ClickHouse when insert.mode is insert and pk.mode is none.
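
As a minimal sketch of the two write modes described above, only the relevant properties are shown and all other connector settings are omitted:

  Idempotent upserts keyed on the record key:
    "insert.mode": "upsert",
    "pk.mode": "record_key"

  Raw inserts with no key handling:
    "insert.mode": "insert",
    "pk.mode": "none"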



Prerequisites

  1. Confluent Platform/Cloud with support for Confluent custom connectors.
  2. Confluent Schema Registry up and running.
  3. Access to a ClickHouse data store.
  4. Valid ClickHouse credentials.
  5. Onibex connector installed on Confluent Platform/Cloud.



Quick Start Guide (Confluent Platform)


1. Installation:

Contact Onibex for more details on installing this component.


2. Select the connector to configure it.




3. Choose the topic you want to sink as a table in ClickHouse.



4. Specify the connector class that manages the integration with ClickHouse:

io.confluent.connect.jdbc.JdbcSinkConnector



5. Connector configuration details:
  • Name: The name of the connector.
  • Tasks max: Maximum number of tasks run by the connector.
  • Key converter class: Key converter format.
  • Value converter class: Value converter format.
    • Key (Avro): io.confluent.connect.avro.AvroConverter
    • Value (Avro): io.confluent.connect.avro.AvroConverter
    • Key (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Value (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Key (String): org.apache.kafka.connect.storage.StringConverter
    • Value (String): org.apache.kafka.connect.storage.StringConverter
  • Transforms: Transformations applied to the data (see the example fragment below).
    • ExtractTimestamp
    • InsertTimezone
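
For example, an Avro key/value setup that applies both transformations could include the following fragment (a sketch; only these properties are shown):

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp,InsertTimezone"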

6. JDBC Connection:
  • JDBC URL: The URL to connect to the ClickHouse database, specifying the port and database (a concrete example is shown after this list).
    Example:
    jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>
  • Connection user:
    • JDBC user: ClickHouse username.
  • Connection password:
    • JDBC password: ClickHouse password.
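
As a concrete illustration of the URL format, assuming ClickHouse listens on its default HTTP port 8123, with a hypothetical host clickhouse.example.com and database analytics, the JDBC URL would be:

    jdbc:clickhouse://clickhouse.example.com:8123/analytics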

7. Write Settings:

  • Insert mode: Record insertion mode. Options: insert or upsert.
  • Batch size: Number of records grouped into a single SQL transaction.
  • Enable delete: Allows deletion of records in the target database. Options: true or false.


Important note:
  • The upsert mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records.

  • The insert mode does not use primary keys to check for duplicates.

  • The Enable delete parameter requires the primary key mode to be set to record_key (see the fragment below).
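
Putting the note above into practice, a delete-capable write configuration could combine the following properties (a sketch; the batch size is illustrative):

    "insert.mode": "insert",
    "batch.size": "3000",
    "delete.enabled": "true",
    "pk.mode": "record_key"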


8. Table and schema settings:

  • Table name format: Specifies the naming format for target tables.
  • Primary key mode: Defines how the primary key is extracted. Options: none, record_key, record_value.
  • Primary key fields: Specifies the primary key fields for the topic.
  • Auto create: Creates tables automatically if they do not exist. Options: true or false.
  • Auto evolve: Automatically updates table schemas when changes occur. Options: true or false. (A combined fragment is shown below.)


9. Schema Registry configuration:
  • Key converter Schema Registry URL: URL of the Confluent Schema Registry for keys.
    Example:
    http://<ip_schema_registry>:<port>
  • Value converter Schema Registry URL: URL of the Confluent Schema Registry for values.
    Example:
    http://<ip_schema_registry>:<port>

10. Transformations (a combined fragment follows this list):
  1. transforms.ExtractTimestamp.type: Adds a timestamp to the records.
    Example: org.apache.kafka.connect.transforms.InsertField$Value
  2. transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted.
    Example: timestamp
  3. transforms.InsertTimezone.type: Adds a time zone field to the records.
    Example: org.apache.kafka.connect.transforms.InsertField$Value
  4. transforms.InsertTimezone.static.field: Static field for the time zone.
    Example: timezone
  5. transforms.InsertTimezone.static.value: Time zone field value.
    Example: America/Mexico_City
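
Taken together, the Schema Registry and transformation settings above can be expressed as the following fragment (a sketch; hosts and ports are placeholders and the time zone value is illustrative):

    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "America/Mexico_City"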


Limitations

Review the limitations and capabilities of the ClickHouse JDBC driver.
  1. Table auto-creation does not support PARTITION BY and PRIMARY KEY clauses. If partitions and a primary key are needed to improve table performance, ALTER TABLE must be executed manually.
  2. Column auto-creation does not support GENERATED ALWAYS AS expressions. The default value for nullable columns is NULL.
  3. Automatic table creation only supports ENGINE = ReplacingMergeTree(_version).


Data types supported by the connector

Connect Type | Database Type
Int8 | Smallint
Int16 | Smallint
Int32 | Integer
Int64 | Bigint
Float32 | Float
Float64 | Double
Boolean | Boolean
Bytes | Binary
Null | Null



Configuration properties

Common

Name | Description | Values
name | Name of the connector | <name_connector>
connector.class | Connector class that handles the integration with ClickHouse | io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max | Maximum number of tasks the connector will run | Positive integer value >= 1
topics | List of topics consumed by this connector | <Topic_Name>

Connection

Name | Description | Values
connection.url | JDBC URL to connect to the ClickHouse database, specifying the port and database | jdbc:clickhouse://<host/ip>:<port>/<database>
connection.user | ClickHouse username | <Clickhouse_Username>
connection.password | ClickHouse password | <Clickhouse_Password>


Mapping tables

Name | Description | Values
table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the originating topic name (see the example below). | ${topic}
pk.mode | Specifies where to find the primary key for the records that are inserted. | record_key
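
For example, adding a prefix to the placeholder (the kafka_ prefix is purely illustrative) would map a topic named orders to a table named kafka_orders:

    "table.name.format": "kafka_${topic}"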

Schema evolution support

Name | Description | Values
auto.create | Enables automatic table creation if tables do not exist | true / false
auto.evolve | Enables automatic table evolution if the schema changes | true / false

Converters

Name | Description | Values
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<schema_registry_port>
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<schema_registry_port>

Transformations

Name | Description | Values
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone
transforms.ExtractTimestamp.type | Transformation type used to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp
transforms.InsertTimezone.type | Transformation type used to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City


Examples

Example configuration with Avro schema and Schema Registry (Confluent Platform)

  {
    "name": "<connector_name>",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic_name>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema_registry_ip>:<schema_registry_port>",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "connection.url": "jdbc:clickhouse://<host/ip>:<port>/<database>",
    "connection.user": "<username_clickhouse>",
    "connection.password": "<password_clickhouse>",
    "insert.mode": "INSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }

Example configuration with Avro schema and Schema Registry (Confluent Cloud)

  {
    "auto.create": "true",
    "auto.evolve": "true",
    "confluent.custom.schema.registry.auto": "true",
    "connection.password": "<clickhouse_password>",
    "connection.url": "jdbc:clickhouse://<host/ip>:<clickhouse_port>/<database>",
    "connection.user": "<clickhouse_username>",
    "delete.enabled": "true",
    "insert.mode": "INSERT",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "topics": "<topic>",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "value.converter": "io.confluent.connect.avro.AvroConverter"
  }