Onibex Snowflake Sink Connector for Confluent Platform

The Onibex Snowflake JDBC connector streams real-time data from Kafka into Snowflake tables based on the topic subscription. Idempotent writes can be achieved with upserts. Automatic table creation and schema evolution are supported through the Schema Registry.



Features

  1. Idempotent writes: The default insert.mode is INSERT. If configured as UPSERT, the connector will use upsert semantics rather than plain insert statements. Upsert semantics refer to atomically adding a new row or updating the existing row if there is a primary key constraint violation, providing idempotence.
  2. Schemas: The connector supports Avro input format for Key and Value. Schema Registry must be enabled to use a Schema Registry-based format.
  3. Table and column auto-creation: auto.create and auto.evolve are supported. Missing tables or columns can be created automatically. Table names are derived from the Kafka topic names.
  4. Raw Data: The connector supports sinking raw data to Snowflake when insert.mode is INSERT and pk.mode is none.

Limitations

  1. Review the limitations and capabilities of the Snowflake JDBC driver.
  2. Table auto-creation doesn’t support “PARTITION BY” and “PRIMARY KEY” clauses. If partitioning or a primary key is needed to improve table performance, ALTER TABLE must be executed manually (see the sketch after this list).
  3. Column auto-creation doesn’t support GENERATED ALWAYS AS expression. The default value for nullable columns is NULL.
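
For reference, a manual adjustment of an auto-created table might look like the following. This is a hedged sketch with placeholder database, schema, table, and column names; Snowflake uses clustering keys rather than partitions, so CLUSTER BY stands in for partitioning here.

```sql
-- Add an informational primary key to an auto-created table (placeholder names).
ALTER TABLE <Database_Name>.<Schema_Name>.<Table_Name> ADD PRIMARY KEY (<Key_Column>);

-- Optionally define a clustering key to improve pruning on large tables.
ALTER TABLE <Database_Name>.<Schema_Name>.<Table_Name> CLUSTER BY (<Cluster_Column>);
```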


Prerequisites

  1. Apache Kafka with support for Kafka Connect.
  2. Confluent Schema Registry running.
  3. Access to a Snowflake Data Warehouse.
  4. Valid Snowflake credentials.
  5. Onibex Connector installed in Kafka Connect.

Optional: Setting up Authentication with Private Keys


This section provides a step-by-step guide on how to set up authentication using private keys instead of a password for secure Snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.

a. Generate a Private/Public Key Pair

Generate a new private/public key pair (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following commands:

```bash
# Generate a 2048-bit RSA private key (traditional PEM format, intermediate file).
openssl genrsa -out rsa_key.pem 2048
# Convert it to unencrypted PKCS#8 format, the format Snowflake key-pair authentication expects.
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.pem -out rsa_key.p8 -nocrypt
# Extract the matching public key for the Snowflake user setup.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

- rsa_key.p8: Private key file (keep this secure and provide it in the connector configuration).
- rsa_key.pub: Public key file (used for setting up the user in Snowflake).

b. Generate a Private/Public Key Pair with a Passphrase

Generate a new private/public key pair (RSA 2048) protected by a passphrase on your local machine. If you are using a Unix-based terminal, you can use the following commands; enter your passphrase when prompted:

```bash
# Generate a 2048-bit RSA private key encrypted with AES-256 (prompts for a passphrase).
openssl genrsa -aes256 -out rsa_key.pem 2048
# Convert it to encrypted PKCS#8 format (prompts for the passphrase).
openssl pkcs8 -topk8 -inform PEM -outform PEM -in rsa_key.pem -out rsa_key.p8
# Extract the matching public key (prompts for the passphrase).
openssl rsa -in rsa_key.pem -pubout -out rsa_key.pub
```
- rsa_key.p8: Private key file (keep this secure and provide it in the connector configuration).
- rsa_key.pub: Public key file (used for setting up the user in Snowflake).
- Passphrase: provide it in the connection.privateKeyPassphrase property of the connector configuration.


Configure Snowflake User with Public Key

Log in to Snowflake as an administrator and execute the following SQL command to associate the public key with the specific user. Replace <SNOWFLAKE_USER> with your Snowflake username and <public_key_value> with the contents of rsa_key.pub, excluding the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- delimiters.

```sql
ALTER USER <SNOWFLAKE_USER>
SET RSA_PUBLIC_KEY='<public_key_value>';
```
- Note: Make sure the private key file (rsa_key.p8) has appropriate read permissions and is securely stored.
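
To confirm that the key was registered, you can inspect the user's properties; a minimal check, assuming the same <SNOWFLAKE_USER> placeholder, is shown below (the RSA_PUBLIC_KEY and RSA_PUBLIC_KEY_FP properties should be populated):

```sql
-- Display the user's properties and verify the registered public key and its fingerprint.
DESC USER <SNOWFLAKE_USER>;
```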

Configuration Properties

Snowflake Sink Connector Configuration

  1.  [Connection](#connection)
  2.  [Transaction](#transaction)
  3.  [Table Mapping](#table-mapping)
  4.  [Schema Evolution Support](#schema-evolution-support)
  5.  [Connector Recovery](#connector-recovery)
  6.  [Converters](#converters)
  7.  [Transforms](#transforms)

---


Connection

| Name | Description | Values |
|------|-------------|--------|
| name | Name of the connector | <name_connector> |
| connector.class | Specifies the connector class that handles the integration with Snowflake | com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector |
| connection.url | JDBC URL to connect to the Snowflake database, specifying the database, warehouse, and schema | jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name> |
| connection.user | Snowflake user | <User_snow> |
| connection.password | Snowflake password | <Password> |
| connection.privateKey | Private key used for authentication with Snowflake | <PrivateKey> |
| connection.privateKeyPassphrase | Passphrase for the private key used in Snowflake authentication | <PrivateKeyPassphrase> |


Transaction

| Name | Description | Values |
|------|-------------|--------|
| tasks.max | Defines the maximum number of tasks that the connector will execute | Positive integer value ≥ 1 |
| batch.size | Specifies the number of records to batch together in a single SQL transaction, when possible | Positive integer value ≥ 1 |


Table Mapping

| Name | Description | Values |
|------|-------------|--------|
| table.name.format | A format string used to define the destination table name; ${topic} is a placeholder for the originating topic name | ${topic} |
| pk.mode | Specifies where to find the primary key for the records being inserted | record_key / none |
| insert.mode | The insertion mode for the records | insert / upsert / update |
| delete.enabled | Enables the deletion of records in the target database | true / false |


Schema Evolution Support

| Name | Description | Values |
|------|-------------|--------|
| auto.create | Allows automatic creation of tables if they do not exist | true / false |
| auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |


Connector Recovery

| Name | Description | Values |
|------|-------------|--------|
| topics | List of Kafka topics that this connector will consume | <Topic_Name> |

Converters

| Name | Description | Values |
|------|-------------|--------|
| key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
| value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
| key.converter.schema.registry.url | URL of the Confluent Schema Registry for keys | http://<Ip_Host>:<Port> |
| value.converter.schema.registry.url | URL of the Confluent Schema Registry for values | http://<Ip_Host>:<Port> |

Transforms

| Name | Description | Values |
|------|-------------|--------|
| transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
| transforms.ExtractTimestamp.type | Transformation type to add the timestamp to the records | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
| transforms.InsertTimezone.type | Transformation type to add the timezone | org.apache.kafka.connect.transforms.InsertField$Value |
| transforms.InsertTimezone.static.field | Static field where the timezone will be inserted | timezone |
| transforms.InsertTimezone.static.value | Value of the timezone field | America/Mexico_City |

Examples 

Configuration Example with Schema Registry and Password


```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.password": "<Password>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Configuration Example with Schema Registry and Private Key


```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Configuration Example with Schema Registry, Private Key, and Passphrase


```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "ExtractTimestamp, InsertTimezone",
    "topics": "<Topic_Name>",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "<value_time_zone",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.privateKeyPassphrase": "<Private_Passphrase>",
    "insert.mode": "UPSERT",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true"
  }
}
```

Configuration with String Values without a Schema


Topic Schema for Snowflake Integration


When ingesting a `String` type topic from Confluent into Snowflake, it’s essential to structure the target Snowflake table to align with the message properties. Below is the structure and explanation of each field as they appear in Snowflake, following the ingestion.


Snowflake Table Schema for String Topics


Each Kafka message is inserted into Snowflake with the following table structure. This setup ensures proper parsing and interpretation of each field:


| Field | Type | Ordinal | Description |
|-------|------|---------|-------------|
| raw_message_header | Varchar | 4 | Represents the Kafka message headers in text format |
| raw_message_info | Varchar | 1 | Additional information or metadata about the message; typically contains identification or type data |
| raw_message_key | Varchar | 2 | Stores the Kafka message key in text format, aiding in record search and joins |
| raw_message_timestamp | Number | 5 | Timestamp of the message, stored as a number to facilitate temporal ordering and filtering |
| raw_message_value | Varchar | 3 | Main content of the message in `String` format, representing the message body |


Field Descriptions

1. raw_message_header (Varchar): Stores any header information accompanying the Kafka message. This may include context data like event type, partition identifier, or additional metadata necessary for interpreting the message.

2. raw_message_info (Varchar): Holds supplementary information or metadata related to the message. Based on connector configuration, this could include application-specific identifiers or additional tags classifying the message.

3. raw_message_key (Varchar): Contains the message key in `String` format. This field is essential for unique message identification or operations such as `UPSERT`, as it serves to uniquely identify each record in the Snowflake table.

4. raw_message_timestamp (Number): Stores the message's timestamp in numeric format. Useful for audits, ordering, and temporal filtering in Snowflake, enabling queries based on when the message was sent or processed.

5. raw_message_value (Varchar): The primary field storing the content or message value in `String` format. Represents the body of the message and holds the data being transmitted from Kafka.

Configuration Guidelines


To ensure that the connector ingests and stores the data according to this schema in Snowflake:
  1. Configure the connector to map and transform these fields as specified to match the Snowflake table structure.
  2. Ensure the Snowflake table has the same column names and data types as outlined above to prevent schema mismatches or type errors.
  3. Verify data structuring in Confluent so each Kafka message correctly segments into `header`, `key`, `timestamp`, and `value` upon ingestion in Snowflake.

This setup provides a standardized approach for storing Kafka messages in Snowflake, enabling streamlined data querying and real-time analytics.
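
For reference, a table matching this layout could also be created manually along the lines below. This is a hedged sketch with placeholder database, schema, and table names; when auto.create is true, the connector creates the table itself, and the exact types it chooses may differ.

```sql
-- Placeholder names; comments follow the ordinals documented above.
CREATE TABLE IF NOT EXISTS <Database_Name>.<Schema_Name>.<Topic_Name> (
    raw_message_info       VARCHAR,  -- ordinal 1: message metadata
    raw_message_key        VARCHAR,  -- ordinal 2: Kafka message key
    raw_message_value      VARCHAR,  -- ordinal 3: message body
    raw_message_header     VARCHAR,  -- ordinal 4: Kafka message headers
    raw_message_timestamp  NUMBER    -- ordinal 5: message timestamp
);
```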

Examples 

Configuration Example with String Values and Password without Schema Registry


```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.password": "<Password>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```

Configuration Example with String Values and Private Key without Schema Registry


```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```

Configuration Example with String Values, Private Key, and Passphrase without Schema Registry

```json
{
  "name": "<name_connector>",
  "config": {
    "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>",
    "name": "prueba_snow",
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<Private_key>",
    "connection.privateKeyPassphrase": "<Private_Passphrase>",
    "connection.maximumPoolSize": "5",
    "insert.mode": "UPSERT",
    "delete.enabled": "false",
    "batch.size": "10",
    "table.name.format": "${topic}",
    "pk.mode": "none",
    "auto.create": "true",
    "auto.evolve": "false"
  }
}
```





Related Articles

- Onibex Databricks Sink Connector for Confluent Cloud
- Cloud Connector Connection Manual
- 3. One Connect - Connection
- One Connect Idoc Configuration Manual
- 1. One Connect - Installation