Onibex Snowflake Iceberg Sink Connector for Confluent Platform and Cloud


Snowflake Connector Setup Guide (JSON, No Primary Key Configuration for Confluent Cloud)

Prerequisites

Before setting up the Snowflake connector, gather the following information:

1. API Key - Your Confluent Cloud API key
2. Topic Name - The name of the Kafka topic you want to send to Snowflake
3. Snowflake JDBC URL - Obtain this by:
  • Logging into Snowflake
  • Clicking "View Account Details"
  • Navigating to the "Connectors/Drivers" tab
  • Selecting "JDBC Connection String"
  • Choosing your warehouse and database
4. Snowflake Credentials - Username and password for your Snowflake account
5. JSON Configuration - The connector configuration (provided below)
6. Snowflake Endpoints - Network endpoints for connectivity (see next section)
Finding Snowflake Endpoints

The connector requires network access to Snowflake's services. These endpoints vary by account and region, so you need to retrieve your specific list.

Step 1: Extract Endpoints from Snowflake

Run this command in your Snowflake environment:

SELECT SYSTEM$ALLOWLIST();

This returns a JSON structure similar to:

[
  {"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"},
  {"host":"talactz-ds69451.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT_REGIONLESS"},
  {"host":"sfc-repo.snowflakecomputing.com","port":443,"type":"SNOWSQL_REPO"},
  {"host":"ocsp.rootg2.amazontrust.com","port":80,"type":"OCSP_RESPONDER"}
]
Step 2: Format Endpoints for Confluent

From the JSON response, extract the host and port values and format them as: <host>:<port>:TCP

Example:

Input: {"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"}
Output: zg59224.us-east-2.aws.snowflakecomputing.com:443:TCP
Pro Tip: Use wildcards to simplify endpoint management. Instead of listing every specific host, you can use: snowflakecomputing.com:443:TCP
Confluent supports wildcard matching, so this single entry covers every endpoint whose host ends in snowflakecomputing.com on port 443.
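If you prefer to automate this step, you can save the JSON array returned by SYSTEM$ALLOWLIST() to a local file and transform it with a small shell one-liner. This is a minimal sketch, assuming the output is stored in a file named allowlist.json and that jq is installed:

# turn each allowlist entry into <host>:<port>:TCP
jq -r '.[] | "\(.host):\(.port):TCP"' allowlist.json

The -r flag prints raw strings rather than JSON-quoted values, so the output lines can be pasted directly into the Confluent endpoint list.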
Configuring the JSON

Use the following JSON structure for your connector configuration. Replace the placeholders in angle brackets with your specific values; the numbers in parentheses refer to the corresponding items in the Prerequisites list:

[
  {
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.offset.reset": "earliest",
    "batch.size": "500",
    "confluent.custom.schema.registry.auto": "true",
    "connection.password": "<your snowflake account password (4)>",
    "connection.url": "<the snowflake JDBC(3)>",
    "connection.user": "<your snowflake username (4)>",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "offset.flush.interval.ms": "10000",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "topics": "<the name of the topic (2)>",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
  }
]
Configuration Example:

Replace <the name of the topic (2)> with your actual topic name:

Before: "topics": "<the name of the topic (2)>"
After: "topics": "shipping_info"

Apply the same process to the other placeholders, using the corresponding values from your prerequisites list, as illustrated below.
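For illustration, filling in the remaining placeholders might look like this (the account locator, warehouse, database, and username below are invented example values, not defaults):

Before: "connection.url": "<the snowflake JDBC URL (3)>"
After: "connection.url": "jdbc:snowflake://xy12345.us-east-2.aws.snowflakecomputing.com/?warehouse=COMPUTE_WH&db=KAFKA_DB"

Before: "connection.user": "<your snowflake username (4)>"
After: "connection.user": "KAFKA_CONNECT_USER"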

Creating the Connector
1. Navigate to Connectors
In your Confluent Cloud cluster, select "Connectors"
Click "Add Connector"
2. Select Snowflake Connector
Search for "Snowflake Connector"
Choose "Use existing API key" and enter your API key
3. Configure the Connector
In the configuration section, select "JSON"
Paste your configured JSON from the previous section
4. Add Endpoints
Enter the formatted endpoints you obtained from the Snowflake allowlist
5. Configure Tasks
Select the number of tasks for the connector
Best Practice: Use the same number of tasks as partitions in your topic for optimal performance
6. Name Your Connector
Provide a descriptive name for your connector
7. Deploy
Review your configuration and deploy the connector

Your Snowflake connector is now running and sending data from your topic to Snowflake tables.
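If you want to confirm that records are arriving, you can query the target table in Snowflake. A minimal sanity check, assuming the topic is named shipping_info and auto.create has created a table of the same name (the exact name and case may vary depending on how the connector quotes identifiers):

SELECT COUNT(*) FROM shipping_info;

A growing row count indicates the connector is writing data as expected.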

How to Implement a Private Key with No Passphrase

In addition to the prerequisites listed above, this configuration requires:

7. OpenSSL
8. A folder in which to store the keys
Step 1: Generate the Private Key (No Passphrase)

To generate an unencrypted private key, run the following command:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

This command creates a file called rsa_key.p8 in your chosen folder, containing an unencrypted RSA private key in PKCS#8 PEM format.
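As a quick sanity check, you can confirm the file really is an unencrypted PKCS#8 key by parsing it with OpenSSL (no passphrase prompt should appear):

openssl pkey -in rsa_key.p8 -noout
head -n 1 rsa_key.p8

The first command exits silently when the key parses correctly, and the first line of the file should read -----BEGIN PRIVATE KEY-----.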

Step 2: Generate the Public Key

From the same directory where you created the private key, run:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Step 3: Configure the Public Key in Snowflake
1. Copy the content of the public key file (rsa_key.pub)
2. Remove the header -----BEGIN PUBLIC KEY----- and footer -----END PUBLIC KEY-----
3. Remove all line breaks to make it a single line (a shell sketch covering steps 2 and 3 follows the ALTER USER command below)
4. In Snowflake, run this command to assign the public key to your user:
ALTER USER <your_username> SET RSA_PUBLIC_KEY='<your_public_key_single_line>';
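One way to produce that single-line value is with standard shell tools. This is a minimal sketch, assuming a Unix-like shell and that rsa_key.pub is in the current directory:

grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n'

grep -v drops the BEGIN and END lines (both contain the text "PUBLIC KEY"), and tr -d '\n' removes the remaining line breaks; paste the result into the ALTER USER statement above.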
Step 4: Verify the Key was Set

Check that the key was properly configured:

DESCRIBE USER <your_username>;

Look for the RSA_PUBLIC_KEY property in the results.

Step 5: Update Your JSON Configuration

Modify your connector JSON to use key-pair authentication instead of password:

[
  {
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.offset.reset": "earliest",
    "batch.size": "500",
    "confluent.custom.schema.registry.auto": "true",
    "connection.url": "<the snowflake JDBC(3)>",
    "connection.user": "<your snowflake username (4)>",
    "connection.private_key": "<content_of_rsa_key.p8_file>",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "offset.flush.interval.ms": "10000",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "topics": "<the name of the topic (2)>",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
  }
]
Note: Replace connection.password with connection.private_key and paste the entire content of your rsa_key.p8 file (including the BEGIN and END lines).
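Because JSON string values cannot contain raw line breaks, the key content usually needs to be collapsed onto a single line before pasting. This is a minimal sketch using awk, assuming a Unix-like shell (adjust if your tooling expects a different encoding):

awk 'NF {sub(/\r/, ""); printf "%s\\n", $0}' rsa_key.p8

This prints every non-empty line of rsa_key.p8 followed by a literal \n escape instead of a line break, producing one long string that can be used as the connection.private_key value.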
Step 6: Test the Connection

You can test if the private key authentication is working by connecting to Snowflake using the Snowflake CLI:

snow sql --accountname <your_account> --username <your_username> --authenticator SNOWFLAKE_JWT --private-key-path rsa_key.p8 --query "SELECT current_user();"

If successful, this will return your username, confirming that the key-pair authentication is properly configured.
