Onibex Snowflake Sink Connector for Confluent Platform


The Onibex Snowflake JDBC sink connector sends real-time data from Confluent Platform to Snowflake tables based on topic subscriptions. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported through the Schema Registry.


Features

  1. Idempotent writes: The default insert.mode is insert. If it is configured as upsert, the connector uses upsert semantics instead of plain insert statements. Upsert semantics means atomically adding a new row, or updating the existing row when a primary key constraint would be violated, which provides idempotence (see the sketch after this list).
  2. Schemas: The connector supports the Avro input format for both key and value. Schema Registry must be enabled in order to use a schema-based format.
  3. Automatic table and column creation: auto.create and auto.evolve are supported. Missing tables or columns can be created automatically. Table names are derived from the topic names.
  4. Raw data: The connector supports sinking raw data into Snowflake when insert.mode is insert and pk.mode is none.
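A minimal sketch of how this behavior maps onto connector properties (property names are the ones documented later in this guide; the values shown are illustrative, not defaults):

```json
{
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "auto.create": "true",
  "auto.evolve": "true"
}
```

With this combination, re-delivering a record with the same key updates the existing row instead of inserting a duplicate.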



Prerequisites

  1. Confluent Platform with support for Confluent custom connectors.
  2. A running Confluent Schema Registry.
  3. Access to a Snowflake data warehouse.
  4. Valid Snowflake credentials.
  5. The Onibex connector installed on Confluent Platform.



Quick Start Guide


1. Installation:
Contact Onibex for more details on installing this component.

2. Select the connector to configure it.




3. Choose the topic you want to sink as a table in Snowflake.


4. Specify the connector class that manages the integration with Snowflake:
  1. com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector

5. Connector configuration details:

  • Name: The name of the connector.
  • Tasks max: Maximum number of tasks run by the connector.
  • Key converter class: Key converter format.
  • Value converter class: Value converter format.
    • Key (Avro): io.confluent.connect.avro.AvroConverter
    • Value (Avro): io.confluent.connect.avro.AvroConverter
    • Key (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Value (JSON): io.confluent.connect.json.JsonSchemaConverter
    • Key (String): org.apache.kafka.connect.storage.StringConverter
    • Value (String): org.apache.kafka.connect.storage.StringConverter
  • Transforms: Transformations applied to the data (the corresponding property names are sketched after this list).
    • ExtractTimestamp
    • InsertTimezone
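For reference, the UI fields above correspond to connector properties. A hedged fragment assuming Avro converters for both key and value (other formats follow the same pattern with the converter classes listed above):

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "transforms": "ExtractTimestamp,InsertTimezone"
}
```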


6. JDBC configuration:

  • JDBC URL: The URL used to connect to the Snowflake database, specifying the database, warehouse, and schema (see the sketch after this list).
    Example:
    jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>
  • Connection user: Username.
  • Password, private key, or private key with passphrase:
    • JDBC connection password: Snowflake password.
    • Private key: Used for Snowflake authentication.
    • Private key and passphrase: Private key protected with a passphrase, used for authentication.
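To make the shape of these settings concrete, here is a hedged fragment; the account host myaccount.snowflakecomputing.com and the MYDB, MYWH, and PUBLIC names are purely hypothetical examples, and only one of the authentication options (password, private key, or private key with passphrase) needs to be supplied:

```json
{
  "connection.url": "jdbc:snowflake://myaccount.snowflakecomputing.com:443/?db=MYDB&warehouse=MYWH&schema=PUBLIC",
  "connection.user": "<User_snow>",
  "connection.password": "<Password>"
}
```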



7. Insert Settings:

  • Insert mode: Record insertion mode. Options: insert, upsert, update.
  • Batch size: Number of records grouped into a single SQL transaction.
  • Enable delete: Allows deletion of records in the target database. Options: true or false.


Important note:
  • The upsert and update modes depend on the primary key mode configuration (record_key or record_value) to accurately identify records (see the fragment after this note).

  • In insert mode, keys are not used to check for duplicates.

  • The enable-delete parameter requires the primary key mode to be set to record_key.
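As an example, the following hedged fragment shows a combination consistent with these notes, since upsert mode with deletes enabled requires the primary key to come from the record key (values are illustrative):

```json
{
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "delete.enabled": "true",
  "batch.size": "10"
}
```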


8. Table and schema settings:

  • Table name format: Specifies the naming format for target tables (see the fragment after this list).
  • Primary key mode: Defines how the primary key is extracted. Options: record_key, none, record_value.
  • Primary key fields: Specifies the primary key fields for the topic.
  • Auto create: Creates tables automatically if they do not exist. Options: true or false.
  • Auto evolve: Automatically updates table schemas when changes occur. Options: true or false.
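A hedged fragment for these table settings; the ${topic} placeholder expands to the source topic name, so each topic gets its own table:

```json
{
  "table.name.format": "${topic}",
  "auto.create": "true",
  "auto.evolve": "true"
}
```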


9. Schema Registry configuration:

  • Key converter Schema Registry URL: URL of the Confluent Schema Registry for keys.
    Example:
    http://<ip_schema_registry>:<port>
  • Value converter Schema Registry URL: URL of the Confluent Schema Registry for values.
    Example:
    http://<ip_schema_registry>:<port>
  • Transformations (assembled in the fragment after this list):
    1. transforms.ExtractTimestamp.type: Adds a timestamp to the records.
       Example: org.apache.kafka.connect.transforms.InsertField$Value
    2. transforms.ExtractTimestamp.timestamp.field: Field where the event timestamp will be inserted.
       Example: timestamp
    3. transforms.InsertTimezone.type: Adds a time zone field to the records.
       Example: org.apache.kafka.connect.transforms.InsertField$Value
    4. transforms.InsertTimezone.static.field: Static field for the time zone.
       Example: timezone
    5. transforms.InsertTimezone.static.value: Time zone field value.
       Example: America/Mexico_City
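Assembled together, the Schema Registry and transformation settings from this step look like the following hedged fragment (the registry host and port are placeholders, and the static field name timezone mirrors the examples in this guide):

```json
{
  "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
  "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
  "transforms": "ExtractTimestamp,InsertTimezone",
  "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.ExtractTimestamp.timestamp.field": "timestamp",
  "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimezone.static.field": "timezone",
  "transforms.InsertTimezone.static.value": "America/Mexico_City"
}
```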




Limitations

  1. Review the limitations and capabilities of the Snowflake JDBC driver.
  2. Automatic table creation does not support PARTITION BY or PRIMARY KEY clauses for table columns. If partitions or a primary key are needed to improve table performance, an ALTER TABLE statement must be executed manually.
  3. Automatic column creation does not support GENERATED ALWAYS AS expressions. The default value for newly created columns is NULL.

Data types supported by the connector

| Connect Schema Type | Snowflake Data Type |
| --- | --- |
| Int8 | Smallint |
| Int16 | Smallint |
| Int32 | Integer |
| Int64 | Bigint |
| Float32 | Float |
| Float64 | Double |
| Boolean | Boolean |
| String | Primary key -> Varchar(64); other fields -> Varchar |
| Bytes | Binary |
| Null | Null |



Configuration properties

Common

| Name | Description | Values |
| --- | --- | --- |
| name | Name of the connector | <name_connector> |
| connector.class | Specifies the connector class that will handle the integration with Snowflake | com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector |
| tasks.max | Defines the maximum number of tasks the connector will run | Positive integer value >= 1 |
| topics | List of topics consumed by this connector | <Topic_Name> |

Connection

| Name | Description | Values |
| --- | --- | --- |
| connection.url | JDBC URL used to connect to the Snowflake database, specifying the database, warehouse, and schema | jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema> |
| connection.user | Snowflake user | <User_snow> |
| connection.password | Snowflake password | <Password> |
| connection.privateKey | Private key used for Snowflake authentication | <PrivateKey> |
| connection.privateKeyPassphrase | Passphrase for the private key used in Snowflake authentication | <PrivateKeyPassphrase> |


Transaction

| Name | Description | Values |
| --- | --- | --- |
| batch.size | Specifies the number of records to group into a single SQL transaction, when possible | Positive integer value >= 1 |
| insert.mode | The insertion mode for records | insert / upsert / update |
| delete.enabled | Enables deletion of records in the target database | true / false |

Table mapping

| Name | Description | Values |
| --- | --- | --- |
| table.name.format | Format string used to define the name of the target table. Use ${topic} as a placeholder for the originating topic name | ${topic} |
| pk.mode | Specifies where to find the primary key for the records being inserted | record_key / none / record_value |

Schema evolution support

| Name | Description | Values |
| --- | --- | --- |
| auto.create | Allows automatic table creation if tables do not exist | true / false |
| auto.evolve | Allows automatic table evolution if the schema changes | true / false |

Converters

| Name | Description | Values |
| --- | --- | --- |
| key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
| value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
| key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<ip_schema_registry>:<port> |
| value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<ip_schema_registry>:<port> |

Transformations

| Name | Description | Values |
| --- | --- | --- |
| transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
| transforms.ExtractTimestamp.type | Transformation type used to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
| transforms.InsertTimezone.type | Transformation type used to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
| transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |



Examples:

Example configuration with Schema Registry and password

```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.password": "<Password>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Example configuration with Schema Registry and private key

```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<PrivateKey>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Example configuration with Schema Registry, private key, and passphrase

```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<PrivateKey>",
    "connection.privateKeyPassphrase": "<PrivateKeyPassphrase>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Configuration with String values without Schema Registry


Topic schema for Snowflake integration


When consuming a String-type topic from Confluent into Snowflake, it is essential to structure the target Snowflake table so that it aligns with the properties of the message. Below is the structure and an explanation of each field as it appears in Snowflake after ingestion.


Snowflake table schema for String topics


Each topic message is inserted into Snowflake using the following table structure. This configuration ensures that each field is parsed and interpreted appropriately:


| Field | Type | Ordinal | Description |
| --- | --- | --- | --- |
| raw_message_info | Varchar | 1 | Additional information or metadata about the message; usually contains identification data or the data type |
| raw_message_key | Varchar | 2 | Stores the topic message key in text format, helping with record lookups and joins |
| raw_message_value | Varchar | 3 | Main content of the message in String format, representing the body of the message |
| raw_message_header | Varchar | 4 | Represents the topic message headers in text format |
| raw_message_timestamp | Number | 5 | Timestamp of the message, stored as a number to facilitate time-based ordering and filtering |


Field descriptions

1. raw_message_header (Varchar): Stores any header information accompanying the topic message. This may include contextual data such as the event type, partition identifier, or additional metadata needed to interpret the message.

2. raw_message_info (Varchar): Holds additional information or metadata related to the message. Depending on the connector settings, this could include application-specific identifiers or additional tags that classify the message.

3. raw_message_key (Varchar): Contains the message key in String format. This field is essential for operations such as upsert, because it serves to uniquely identify each record in the Snowflake table.

4. raw_message_timestamp (Number): Stores the message's timestamp in numerical format. Useful for audits, ordering, and time-based filtering in Snowflake, allowing queries based on when the message was sent or processed.

5. raw_message_value (Varchar): The main field that stores the content or value of the message in String format. It represents the body of the message and contains the data transmitted from Confluent Platform.

Configuration guidelines


To ensure that the connector ingests and stores the data according to this schema in Snowflake:
  1. Configure the connector to map and transform these fields as specified to match the Snowflake table structure.
  2. Make sure the Snowflake table has the same column names and data types as described above to avoid schema mismatches or type errors.
  3. Verify the data structure on the Confluent Platform side so that each topic message is properly split into header, key, timestamp, and value before ingestion into Snowflake.

This configuration provides a standardized approach to storing topic messages in Snowflake, enabling optimized real-time data queries and analysis.

Examples

Example configuration with String values and password, without Schema Registry

```json
{
  "name": "test_snow",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.password": "<Password>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```

Example configuration with String values and private key, without Schema Registry

```json
{
  "name": "test_snow",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<PrivateKey>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```

Example configuration with String values, private key, and passphrase, without Schema Registry

```json
{
  "name": "test_snow",
  "config": {
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<host>:443/?db=<database>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<PrivateKey>",
    "connection.privateKeyPassphrase": "<PrivateKeyPassphrase>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```



Configuring authentication with private keys

This section provides a step-by-step guide on how to set up authentication using a private key instead of a password for secure Snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.


a. Generate a private/public key pair
Generate a new private/public key pair (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following commands:

```bash
openssl genrsa -out rsa_key.pem 2048
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.pem -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

  1. rsa_key.p8: Private key file (keep this safe and provide it in the connector settings).
  2. rsa_key.pub: Public key file (used to configure the user in Snowflake).

b. Generate a private/public key pair with a passphrase

Generate a new private/public key pair (RSA 2048) protected with a passphrase on your local machine. If you are using a Unix-based terminal, you can use the following commands:

```bash
openssl genrsa -aes256 -out rsa_key.pem 2048
# enter your passphrase when prompted
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.pem -out rsa_key.p8
# enter your passphrase when prompted
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# enter your passphrase when prompted
```

  1. rsa_key.p8: Private key file (keep this safe and provide it in the connector settings).
  2. rsa_key.pub: Public key file (used to configure the user in Snowflake).



Configure the Snowflake user with the public key

Sign in to Snowflake as an administrator and run the following SQL command to associate the public key with the specific user. Replace <Snowflake_user> with your Snowflake username.

```sql
ALTER USER <Snowflake_user> SET RSA_PUBLIC_KEY='<public_key>';
```

  1. Note: Make sure that the private key file (rsa_key.p8) has appropriate read permissions and is stored securely.
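Once the public key is registered, the connector authenticates with the matching private key. A hedged fragment (the placeholders stand for the contents of rsa_key.p8 and its passphrase; the passphrase line applies only if the key was generated with one):

```json
{
  "connection.privateKey": "<PrivateKey>",
  "connection.privateKeyPassphrase": "<PrivateKeyPassphrase>"
}
```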

