Onibex Snowflake Iceberg Sink Connector for Confluent Platform and Cloud

Snowflake Connector Setup Guide (JSON, No Primary Key Configuration for Confluent Cloud)

Prerequisites

Before setting up the Snowflake connector, gather the following information:

1. API Key - Your Confluent Cloud API key. To create a Kafka API key, go to Confluent Cloud → Kafka Clusters → Your Cluster → API keys → Add API Key, then save the generated Key and Secret.
2. Topic Name - The name of the Kafka topic to which you intend to send data, along with the format of the messages it contains (e.g., Avro, JSON, String).
3. Snowflake JDBC URL - Obtain this by:
  • Logging into Snowflake
  • Clicking "View Account Details"
  • Navigating to the "Connectors/Drivers" tab
  • Selecting "JDBC Connection String". The URL should follow this format: jdbc:snowflake://[account_name].snowflakecomputing.com/?user=[username]&warehouse=[warehouse]&db=[database]&schema=[schema]
  • Choosing your warehouse and database
4. Snowflake Credentials - Username and password for your Snowflake account
5. JSON Configuration - The connector configuration (provided below)
6. Snowflake Endpoints - Network endpoints for connectivity (see next section)
Finding Snowflake Endpoints

The connector requires network access to Snowflake's services. These endpoints vary by account and region, so you need to retrieve your specific list.

Step 1: Extract Endpoints from Snowflake

Run this command in your Snowflake environment:

SELECT SYSTEM$ALLOWLIST();

This returns a JSON structure similar to:

[
  {"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"},
  {"host":"talactz-ds69451.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT_REGIONLESS"},
  {"host":"sfc-repo.snowflakecomputing.com","port":443,"type":"SNOWSQL_REPO"},
  {"host":"ocsp.rootg2.amazontrust.com","port":80,"type":"OCSP_RESPONDER"}
]
Step 2: Format Endpoints for Confluent

From the JSON response, extract the host and port values and format them as: <host>:<port>:TCP

Example:

Input: {"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"}
Output: zg59224.us-east-2.aws.snowflakecomputing.com:443:TCP
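To format the whole list at once, here is a minimal shell sketch; it assumes you saved the SYSTEM$ALLOWLIST() output to a file named allowlist.json and have the jq tool installed (both are assumptions, not part of the original setup):

# print one <host>:<port>:TCP line per allowlist entry
jq -r '.[] | "\(.host):\(.port):TCP"' allowlist.json

The output can be pasted directly into the Confluent endpoint list.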
Pro Tip: Use wildcards to simplify endpoint management. Instead of listing every specific host, you can use: snowflakecomputing.com:443:TCP
Configuring the JSON

Use the following JSON structure for your connector configuration. Replace the placeholders with your specific values; the number in each placeholder refers to the corresponding item in the prerequisites list above:

{
  "auto.create": "true",
  "auto.evolve": "true",
  "auto.offset.reset": "earliest",
  "batch.size": "500",
  "confluent.custom.schema.registry.auto": "true",
  "connection.password": "<your snowflake account password (4)>",
  "connection.url": "<the snowflake JDBC(3)>",
  "connection.user": "<your snowflake username (4)>",
  "delete.enabled": "true",
  "insert.mode": "upsert",
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "offset.flush.interval.ms": "30000",
  "connection.maximumPoolSize": "10",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<the name of the topic (2)>",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
}
Configuration Example:

Replace <the name of the topic (2)> with your actual topic name:

Before: "topics": "<the name of the topic (2)>"
After: "topics": "shipping_info"

Apply the same process to all other placeholders, using the corresponding values from your prerequisites list.
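For illustration, this is how the placeholder lines might look once filled in, using purely hypothetical values (account xy12345 in us-east-2, user KAFKA_USER, warehouse COMPUTE_WH, database ANALYTICS, schema PUBLIC); all other lines stay unchanged:

  "connection.password": "MySecretPassword123",
  "connection.url": "jdbc:snowflake://xy12345.us-east-2.aws.snowflakecomputing.com:443/?db=ANALYTICS&warehouse=COMPUTE_WH&schema=PUBLIC",
  "connection.user": "KAFKA_USER",
  "topics": "shipping_info"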

Note: A reference table at the end of this document details all parameters, their possible values, and additional optional parameters.

Creating the Connector
1. Navigate to Connectors
In your Confluent Cloud cluster, select "Connectors" and click "Add Connector".
2. Select Snowflake Connector
Search for "Onibex Snowflake Sink Connector". Choose "Use existing API key" if you have already created one and enter your API key, or instead choose "Generate API key & download".
3. Configure the Connector
In the configuration section, select "JSON" and paste your configured JSON from the previous section.
4. Add Endpoints
Enter the formatted endpoints you obtained from the Snowflake allowlist.
5. Configure Tasks
Select the number of tasks for the connector. In Confluent, a connector's tasks are the parallel execution units that actually move data; each task is an independent worker thread (or process) created by the connector to split the workload.
Best Practice: Use the same number of tasks as there are partitions in your topic for optimal performance (see the tip after these steps for checking the partition count).
6. Name Your Connector
Provide a descriptive name for your connector.
7. Deploy
Review your configuration and deploy the connector.

Your Snowflake connector is now running and sending data from your topic to Snowflake tables.
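If you are unsure how many partitions your topic has (for the task count in step 5), one way to check is with the Confluent CLI; this is a sketch that assumes the CLI is installed and you are logged in to the right cluster, and uses the hypothetical topic name shipping_info:

confluent kafka topic describe shipping_info

The output includes the topic's partition count, which you can match with the number of tasks.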

Avro Format Configuration

If your topic's messages are in Avro format, you need to update the converters in your JSON configuration:

Key converter in Avro format:

io.confluent.connect.avro.AvroConverter

Value converter in Avro format:

io.confluent.connect.avro.AvroConverter

Updated JSON for Avro Format

Replace the converter lines in your JSON configuration:

{
  "auto.create": "true",
  "auto.evolve": "true",
  "auto.offset.reset": "earliest",
  "batch.size": "500",
  "confluent.custom.schema.registry.auto": "true",
  "connection.password": "<your snowflake account password (4)>",
  "connection.url": "<the snowflake JDBC(3)>",
  "connection.user": "<your snowflake username (4)>",
  "delete.enabled": "true",
  "insert.mode": "upsert",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "offset.flush.interval.ms": "30000",
  "connection.maximumPoolSize": "10",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<the name of the topic (2)>",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}
Note: The key changes are:
  • "key.converter": "io.confluent.connect.avro.AvroConverter"
  • "value.converter": "io.confluent.connect.avro.AvroConverter"
How to Implement a Private Key with No Passphrase

For this configuration, in addition to the prerequisites above, you need:

7. OpenSSL
8. A folder in which to store the keys
Step 1: Generate the Private Key (No Passphrase)

To generate an unencrypted private key, use the following command:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

This command creates a file called rsa_key.p8 in the current directory, containing an unencrypted private key in PEM (PKCS#8) format.

Step 2: Generate the Public Key

From the same directory where you created the private key, run:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Step 3: Configure the Public Key in Snowflake
1. Copy the content of the public key file (rsa_key.pub)
2. Remove the header -----BEGIN PUBLIC KEY----- and footer -----END PUBLIC KEY-----
3. Remove all line breaks to make it a single line
4. In Snowflake, run this command to assign the public key to your user:
ALTER USER <your_username> SET RSA_PUBLIC_KEY='<your_public_key_single_line>';
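Stripping the header, footer, and line breaks can be done in one step with a small shell sketch (assuming rsa_key.pub is in the current directory and standard Unix tools are available):

# print the public key as a single line without the BEGIN/END markers
grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n'

Paste the resulting single-line value into the ALTER USER statement.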
Step 4: Verify the Key was Set

Check that the key was properly configured:

DESCRIBE USER <your_username>;

Look for the RSA_PUBLIC_KEY property in the results.

Step 5: Update Your JSON Configuration

Modify your connector JSON to use key-pair authentication instead of password:

{
  "auto.create": "true",
  "auto.evolve": "true",
  "auto.offset.reset": "earliest",
  "batch.size": "500",
  "confluent.custom.schema.registry.auto": "true",
  "connection.url": "<the snowflake JDBC(3)>",
  "connection.user": "<your snowflake username (4)>",
  "connection.privateKey": "<content_of_rsa_key.p8_file>",
  "delete.enabled": "true",
  "insert.mode": "upsert",
  "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "offset.flush.interval.ms": "30000",
  "connection.maximumPoolSize": "10",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<the name of the topic (2)>",
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
}
Note: Replace connection.password with connection.privateKey and paste the content of your rsa_key.p8 file as a single line, excluding the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- lines. You must configure at least one authentication method.
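As with the public key, a minimal shell sketch to flatten the private key for the connection.privateKey field (assuming rsa_key.p8 is in the current directory):

# print the private key as a single line without the BEGIN/END markers
grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'

Copy the single-line output into the connection.privateKey value.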
Encrypted Private Key (With Passphrase) Configuration

Follow these steps to create and use a private key encrypted with a passphrase for key‑pair authentication:

Step 1: Generate an encrypted private key

Run the following command in your terminal; you'll be prompted to enter an encryption passphrase:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
Step 2: Generate the public key

Use the encrypted private key to generate the public key (you will need to supply the passphrase):

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Step 3: Store the key pair and passphrase securely

Store rsa_key.p8, rsa_key.pub, and the encryption passphrase in a secure location; you will need them for the connector configuration and for any other tool that authenticates with this key pair.

Step 4: Assign the public key to a Snowflake user

Copy the contents of rsa_key.pub (excluding the BEGIN/END headers) and run:

alter user <user_name> set rsa_public_key='<public_key_single_line>';

If the user had a password login, disable it with:

alter user <user_name> unset password;
Step 5: (Optional) Validate public key configuration

To confirm proper setup, run:

desc user <user_name>;

Then compute the SHA‑256 fingerprint using:

openssl rsa -pubin -in rsa_key.pub -outform DER | openssl dgst -sha256 -binary | openssl enc -base64

Compare this value to the RSA_PUBLIC_KEY_FP from DESC USER output to ensure they match.
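If you want to pull just the fingerprint out of the DESC USER output, the following sketch uses Snowflake's RESULT_SCAN immediately after the DESC USER statement (the quoted lowercase column names assume the standard DESC USER output layout):

desc user <user_name>;
select "value" from table(result_scan(last_query_id())) where "property" = 'RSA_PUBLIC_KEY_FP';

The returned value carries a SHA256: prefix; the portion after the prefix should match the openssl output above.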

Connector Configuration for Encrypted Key

If you are using a private key that is encrypted with a passphrase, you must include the following parameters in your connector configuration:

"connection.privateKey": "<the_content_of_your_rsa_key.p8_file_without_-----BEGIN_ENCRYPTED_PRIVATE_KEY-----and-----END_ENCRYPTED_PRIVATE_KEY----->",
"connection.privateKeyPassphrase": "<the_passphrase>"

Add these fields to the connector's JSON configuration if you are using an encrypted private key. Paste only the base64 body of the key, with no headers and no newlines.

Important: Do not include the header and footer lines -----BEGIN ENCRYPTED PRIVATE KEY----- and -----END ENCRYPTED PRIVATE KEY----- in the connection.privateKey parameter.
Connection Parameters
auto.create
  Description: Automatically creates tables in Snowflake if they do not already exist.
  Possible values: "true", "false"
  Default: "true"

auto.evolve
  Description: Allows the schema of the tables to evolve automatically as new data is ingested.
  Possible values: "true", "false"
  Default: "true"

auto.offset.reset
  Description: Determines where to start reading data from a stream when no valid offset is available. Use "earliest" to process all data from the beginning, or "latest" to process only data added after the consumer starts.
  Possible values: "earliest", "latest"
  Default: "earliest"

batch.size
  Description: The number of records to batch together before sending to Snowflake. Use a smaller batch size if your messages contain many columns or large values.
  Possible values: "1" to "1000"
  Default: "500"

confluent.custom.schema.registry.auto
  Description: Automatically manages schema registration, reducing the need for manual setup and ensuring seamless integration with schema management systems. (Confluent Cloud only.)
  Possible values: "true"
  Default: "true"

connection.password
  Description: The password for your Snowflake account.
  Possible values: "<your Snowflake account password>"
  Default: "<your_snowflake_password>"

connection.url
  Description: The JDBC URL for connecting to your Snowflake instance.
  Possible values: "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>"
  Default: "jdbc:snowflake://<Your_Server_Hostname>:443/?db=<your_Database_Name>&warehouse=<your_COMPUTE_WH>&schema=<your_Schema_Name>"

connection.user
  Description: The username for your Snowflake account. (You must configure at least one authentication method.)
  Possible values: "<your Snowflake user account>"
  Default: "<your_snowflake_user>"

delete.enabled
  Description: Enables the deletion of records in Snowflake based on the Kafka topic.
  Possible values: "true", "false"
  Default: "true"

insert.mode
  Description: The mode for inserting data into Snowflake. "insert" always appends a new row; "upsert" replaces the row with the same key.
  Possible values: "insert", "upsert"
  Default: "upsert"

key.converter
  Description: The converter class for the key part of the Kafka messages.
  Possible values: "io.confluent.connect.avro.AvroConverter", "io.confluent.connect.json.JsonSchemaConverter", "org.apache.kafka.connect.storage.StringConverter"
  Default: "io.confluent.connect.json.JsonSchemaConverter"

offset.flush.interval.ms
  Description: How often (in milliseconds) the connector saves its progress (offsets) so it can restart without reprocessing old data. Shorter intervals are safer in case of failure; longer intervals give better performance.
  Possible values: "5000" to "60000"
  Default: "30000"

connection.maximumPoolSize
  Description: Sets the maximum size of the connection pool, i.e. the maximum number of messages that can be sent at the same time.
  Possible values: "5" to "100"
  Default: "10"

pk.mode
  Description: The mode for handling primary keys. "record_key" uses the Kafka record key as the unique identifier for each row; "none" uses no unique identifier.
  Possible values: "none", "record_key"
  Default: "record_key"

table.name.format
  Description: The format for naming tables in Snowflake. ${topic} is replaced with the actual topic name; for example, if the topic is "Sales", the table name will be "Sales".
  Possible values: a table name, or a format string containing ${topic}
  Default: "${topic}"

topics
  Description: The Kafka topics to be ingested into Snowflake.
  Possible values: "<the name of the Kafka topic you want to send to Snowflake>"
  Default: "<Your_Topic_Name>"

value.converter
  Description: The converter class for the value part of the Kafka messages.
  Possible values: "io.confluent.connect.avro.AvroConverter", "io.confluent.connect.json.JsonSchemaConverter", "org.apache.kafka.connect.storage.StringConverter"
  Default: "io.confluent.connect.json.JsonSchemaConverter"
Optional Parameters
connection.privateKey
  Description: The private key for key-pair authentication (if applicable).
  Possible values: "<the content of your rsa_key.p8 file>"

connection.privateKeyPassphrase
  Description: The passphrase for the private key (if it has one).
  Possible values: "<the passphrase you set on the key>"

tasks.max
  Description: The maximum number of tasks that should run for the connector. (You can also set this in Confluent Cloud while creating the connector.)
  Possible values: "<the number of your topic's partitions, or fewer>"

key.converter.schema.registry.url
  Description: The address where the connector can find the Schema Registry, which helps manage and validate data formats. (Confluent Platform only.)
  Possible values: "http://<Ip_Host>:<Port>"

value.converter.schema.registry.url
  Description: The address where the connector can find the Schema Registry, which helps manage and validate data formats. (Confluent Platform only.)
  Possible values: "http://<Ip_Host>:<Port>"

name
  Description: An optional connector name. (You can also set this in Confluent Cloud while creating the connector.)
  Possible values: "<descriptive name for your connector>"