Onibex Snowflake Sink Connector for Confluent Platform and Cloud

Onibex Snowflake Sink Connector for Confluent Platform and Cloud

The JDBC snowflake connector sends real-time data from Confluent Platform and Cloud for writing to the theme-subscription Snowflake Tables. It is possible to achieve idempotent writings with elevators. Self-creation of tables and self-evolution is supported using the Schema Registry.


  1. Idempotent writes: The default inser.mode is Insert. If it is configured as Uppsert, the connector will use ascending semantics instead of simple insertion statements. Uppsert's semantics relates to atomic adding a new row or updating the existing row if there is a main violation of the key restriction, providing idempotence.
  2. Schemes: The connector supports the Avro input format for Key and Value. Scheme logging must be able to use a format based on schema logging.
  3. Self-creation of table and column: auto.create and self-evolution are compatible. Missing tables or columns can be created automatically. Table names are created based on the name of the themes.
  4. Raw data: Connector supports sinking raw data into Snowflake when inser.mode is Insert and pk.mode It's nothing.


  1. Confluent platform with support for Confluent custom connector.
  2. Registration of Confluent schemes in progress.
  3. Access to a snowflake data store.
  4. Valid snowflake credentials.
  5. Onibex connector installed on Confluent Platform.

Quick Start Guide

1. Installation:
Contact Onibex for more details on installing this component.

2. Select the connector to configure it.

3. Choose the theme you want to insert as a table in Snowflake.

4. Specify the class of connector to manage integration with Snowflake:
  1. com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector

5. Connector configuration details:

  • First name:The name of the connector.
  • Task Max:Maximum number of tasks performed by the connector.
  • Key converter class:Key converter format.
  • Value converter class:Value converter format.
    • Key (Avro):io.confluent.connect.avro.
    • Value (Avro):io.confluent.connect.avro.
    • Key (JSON):io.confluent.connect.json.JsonSchemaConverter
    • Value (JSON):io.confluent.connect.json.JsonSchemaConverter
    • Key (String):.apache.kafka.connect.storage.StringConverter
    • Value (cooling):.apache.kafka.connect.storage.StringConverter
  • Transformation:Transformations applied to data.
    • ExtractTimestamp
    • InsertTimezone

6. JDBC configuration:

  • Jdbc url: The URL to connect to the Snowflake database, specifying the database, warehouse and outline.
  1. jdbc:snowflake://:443/?db=&warehouse=&schema=
  • Connection user:Username.
  • Private password or key or private key or key and password phrase:
    • JDBC connection password:Snowflake password.
    • Private key:It is used for snowflake authentication.
    • Private key and password phrase:Private key with password phrase for authentication.

7. Insert Settings:

  • Insert mode:Registration insertion mode. Options: Insert, ,Uppsert, ,Update.
  • Batch size:Number of records grouped for SQL transactions.
  • Enable Delete:Allows the deletion of records in the target database. Options:TrueorFalse.

Important note:
  • The upsert and update update modes depend on the Primary key mode configuration (record_keyorrecord_value) to accurately identify records.

  • The Primary key mode ,insert does not use keys to check for duplicates.

  • The Enable deletions para meter depends on the Primary key mode set  of configuration a record_key.

8. Table and schema settings:

  • Table name format: Specify the format of appointment of tables.
  • Primary key mode:Define how the main key is extracted. Options:record_key, ,none, ,record_value.
  • Primary key fields:Specify the main key to the topic.
  • Create a car:Create tables automatically if they do not exist. Options:TrueorFalse.
  • Automatic evolution:Automatically update table outlines if changes occur. Options:TrueorFalse.

9. Configuring the schema record:

  • Key converter schema log URL: Confluent schema log URLs.
  1. http://<ip_schema_registry>:<port>
  • Value converter scheme record URL: URL of the Confluent Schemes Register for Values.
  1. http://<ip_schema_registry>:<port>
  1. Transformations:
  1. ExtractTimestamp.type:Add a timestamp to the records.
    Example: org.apache.kafka.connect.transforms.InsertField
  2. transformes.ExtractTimestamp.timestamp.field:Field where the timestamp of the event will be inserted.
  3. transforms.InsertTimezone.type:Adds a time zone field to the records.
    Example: org.apache.kafka.connect.transforms.InsertField
  4. transforms.InsertTimezone.static.field:Static field for time zone.
    Example:time zone
  5. transforms.InsertTimezone.static.value:Time zone field value.


  1. Review the limitations and capabilities of the Snowflake JDBC controller.
  1. Table self-creation does not support "PORTIONED by" and Primary school clauses for the column table. If partitions and a primary key are needed to improve table performance, Alter table must be executed manually.
  1. Column self-creation does not support Generated all communications expression. The default value for overriding columns is Null.

Dataratessupportedfor thetheconnector


















Primary key ->(English).64)
Teld -> varchar





Configuration properties


Name Name Name Name
nameName of connector<name_connector>
connector.classSpecifies the kind of connector that integration will handle integration with Snowflakecom.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector
Define the maximum number of tasks that the connector will run
Positive integer value > 1
topics topics topics topics topics
List of topics to be consumed by this connector


Name Name Name Name
JDBC URL to connect to the Snowflake database, specifying the database, warehouse and outline
Snowflake user
Snowflake Password
Private key used for snowflake authentication
Password phrase for private key used in snowflake authentication.


Name Name Name Name
Specifies the number of records to group into a single SQL transaction, when possible.
Positive integer value > 1
The insertion mode for records
insert / upsert / upsert
Edeletion of records in the target database
truth / false

Mapping tables

Format string used to define the name of the target table. Includes${topic}as placeholder for the name of the original theme.
Specifies where to find the main key for the records that are inserted.

Support for developments in the scheme

Allows automatic table creation if they do not exist
truth / false
Allows automatic evolution of tables if the outline changes
truth / false


Key converter in Avro format
Value converter in Avro format
Confluent schema log URLs
Confluent Scheme Log URL for Values



Transformations applied to data
ExtractTimestamp, InsertTimezone
Type of transformation to add the timestamp to records
Field where the event timestamp will be inserted
Type of transformation to add the time zone
Static field where the time zone will be inserted
time zone
Time zone field value


Example of setup with schema and password logging

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  8. "tasks.max": "1",
  9. "key.converter": "io.confluent.connect.avro.
  10. "value.converter": "io.confluent.connect.avro
  11. "transforms": "ExtractTimestamp, InsertTimezone",
  12. "themes": "",
  13. "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  14. "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  15. "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  16. "transforms.InsertTimezone.static.field": "Time zone",
  17. "transforms.InsertTimezone.static.value": "<value_time_zone",
  18. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  19. "connection.user": "",
  20. "driving.password":"
  21. "insert.mode": "UPSERT",
  22. "delete.enabled": "true",
  23. "table.name.format": "${topic}",
  24. "pk.mode": "record_key",
  25. "auto.create": "true",
  26. "auto.evolve": "true"
  27. }
  28. }
  29. ''''

Example of configuration with schema logging and private key

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  8. "tasks.max": "1",
  9. "key.converter": "io.confluent.connect.avro.
  10. "value.converter": "io.confluent.connect.avro
  11. "transforms": "ExtractTimestamp, InsertTimezone",
  12. "themes": "",
  13. "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  14. "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  15. "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  16. "transforms.InsertTimezone.static.field": "Time zone",
  17. "transforms.InsertTimezone.static.value": "<value_time_zone",
  18. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  19. "connection.user": "",
  20. "connection.privateKey": "
  21. "insert.mode": "UPSERT",
  22. "delete.enabled": "true",
  23. "table.name.format": "${topic}",
  24. "pk.mode": "record_key",
  25. "auto.create": "true",
  26. "auto.evolve": "true"
  27. }
  28. }
  29. ''''

Configuration example with schema log, private key and password phrase

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  8. "tasks.max": "1",
  9. "key.converter": "io.confluent.connect.avro.
  10. "value.converter": "io.confluent.connect.avro
  11. "transforms": "ExtractTimestamp, InsertTimezone",
  12. "themes": "",
  13. "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  14. "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  15. "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  16. "transforms.InsertTimezone.static.field": "Time zone",
  17. "transforms.InsertTimezone.static.value": "<value_time_zone",
  18. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  19. "connection.user": "",
  20. "connection.privateKey": "
  21. "connection.privateKeyPassphrase": "",
  22. "insert.mode": "UPSERT",
  23. "delete.enabled": "true",
  24. "table.name.format": "${topic}",
  25. "pk.mode": "record_key",
  26. "auto.create": "true",
  27. "auto.evolve": "true"
  28. }
  29. }
  30. ''''

Configuration with string values without outline

Thematic outline for snowflake integration

By eating aChaintype subjectfrom Confluent to the snowflake, it is essential to structure the target snowflake table in order to align with the properties of the message. Below is the structure and explanation of each field as they appear in Snowflake after ingestion.

Snowflake table outline for string themes

Each theme message is inserted into Snowflake using the following table structure. This configuration ensures appropriate analysis and interpretation of each field:

Field field

4 4 4
Represents theme message headers in text format
1 1 1
Additional information or metadata about the message; it usually contains identification data or type of data
2 2 2
Stores the theme message key in text format, helping inrecord search and join
Timestamp of the message, stored as a number to facilitate temporary order and filtering
3 3 3
Main content of the message in'String'formatting, representing the body of the message

Field descriptions

1. raw_message_header (Varchar): Stores any information in the header accompanying the subject message. This may include contextual data such as event type, partition identifier, or additional metadata needed to interpret the message.

2. raw_message_info (Varchar): It has additional information or metadata related to the message. Depending on the connector settings, this could include application-specific identifiers or additional tags that classify the message.

3. raw_message_key (Varchar): Contains the message key in'String'format. This field is essential for the unique identification of messages or operations such as'Upsert'because it serves to uniquely identify each record at the Snowflake table.

4. raw_message_timestamp (Number): Stores the message's timestamp in numerical format. Useful for audits, orders and temporary filtering in Snowflake, allowing queries based on when the message was sent or processed.

5. raw_message_value (Varchar): The main field that stores the content or value of the message in'Stringe'Hello world!format. It represents the body of the message and contains the data transmitted from Confluent Platform.

Configuration guidelines

To ensure that the connector swallows and stores the data according to this scheme in Snowflake:
  1. Configure the connector to assign and transform these fields as specified to match the snowflake table structure.
  2. Make sure that the snowflake table has the same column names and data types as described above to avoid outline mismatches or type errors.
  3. Verify the data structuring on the Confluent platform so that each topic message is properly divided into'heading', ,"key', ,seal of time', and'value'to ingestion in snowflake.

This configuration provides a standardized approach to storing thematic messages in Snowflake, allowing for optimized real-time data queries and analysis.


Example of configuration with string and password values without outline record

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "name": "test_snow",
  8. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  9. "tasks.max": "5",
  10. "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  11. "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  12. "themes": "",
  13. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  14. "connection.user": "",
  15. "driving.password":"
  16. "connection.maximumPoolSize": "5",
  17. "insert.mode": "UPSERT",
  18. "delete.enabled": "false",
  19. "batch.size": "10",
  20. "table.name.format": "${topic}",
  21. "pk.mode": "none",
  22. "auto.create": "true",
  23. "auto.evolve": "false"
  24. }
  25. }
  26. ''''

Example of configuration with string values and private key without outline record

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "name": "test_snow",
  8. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  9. "tasks.max": "5",
  10. "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  11. "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  12. "themes": "",
  13. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  14. "connection.user": "",
  15. "connection.privateKey": "
  16. "connection.maximumPoolSize": "5",
  17. "insert.mode": "UPSERT",
  18. "delete.enabled": "false",
  19. "batch.size": "10",
  20. "table.name.format": "${topic}",
  21. "pk.mode": "none",
  22. "auto.create": "true",
  23. "auto.evolve": "false"
  24. }
  25. }
  26. ''''

Example of configuration with string values, private key and password phrase without outline record

  1. ''json'
  2. { { {
  3. "name": "",
  4. "configure": {
  5. "value.converter.schema.registry.url": "http://:",
  6. "key.converter.schema.registry.url": "http://:",
  7. "name": "test_snow",
  8. "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
  9. "tasks.max": "5",
  10. "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  11. "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  12. "themes": "",
  13. "connection.url": "jdbc:snowflake://:443/?db=&warehouse=&schema=",
  14. "connection.user": "",
  15. "connection.privateKey": "
  16. "connection.privateKeyPassphrase": "",
  17. "connection.maximumPoolSize": "5",
  18. "insert.mode": "UPSERT",
  19. "delete.enabled": "false",
  20. "batch.size": "10",
  21. "table.name.format": "${topic}",
  22. "pk.mode": "none",
  23. "auto.create": "true",
  24. "auto.evolve": "false"
  25. }
  26. }
  27. ''''

Configuring authentication with private keys

This section provides a step-by-step guide on how to set up authentication using private keys instead of a password for secure snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.

a. Generate a pair of private/public keys
Generate a new pair of private/public keys (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following command:

  1. ''.
  2. opensl genrsa -out rsa_key.p8 2048
  3. opensl pkcs8 -topk8 -inform PEM -outform PEM -outform PEM -out rsa_key.pem -nocrypt
  4. opensl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  5. ''''

  1. rsa_key.p8: Private key file (keep this safe and provide in connector settings).
  2. rsa_key.pub: Public key file (used to set up the user in Snowflake).

b. Generate a pair of private/public keys and password phrase

Generate a new pair of private/public keys (RSA 2048) with a phrase on your local machine. If you are using a Unix-based terminal, you can use the following command:
  1. ''.
  2. opensl genrsa -aes256 -out rsa_key.p8 2048
  3. opensl pkcs8 -topk8 -inform PEM -outform PEM -outform PEM -out rsa_key.p8 -out rsa_key.pem
  4. <insert-your-Passphrase>
  5. opensl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  6. <insert-your-Passphrase>
  7. ''''
  1. rsa_key.p8: Private key file (keep this safe and provide in connector settings).
  2. rsa_key.pub: Public key file (used to set up the user in Snowflake).

Configure the snowflake user with public key

Sign in to Snowflake as an administrator and run the following SQL command to associate the public key with the specific user. Replace<Snowflake_user>with your Snowflake username.

  1. ''sql
  2. User user
  3. Set rsa_public_key=';
  4. ''''
  1. Note:Make sure that the private key file (rsa_key.p8) has appropriate reading permissions and is stored securely

    • Related Articles

    • Onibex Snowflake Sink Connector Benefits

      The JDBC snowflake connector sends real-time data from Confluent Platform and Cloud for writing to the theme-subscription Snowflake Tables. It is possible to achieve idempotent writings with elevators. Self-creation of tables and self-evolution is ...
    • Onibex Databricks JDBC Connector for Confluent Cloud

      JDBC Onibex Connector for Databricks The JDBC Onibex connector for Databricks sends real-time data from Kafka to write into live DeltaLake tables. Idempotent writes can be achieved using upserts. Automatic table creation and schema evolution are ...
    • Onibex Clickhouse Sink Connector

      The Onibex Clickhouse JDBC connector sends real-time data from Kafka to write to Tables based on the topics subscription. It is possible to achieve idempotent writes with upserts. Auto-creation of tables and auto-evolution is supported using the ...
    • One Connect Cloud Deployment

      Prerequisites Download the required .zip folders attached at the end of the document: sql.zip one-connect.zip kafka-compose.zip Requirements for the Virtual Machine System: Linux Architecture: 64-bit processors (x86_64) support Instance ...
    • BTP confirmation

      1. Install cloud connector Step 1 Make sure you download Virtual Java Version 8 (either for Windows or Linux) andBinary Installation Files. 2. Connect the cloud connector to BTP Step 1 Cloud connector Sign in to Cloud Connector.Enter the default ...