Onibex Snowflake Sink Connector for Confluent Platform and Cloud

The Onibex JDBC Snowflake connector sends real-time data from Confluent Platform and Cloud to Snowflake tables based on the topic subscription. Idempotent writes can be achieved with upserts. Auto-creation of tables and auto-evolution are supported using the Schema Registry.


Features

  1. Idempotent writes: The default insert.mode is INSERT. If it is configured as UPSERT, the connector uses upsert semantics instead of plain insert statements. Upsert semantics means atomically adding a new row or updating the existing row when a primary key constraint violation occurs, which provides idempotence.
  2. Schemas: The connector supports the Avro input format for both key and value. The Schema Registry must be enabled in order to use a schema-based format.
  3. Auto-creation of tables and columns: auto.create and auto-evolution are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
  4. Raw data: The connector supports sinking raw data into Snowflake when insert.mode is INSERT and pk.mode is none (see the fragment after this list).
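
For the raw-data path described in item 4, the relevant property combination is a configuration fragment like the following (values are illustrative; complete configurations appear in the Examples section):

  "insert.mode": "INSERT",
  "pk.mode": "none"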



Prerequisites

  1. Confluent Platform or Confluent Cloud with support for custom connectors.
  2. Confluent Schema Registry running.
  3. Access to a Snowflake data store.
  4. Valid Snowflake credentials.
  5. Onibex connector installed on Confluent Platform.



Quick Start Guide


1. Installation:
Contact Onibex for more details on installing this component.

2. Select the connector to configure it.




3. Choose the topic you want to sink as a table in Snowflake.


4. Specify the connector class that manages the integration with Snowflake:
  com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector

5. Connector configuration details:

  • Name: The name of the connector.
  • Tasks max: Maximum number of tasks run by the connector.
  • Key converter class: Key converter format.
  • Value converter class: Value converter format.
    • Key (Avro): io.confluent.connect.avro.AvroConverter
    • Value (Avro): io.confluent.connect.avro.AvroConverter
    • Key (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Value (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Key (String): org.apache.kafka.connect.storage.StringConverter
    • Value (String): org.apache.kafka.connect.storage.StringConverter
  • Transforms: Transformations applied to the data.
    • ExtractTimestamp
    • InsertTimezone


6. JDBC configuration:

  • JDBC URL: The URL to connect to the Snowflake database, specifying the database, warehouse, and schema (a filled-in example follows this list).
    Example:
    jdbc:snowflake://<Account/Server URL>:443/?db=<snowflake_database>&warehouse=<snowflake_warehouse>&schema=<snowflake_schema>
  • Connection user: The Snowflake user.
  • Password, private key, or private key and passphrase:
    • JDBC connection password: Snowflake password.
    • Private key: Used for Snowflake authentication.
    • Private key and passphrase: Private key protected with a passphrase for authentication.
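
For example, a filled-in JDBC URL might look like the following (the account, database, warehouse, and schema names are hypothetical):

  jdbc:snowflake://myaccount.snowflakecomputing.com:443/?db=MY_DATABASE&warehouse=COMPUTE_WH&schema=PUBLIC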


Configuring authentication with private keys (Optional)

This section provides a step-by-step guide on how to set up authentication using private keys instead of a password for secure Snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.

a. Generate a private/public key pair
Generate a new private/public key pair (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following commands:

Notes
openssl genrsa -out rsa_key.p8 2048
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.p8 -out rsa_key.pem -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  1. rsa_key.p8: Private key file (keep this safe and provide it in the connector settings).
  2. rsa_key.pub: Public key file (used to set up the user in Snowflake).
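
When registering the public key in Snowflake (next section), only the base64 body of rsa_key.pub is needed, without the BEGIN/END delimiter lines. One way to extract it on a Unix-based shell, assuming the file generated above:

  grep -v '^-----' rsa_key.pub | tr -d '\n'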

b. Generate a private/public key pair with a passphrase

Generate a new private/public key pair (RSA 2048) with a passphrase on your local machine. If you are using a Unix-based terminal, you can use the following commands:
Notes
openssl genrsa -aes256 -passout pass:<passphrase> -out rsa_key.pem 2048
openssl rsa -in rsa_key.pem -passin pass:<passphrase> -pubout -out rsa_key.pub
openssl pkcs8 -topk8 -v1 PBE-SHA1-3DES -in rsa_key.pem -out rsa_key_pkcs8.pem -passin pass:<passphrase> -passout pass:<passphrase>

  1. rsa_key_pkcs8.pem: Private key file (keep this safe and provide it in the connector settings).
  2. rsa_key.pub: Public key file (used to set up the user in Snowflake).
  3. <passphrase>: Enter your custom passphrase.



Configure the Snowflake user with the public key

Sign in to Snowflake as an administrator and run the following SQL command to associate the public key with the specific user. Replace <SNOWFLAKE_USER> with your Snowflake username and <public_key_value> with the contents of rsa_key.pub (without the BEGIN/END delimiter lines).


Notes
ALTER USER <SNOWFLAKE_USER>
SET RSA_PUBLIC_KEY='<public_key_value>';

  1. Note: Make sure that the private key file has appropriate read permissions and is stored securely; only the public key file (rsa_key.pub) is registered in Snowflake.
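
To verify that the key was registered, you can describe the user and check that the RSA_PUBLIC_KEY_FP property is populated (replace the placeholder with your Snowflake username):

  DESC USER <SNOWFLAKE_USER>;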


7. Insert Settings:

  • Insert mode: Record insertion mode. Options: INSERT, UPSERT.
  • Batch size: Number of records grouped into a single SQL transaction.
  • Enable delete: Allows the deletion of records in the target database. Options: true or false.



Important note:
  • UPSERT mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records.

  • In INSERT mode, the primary key is not used to check for duplicates.

  • The delete.enabled parameter requires the primary key mode to be set to record_key, as illustrated in the fragment below.
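
The following fragment sketches a combination that satisfies these dependencies, enabling idempotent upserts and deletes (values are illustrative; complete configurations appear in the Examples section):

  "insert.mode": "UPSERT",
  "pk.mode": "record_key",
  "delete.enabled": "true",
  "batch.size": "10"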



8. Table and schema settings:

  • Table name format: Specifies how the target table names are formed.
  • Primary key mode: Defines how the primary key is extracted. Options: record_key, record_value, none.
  • Primary key fields: Specifies the primary key fields for the topic.
  • Auto create: Creates tables automatically if they do not exist. Options: true or false.
  • Auto evolve: Automatically updates table schemas when changes occur. Options: true or false.


9. Configuring the Schema Registry:

  • Key converter schema registry URL: URL of the Confluent Schema Registry for keys.
    Example:
    http://<ip_schema_registry>:<port>
  • Value converter schema registry URL: URL of the Confluent Schema Registry for values.
    Example:
    http://<ip_schema_registry>:<port>
  • Transformations:
  1. transforms.ExtractTimestamp.type: Adds a timestamp to the records.
    Example: org.apache.kafka.connect.transforms.InsertField$Value
  2. transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted.
    Example: timestamp
  3. transforms.InsertTimezone.type: Adds a time zone field to the records.
    Example: org.apache.kafka.connect.transforms.InsertField$Value
  4. transforms.InsertTimezone.static.field: Static field for the time zone.
    Example: timezone
  5. transforms.InsertTimezone.static.value: Time zone field value.
    Example: America/Mexico_City




Limitations

  1. Review the limitations and capabilities of the Snowflake JDBC driver.
  2. The Snowflake user must not have MFA authentication enabled.
  3. Column auto-creation does not support GENERATED ALWAYS AS (computed column) expressions. The default value for auto-created columns is NULL.

Data types supported by the connector

Kafka Connect Type | Snowflake Data Type
Int8 | Smallint
Int16 | Smallint
Int32 | Integer
Int64 | Bigint
Float32 | Float
Float64 | Double
Boolean | Boolean
String | Primary key -> Varchar(64); other fields -> Varchar
Bytes | Binary
Null | Null



Configuration properties

Common

Name | Description | Values
name | Name of the connector | <name_connector>
connector.class | Specifies the connector class that handles the integration with Snowflake | com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector
tasks.max | Defines the maximum number of tasks the connector will run | Positive integer value >= 1
topics | List of topics to be consumed by this connector | <Topic_Name>

Connection

Name | Description | Values
connection.url | JDBC URL to connect to the Snowflake database, specifying the database, warehouse, and schema | jdbc:snowflake://<Account/Server URL>:443/?db=<snowflake_database>&warehouse=<snowflake_warehouse>&schema=<snowflake_schema>
connection.user | Snowflake user | <snowflake_user>
connection.password | Snowflake password | <Password>
connection.privateKey | Private key used for Snowflake authentication | <PrivateKey>
connection.privateKeyPassphrase | Passphrase for the private key used in Snowflake authentication | <PrivateKeyPassphrase>


Transaction

Name | Description | Values
batch.size | Specifies the number of records to group into a single SQL transaction, when possible | Positive integer value >= 1
insert.mode | The insertion mode for records | insert / upsert
delete.enabled | Enables the deletion of records in the target database | true / false

Mapping tables

Name | Description | Values
table.name.format | Format string used to define the name of the target table; includes ${topic} as a placeholder for the name of the originating topic | ${topic}
pk.mode | Specifies where to find the primary key for the records that are inserted | record_key / record_value / none

Schema evolution support

Name | Description | Values
auto.create | Allows automatic table creation if tables do not exist | true / false
auto.evolve | Allows automatic table evolution if the schema changes | true / false

Converters

Name | Description | Values
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<port>
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<port>

Transformations

Name | Description | Values
transforms | Transformations applied to data | ExtractTimestamp, InsertTimezone
transforms.ExtractTimestamp.type | Type of transformation to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp
transforms.InsertTimezone.type | Type of transformation to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City




Examples:

Example configuration with Schema Registry and password (Confluent Platform)

Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<snowflake_user>",
    "connection.password": "<snowflake_password>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

Example configuration with Schema Registry and private key (Confluent Platform)

Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

Configuration example with schema registry, private key and passphrase (Confluent Platform)

Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.privateKeyPassphrase": "<Private_Passphrase>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}

Configuration example with schema registry and password  (Confluent Cloud)


Notes
{
  "auto.create": "true",
  "auto.evolve": "true",
  "confluent.custom.schema.registry.auto": "true",
  "connection.maximumPoolSize": "1",
  "connection.url":  "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
  "connection.user": "<snowflake_user>",
 "connection.password": "<snowflake_password>",
  "delete.enabled": "true",
  "insert.mode": "INSERT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<topic_name>",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}

Configuration example with schema registry and private key  (Confluent Cloud)


Notes
{
  "auto.create": "true",
  "auto.evolve": "true",
  "confluent.custom.schema.registry.auto": "true",
  "connection.authentication": "SNOWFLAKE_JWT",
  "connection.maximumPoolSize": "1",
  "connection.privateKey": "<private_key>",
  "connection.url":  "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
  "connection.user": "<snowflake_user>",
  "delete.enabled": "true",
  "insert.mode": "INSERT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<topic_name>",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}

Configuration example with schema registry, private key and passphrase (Confluent Cloud)

Notes
{
  "auto.create": "true",
  "auto.evolve": "true",
  "confluent.custom.schema.registry.auto": "true",
  "connection.authentication": "SNOWFLAKE_JWT",
  "connection.maximumPoolSize": "1",
  "connection.privateKey": "<private_key>",
  "connection.privateKeyPassphrase": "<passphrase>",
  "connection.url":  "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
  "connection.user": "<snowflake_user>",
  "delete.enabled": "true",
  "insert.mode": "INSERT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<topic_name>",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}

Configuration with string values without schema


Topic Schema for Snowflake Integration

When ingesting a `String` type topic from Confluent into Snowflake, it’s essential to structure the target Snowflake table to align with the message properties. Below is the structure and explanation of each field as they appear in Snowflake, following the ingestion.


Snowflake Table Schema for String Topics

Each Kafka message is inserted into Snowflake with the following table structure. This setup ensures proper parsing and interpretation of each field:



Field | Type | Ordinal | Description
raw_message_header | Varchar |  | Represents the Kafka message headers in text format
raw_message_info | Varchar | 1 | Additional information or metadata about the message; usually contains identification data or the data type
raw_message_key | Varchar | 2 | Stores the topic message key in text format, helping in record searches and joins
raw_message_timestamp | Number | 5 | Timestamp of the message, stored as a number to facilitate temporal ordering and filtering
raw_message_value | Varchar |  | Main content of the message in String format, representing the body of the message


Field descriptions

1. raw_message_header (Varchar): Stores any header information accompanying the topic message. This may include contextual data such as event type, partition identifier, or additional metadata needed to interpret the message.

2. raw_message_info (Varchar): Holds additional information or metadata related to the message. Depending on the connector settings, this could include application-specific identifiers or additional tags that classify the message.

3. raw_message_key (Varchar): Contains the message key in String format. This field is essential for the unique identification of messages and for operations such as upsert, because it serves to uniquely identify each record in the Snowflake table.

4. raw_message_timestamp (Number): Stores the message's timestamp in numerical format. Useful for auditing, ordering, and time-based filtering in Snowflake, allowing queries based on when the message was sent or processed.

5. raw_message_value (Varchar): The main field that stores the content or value of the message in 'String' format. It represents the body of the message and contains the data transmitted from Confluent Platform.

Configuration guidelines


To ensure that the connector ingests and stores the data according to this schema in Snowflake:
  1. Configure the connector to map and transform these fields as specified to match the Snowflake table structure.
  2. Make sure that the Snowflake table has the same column names and data types as outlined above to prevent schema mismatches or type errors (a sketch of a matching table definition follows this list).
  3. Verify the data structure on the Confluent Platform so that each topic message is properly split into header, key, timestamp, and value before ingestion into Snowflake.
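
A minimal sketch of a Snowflake table definition matching the schema above, assuming the column names and types listed (the table name is a placeholder; with auto.create enabled the connector can also create the table for you):

  CREATE TABLE <topic_name> (
      raw_message_header    VARCHAR,   -- Kafka message headers as text
      raw_message_info      VARCHAR,   -- additional metadata about the message
      raw_message_key       VARCHAR,   -- message key as text
      raw_message_timestamp NUMBER,    -- message timestamp as a number
      raw_message_value     VARCHAR    -- message body as text
  );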

This configuration provides a standardized approach to storing topic messages in Snowflake, allowing for optimized real-time data queries and analysis.

Examples

Example of configuration with string values and password  without schema registry

Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.password": "<Password>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}

Example of configuration with string values and private key without schema registry


Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}

Example of configuration with string values, private key and password phrase without schema registry


Notes
{
  "name": "<connector_name>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.privateKeyPassphrase": "<Private_Passphrase>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}



