Before setting up the Snowflake connector, gather the following information: the name of the Kafka topic you want to ingest, the Snowflake JDBC connection URL, and your Snowflake username and password. The JDBC connection URL uses the following format:

jdbc:snowflake://[account_name].snowflakecomputing.com/?user=[username]&warehouse=[warehouse]&db=[database]&schema=[schema]
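For illustration, a hypothetical URL with sample values (account xy12345, user ETL_USER, warehouse COMPUTE_WH, database SALES_DB, schema PUBLIC; substitute your own values):

jdbc:snowflake://xy12345.snowflakecomputing.com/?user=ETL_USER&warehouse=COMPUTE_WH&db=SALES_DB&schema=PUBLIC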
The connector requires network access to Snowflake's services. These endpoints vary by account and region, so you need to retrieve your specific list.
Run this command in your Snowflake environment:
SELECT SYSTEM$ALLOWLIST();
This returns a JSON structure similar to:
[ {"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"}, {"host":"talactz-ds69451.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT_REGIONLESS"}, {"host":"sfc-repo.snowflakecomputing.com","port":443,"type":"SNOWSQL_REPO"}, {"host":"ocsp.rootg2.amazontrust.com","port":80,"type":"OCSP_RESPONDER"} ]
From the JSON response, extract the host and port values and format each entry as <host>:<port>:TCP. Example:
{"host":"zg59224.us-east-2.aws.snowflakecomputing.com","port":443,"type":"SNOWFLAKE_DEPLOYMENT"}
zg59224.us-east-2.aws.snowflakecomputing.com:443:TCP
snowflakecomputing.com:443:TCP
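If the allowlist contains many entries, one optional way to convert them automatically is with jq, assuming you saved the JSON output of SYSTEM$ALLOWLIST() to a file named allowlist.json (both the tool and the file name are assumptions, not requirements):

jq -r '.[] | "\(.host):\(.port):TCP"' allowlist.json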
Use the following JSON structure for your connector configuration. Replace the highlighted placeholders with your specific values:
{ "auto.create": "true", "auto.evolve": "true", "auto.offset.reset": "earliest", "batch.size": "500", "confluent.custom.schema.registry.auto": "true", "connection.password": "<your snowflake account password (4)>", "connection.url": "<the snowflake JDBC(3)>", "connection.user": "<your snowflake username (4)>", "delete.enabled": "true", "insert.mode": "upsert", "key.converter": "io.confluent.connect.json.JsonSchemaConverter", "offset.flush.interval.ms": "30000", "connection.maximumPoolSize": "10", "pk.mode": "record_key", "table.name.format": "${topic}", "topics": "<the name of the topic (2)>", "value.converter": "io.confluent.connect.json.JsonSchemaConverter" }
Replace <the name of the topic (2)> with your actual topic name:

"topics": "<the name of the topic (2)>"

becomes

"topics": "shipping_info"
Apply the same process to all other highlighted placeholders using the corresponding values from your prerequisites list.
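How you deploy this JSON depends on your environment. In Confluent Cloud you provide it while creating the connector; on a self-managed Kafka Connect cluster, one common approach is to submit it to the Connect REST API. A minimal sketch, assuming the configuration is saved as snowflake-connector.json, the connector is named snowflake-sink, and Connect listens on localhost:8083 (all of these names are assumptions):

curl -X PUT -H "Content-Type: application/json" --data @snowflake-connector.json http://localhost:8083/connectors/snowflake-sink/config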
Note: A reference table at the end of this documentation details all parameters, their possible values, and additional optional parameters.
Your Snowflake connector is now running and sending data from your topic to Snowflake tables.
If your topic messages are in Avro format, you need to update the converters in your JSON configuration:

Key converter in Avro format: io.confluent.connect.avro.AvroConverter

Value converter in Avro format: io.confluent.connect.avro.AvroConverter

Replace the converter lines in your JSON configuration; the result looks like this:

{
  "auto.create": "true",
  "auto.evolve": "true",
  "auto.offset.reset": "earliest",
  "batch.size": "500",
  "confluent.custom.schema.registry.auto": "true",
  "connection.password": "<your snowflake account password (4)>",
  "connection.url": "<the snowflake JDBC (3)>",
  "connection.user": "<your snowflake username (4)>",
  "delete.enabled": "true",
  "insert.mode": "upsert",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "offset.flush.interval.ms": "30000",
  "connection.maximumPoolSize": "10",
  "pk.mode": "record_key",
  "table.name.format": "${topic}",
  "topics": "<the name of the topic (2)>",
  "value.converter": "io.confluent.connect.avro.AvroConverter"
}

The replaced lines are:

"key.converter": "io.confluent.connect.avro.AvroConverter"

"value.converter": "io.confluent.connect.avro.AvroConverter"
For key-pair authentication you need, in addition to what you already have, an RSA key pair. To generate an unencrypted private key, use the following command:
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
This command creates a file called rsa_key.p8 in your chosen folder, containing a private key in PEM format.
From the same directory where you created the private key, run:
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
This creates rsa_key.pub, a public key in PEM format with the header -----BEGIN PUBLIC KEY----- and footer -----END PUBLIC KEY-----. Assign the public key to your Snowflake user, pasting its content as a single line without the header and footer:
ALTER USER <your_username> SET RSA_PUBLIC_KEY='<your_public_key_single_line>';
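If you prefer not to join the lines by hand, one optional way to print the public key as a single line (assuming rsa_key.pub is in the current directory) is:

grep -v "PUBLIC KEY" rsa_key.pub | tr -d '\n'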
Check that the key was properly configured:
DESCRIBE USER <your_username>;
Look for the RSA_PUBLIC_KEY property in the results.
Modify your connector JSON to use key-pair authentication instead of password:
{ "auto.create": "true", "auto.evolve": "true", "auto.offset.reset": "earliest", "batch.size": "500", "confluent.custom.schema.registry.auto": "true", "connection.url": "<the snowflake JDBC(3)>", "connection.user": "<your snowflake username (4)>", "connection.privateKey": "<content_of_rsa_key.p8_file>", "delete.enabled": "true", "insert.mode": "upsert", "key.converter": "io.confluent.connect.json.JsonSchemaConverter", "offset.flush.interval.ms": "30000", "connection.maximumPoolSize": "10", "pk.mode": "record_key", "table.name.format": "${topic}", "topics": "<the name of the topic (2)>", "value.converter": "io.confluent.connect.json.JsonSchemaConverter" }
Replace connection.password with connection.privateKey and paste the entire content of your rsa_key.p8 file as a single line (not including the BEGIN and END lines). You must configure at least one authentication method.
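As with the public key, one optional way to get the private key content as a single line (assuming rsa_key.p8 is in the current directory) is:

grep -v "PRIVATE KEY" rsa_key.p8 | tr -d '\n'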
Follow these steps to create and use a private key encrypted with a passphrase for key‑pair authentication:
Run the following command in your terminal; you'll be prompted to enter an encryption passphrase:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
Use the encrypted private key to generate the public key (you will need to supply the passphrase):
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Store your rsa_key.p8, rsa_key.pub, and the encryption passphrase in a secure location; you will need them later for authentication.
Copy the contents of rsa_key.pub (excluding the BEGIN/END headers) and run:
ALTER USER <user_name> SET RSA_PUBLIC_KEY='<public_key_single_line>';
If the user had a password login, disable it with:
ALTER USER <user_name> UNSET PASSWORD;
To confirm proper setup, run:
DESC USER <user_name>;
Then compute the SHA‑256 fingerprint using:
openssl rsa -pubin -in rsa_key.pub -outform DER | openssl dgst -sha256 -binary | openssl enc -base64
Compare this value to the RSA_PUBLIC_KEY_FP value shown in the DESC USER output (typically displayed with a SHA256: prefix) to confirm they match.
If you are using a private key that is encrypted with a passphrase, you must include the following parameters in your connector configuration:
"connection.privateKey": "<the_content_of_your_rsa_key.p8_file_without_-----BEGIN_ENCRYPTED_PRIVATE_KEY-----and-----END_ENCRYPTED_PRIVATE_KEY----->", "connection.privateKeyPassphrase": "<the_passphrase>"
These fields must be added to the connector's JSON configuration if you're using an encrypted private key. Ensure that the private key value is base64-encoded on a single line, with no headers or newlines: do not include the -----BEGIN ENCRYPTED PRIVATE KEY----- and -----END ENCRYPTED PRIVATE KEY----- lines in the connection.privateKey parameter.
Field | Description | Possible Values | Default |
---|---|---|---|
auto.create | This parameter automatically creates tables in Snowflake if they do not already exist. | "auto.create": ("true", "false") | "auto.create": "true" |
auto.evolve | This parameter allows the schema of the tables to evolve automatically as new data is ingested. | "auto.evolve": ("true", "false") | "auto.evolve": "true" |
auto.offset.reset | Determines where to start reading data from a stream when no valid offset is available. Use "earliest" to process all data from the beginning, or "latest" to only process new data added after the consumer starts. | "auto.offset.reset": "earliest" or "latest" | "auto.offset.reset": "earliest" |
batch.size | This parameter specifies the number of records to batch together before sending to Snowflake. | "batch.size": "1-1000" (if your messages have many columns or a lot of data, use a smaller batch size) | "batch.size": "500" |
confluent.custom.schema.registry.auto | Automatically manages schema registration, reducing the need for manual setup and ensuring seamless integration with schema management systems (Confluent Cloud only). | "confluent.custom.schema.registry.auto": "true" | "confluent.custom.schema.registry.auto": "true" |
connection.password | This parameter is the password for your Snowflake account. | "connection.password": "<your Snowflake account password>" | "connection.password": "<your_snowflake_password>" |
connection.url | This parameter is the JDBC URL for connecting to your Snowflake instance. | "connection.url": "jdbc:snowflake://<Server_Hostname>:443/?db=<Database_Name>&warehouse=<COMPUTE_WH>&schema=<Schema_Name>" | "connection.url": "jdbc:snowflake://<Your_Server_Hostname>:443/?db=<your_Database_Name>&warehouse=<your_COMPUTE_WH>&schema=<your_Schema_Name>" |
connection.user | This parameter is the username for your Snowflake account (you must configure at least one authentication method). | "connection.user": "<your snowflake user account>" | "connection.user": "<your_snowflake_user>" |
delete.enabled | This parameter enables the deletion of records in Snowflake based on the Kafka topic. | "delete.enabled": "true" or "false" | "delete.enabled": "true" |
insert.mode | This parameter specifies the mode for inserting data into Snowflake (insert: always add a new row; upsert: replace the row with the same key). | "insert.mode": "insert" or "upsert" | "insert.mode": "upsert" |
key.converter | This parameter specifies the converter class for the key part of the Kafka messages. | "key.converter": "io.confluent.connect.avro.AvroConverter", "io.confluent.connect.json.JsonSchemaConverter", or "org.apache.kafka.connect.storage.StringConverter" | "key.converter": "io.confluent.connect.json.JsonSchemaConverter" |
offset.flush.interval.ms | How often the system saves its progress (in milliseconds) so it can restart without reprocessing old data. Shorter intervals are safer in case of failure; longer intervals give faster performance. | "offset.flush.interval.ms": "5000-60000" | "offset.flush.interval.ms": "30000" |
connection.maximumPoolSize | Sets the maximum number of connections that can be open to Snowflake at the same time. | "connection.maximumPoolSize": "(numbers from 5 to 100)" | "connection.maximumPoolSize": "10" |
pk.mode | This parameter specifies the mode for handling primary keys. "record_key": uses a unique identifier for each data entry. "none": no unique identifier is used. | "pk.mode": ("none", "record_key") | "pk.mode": "record_key" |
table.name.format | This parameter specifies the format for naming tables in Snowflake. The actual topic name replaces ${topic}. For example, if the topic is "Sales", the table name will be "Sales". | "table.name.format": "<descriptive name for your topic>" | "table.name.format": "${topic}" |
topics | This parameter specifies the Kafka topics to be ingested into Snowflake. | "topics": "<the name of the Kafka topic you want to send to Snowflake>" | "topics": "<Your_Topic_Name>" |
value.converter | This parameter specifies the converter class for the value part of the Kafka messages. | "value.converter": "io.confluent.connect.avro.AvroConverter", "io.confluent.connect.json.JsonSchemaConverter", or "org.apache.kafka.connect.storage.StringConverter" | "value.converter": "io.confluent.connect.json.JsonSchemaConverter" |
Field | Description | Possible Values |
---|---|---|
connection.privateKey | The private key for authentication (if using key-pair authentication). | "connection.privateKey": "<your .p8 content>" |
connection.privateKeyPassphrase | Passphrase for the private key (if the key is encrypted). | "connection.privateKeyPassphrase": "<the passphrase you set for the key>" |
tasks.max | The maximum number of tasks that should run for the connector (you can edit this in Confluent Cloud while creating the connector). | "tasks.max": "<the number of your topic partitions or less>" |
key.converter.schema.registry.url | The address where the system can find the Schema Registry, which helps manage and validate data formats (Confluent Platform only). | "key.converter.schema.registry.url": "http://<Ip_Host>:<Port>" |
value.converter.schema.registry.url | The address where the system can find the Schema Registry, which helps manage and validate data formats (Confluent Platform only). | "value.converter.schema.registry.url": "http://<Ip_Host>:<Port>" |
name | Optional connector name (you can edit this in Confluent Cloud while creating the connector). | "name": "<descriptive name for your connector>" |