The JDBC Snowflake connector sends real-time data from Confluent Platform and Confluent Cloud to Snowflake tables by subscribing to topics. Idempotent writes can be achieved with upserts. Automatic table creation and automatic schema evolution are supported using the Schema Registry.
5. Connector configuration details:

Name | Description | Values |
name | Name of the connector | <name_connector> |
connector.class | Specifies the connector class that handles the integration with Snowflake | com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector |
tasks.max | Defines the maximum number of tasks that the connector will run | Positive integer value >= 1 |
topics | List of topics to be consumed by this connector | <Topic_Name> |
6. JDBC configuration:
- jdbc:snowflake://<Account/Server URL>:443/?db=<snowflake_database>&warehouse=<snowflake_warehouse>&schema=<snowflake_schema>

Name | Description | Values |
connection.url | JDBC URL to connect to the Snowflake database, specifying the database, warehouse, and schema | jdbc:snowflake://<Account/Server URL>:443/?db=<snowflake_database>&warehouse=<snowflake_warehouse>&schema=<snowflake_schema> |
connection.user | Snowflake user | <snowflake_user> |
connection.password | Snowflake password | <Password> |
connection.privateKey | Private key used for Snowflake authentication | <PrivateKey> |
connection.privateKeyPassphrase | Passphrase for the private key used in Snowflake authentication | <PrivateKeyPassphrase> |
This section provides a step-by-step guide on how to set up authentication using private keys instead of a password for secure Snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.
a. Generate a private/public key pair
Generate a new private/public key pair (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following commands:
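The commands below are a sketch following Snowflake's standard key-pair authentication procedure; the output file names rsa_key.p8 and rsa_key.pub are illustrative.

```bash
# Generate an encrypted 2048-bit RSA private key in PKCS#8 format
# (the passphrase you choose here maps to connection.privateKeyPassphrase)
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

# Derive the matching public key from the private key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

The public key is then registered on the Snowflake user (for example, with ALTER USER ... SET RSA_PUBLIC_KEY), while the private key and its passphrase go into connection.privateKey and connection.privateKeyPassphrase.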
7. Insert settings:

Name | Description | Values |
batch.size | Specifies the number of records to group into a single SQL transaction, when possible | Positive integer value >= 1 |
insert.mode | The insertion mode for records | insert / upsert |
delete.enabled | Enables the deletion of records in the target database | true / false |

Important notes:
- Upsert mode depends on the primary key mode configuration (record_key or record_value) to accurately identify records (see the configuration fragment below).
- Regardless of the primary key mode, insert mode does not use keys to check for duplicates.
- The delete.enabled parameter requires the primary key mode to be set to record_key.
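As a sketch of how these options interact, using the property names from the tables in this guide (the batch size value is illustrative):

```properties
# Upsert needs a primary key to identify records; record_key is also
# the primary key mode that delete.enabled requires
insert.mode=upsert
pk.mode=record_key
delete.enabled=true
batch.size=500
```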
8. Table and schema settings:

Name | Description | Values |
table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the name of the source topic | ${topic} |
pk.mode | Specifies where to find the primary key for the records that are inserted | none / record_key / record_value |
auto.create | Allows automatic creation of tables if they do not exist | true / false |
auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |
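A sketch combining these settings: with auto.create and auto.evolve enabled, each topic is written to a table named after it, created and altered as needed.

```properties
# One target table per topic, created and evolved automatically
table.name.format=${topic}
pk.mode=record_key
auto.create=true
auto.evolve=true
```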
9. Configuring the record schema:

Name | Description | Values |
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<port> |
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<port> |

Name | Description | Values |
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
transforms.ExtractTimestamp.type | Type of transformation used to add a timestamp to the records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.ExtractTimestamp.timestamp.field | Field where the timestamp of the event will be inserted | timestamp |
transforms.InsertTimezone.type | Type of transformation used to add a time zone field to the records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |
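Putting sections 5 through 9 together, a complete standalone configuration might look like the sketch below. The connector name snowflake-sink-example and the topic my_topic are illustrative, and every <...> placeholder must be replaced with a real value.

```properties
name=snowflake-sink-example
connector.class=com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector
tasks.max=1
topics=my_topic

# Connection settings (section 6); key-pair authentication instead of a password
connection.url=jdbc:snowflake://<Account/Server URL>:443/?db=<snowflake_database>&warehouse=<snowflake_warehouse>&schema=<snowflake_schema>
connection.user=<snowflake_user>
connection.privateKey=<PrivateKey>
connection.privateKeyPassphrase=<PrivateKeyPassphrase>

# Insert settings (section 7)
batch.size=500
insert.mode=upsert
delete.enabled=true

# Table and schema settings (section 8)
table.name.format=${topic}
pk.mode=record_key
auto.create=true
auto.evolve=true

# Record schema settings (section 9)
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<schema_registry_ip>:<port>
value.converter.schema.registry.url=http://<schema_registry_ip>:<port>
transforms=ExtractTimestamp,InsertTimezone
transforms.ExtractTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.ExtractTimestamp.timestamp.field=timestamp
transforms.InsertTimezone.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertTimezone.static.field=timezone
transforms.InsertTimezone.static.value=America/Mexico_City
```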

Mapping from Kafka Connect schema types to Snowflake data types:

Connect Schema Type | Snowflake Data Type |
Int8 | Smallint |
Int16 | Smallint |
Int32 | Integer |
Int64 | Bigint |
Float32 | Float |
Float64 | Double |
Boolean | Boolean |
String | Varchar (as primary key -> Varchar(64)) |
Bytes | Binary |
Null | Null |
Raw message table fields:

Field | Type | Ordinal | Description |
raw_message_header | Varchar | 4 | Represents Kafka message headers in text format |
raw_message_info | Varchar | 1 | Additional information or metadata about the message; it usually contains identification data or the type of data |
raw_message_key | Varchar | 2 | Stores the topic message key in text format, helping in record searches and joins |
raw_message_timestamp | Number | 5 | Timestamp of the message, stored as a number to facilitate temporal ordering and filtering |
raw_message_value | Varchar | 3 | Main content of the message in 'String' format, representing the body of the message |

1. raw_message_header (Varchar): Stores any header information accompanying the topic message. This may include contextual data such as the event type, partition identifier, or additional metadata needed to interpret the message.
2. raw_message_info (Varchar): Holds additional information or metadata related to the message. Depending on the connector settings, this could include application-specific identifiers or additional tags that classify the message.
3. raw_message_key (Varchar): Contains the message key in 'String' format. This field is essential for operations such as 'Upsert' because it uniquely identifies each record in the Snowflake table.
4. raw_message_timestamp (Number): Stores the message's timestamp in numerical format. Useful for audits, ordering, and temporal filtering in Snowflake, allowing queries based on when the message was sent or processed.
5. raw_message_value (Varchar): The main field that stores the content or value of the message in 'String' format. It represents the body of the message and contains the data transmitted from Confluent Platform.
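As an illustration of how these fields can be queried, the sketch below uses the SnowSQL CLI to list the most recent messages, ordered by the numeric timestamp field; the table name RAW_MESSAGES is a hypothetical example.

```bash
# List the ten most recent raw messages (RAW_MESSAGES is a hypothetical table name)
snowsql -q "SELECT raw_message_key, raw_message_value, raw_message_timestamp
            FROM RAW_MESSAGES
            ORDER BY raw_message_timestamp DESC
            LIMIT 10;"
```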