The JDBC Snowflake connector sends real-time data from Confluent Platform and writes it to the Snowflake tables subscribed to each topic. Idempotent writes can be achieved with upserts. Automatic table creation and automatic schema evolution are supported using the Schema Registry.
5. Connector configuration details:
6. JDBC configuration:
- jdbc:snowflake://:443/?db=&warehouse=&schema=
7. Insert settings:
- insert.mode: insert, upsert, or update
- delete.enabled: true or false
Important note:
- The upsert and update insert modes depend on the primary key mode configuration (record_key or record_value) to accurately identify records.
- The insert mode does not use keys to check for duplicates.
- The Enable deletions parameter (delete.enabled) requires the primary key mode to be set to record_key.
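As an illustrative sketch, these settings combine as follows in a connector configuration fragment (the batch.size value of 500 is an assumed example, not a recommendation):

```json
{
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "delete.enabled": "true",
  "batch.size": "500"
}
```

In standard Kafka Connect sink semantics, a record with a null value (a tombstone) signals a delete, which is why deletions require the key-based primary key mode.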
8. Table and schema settings:
- pk.mode: record_key, record_value, or none
- auto.create: true or false
- auto.evolve: true or false
9. Configuring the Schema Registry:
- key.converter.schema.registry.url: http://<ip_schema_registry>:<port>
- value.converter.schema.registry.url: http://<ip_schema_registry>:<port>
- transforms.ExtractTimestamp.type: Adds a timestamp to the records.
- transforms.ExtractTimestamp.timestamp.field: Field where the timestamp of the event will be inserted. Value: timestamp
- transforms.InsertTimezone.type: Adds a time zone field to the records.
- transforms.InsertTimezone.static.field: Static field for the time zone. Value: timezone
- transforms.InsertTimezone.static.value: Time zone field value. Value: America/Mexico_City
Type | Snowflake Data Type |
Int8 | Smallint |
Int16 | Smallint |
Int32 | Integer |
Int64 | Bigint |
Float32 | Float |
Float64 | Double |
Boolean | Boolean |
String | Varchar |
Bytes | Binary |
Null | Null |
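To illustrate the mapping above, the DDL that auto.create could generate for a topic carrying an Int64 id, a String name, and a Float64 price is sketched below. This is a hypothetical example; the actual statement depends on the record schema and connector internals, and the table and column names are assumptions.

```sql
-- Hypothetical DDL following the Connect-to-Snowflake type mapping above
CREATE TABLE IF NOT EXISTS "MY_TOPIC" (
    "ID"    BIGINT,     -- Int64   -> Bigint
    "NAME"  VARCHAR,    -- String  -> Varchar
    "PRICE" DOUBLE,     -- Float64 -> Double
    PRIMARY KEY ("ID")  -- present when pk.mode identifies a key field
);
```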
Name | Description | Values |
name | Name of the connector | <name_connector> |
connector.class | Specifies the connector class that handles the integration with Snowflake | com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector |
tasks.max | Defines the maximum number of tasks that the connector will run | Positive integer value >= 1 |
topics | List of topics to be consumed by this connector | <Topic_Name> |
Name | Description | Values |
connection.url | JDBC URL to connect to the Snowflake database, specifying the database, warehouse, and schema | jdbc:snowflake://:443/?db=&warehouse=&schema= |
connection.user | Snowflake user | <User_snow> |
connection.password | Snowflake password | <Password> |
connection.privateKey | Private key used for Snowflake authentication | <PrivateKey> |
connection.privateKeyPassphrase | Passphrase for the private key used in Snowflake authentication | <PrivateKeyPassphrase> |
Name | Description | Values |
batch.size | Specifies the number of records to group into a single SQL transaction, when possible. | Positive integer value > 1 |
insert.mode | The insertion mode for records | insert / upsert / update |
delete.enabled | Enables deletion of records in the target database | true / false |
Name | Description | Values |
table.name.format | Format string used to define the name of the target table. Includes ${topic} as a placeholder for the name of the original topic. | ${topic} |
pk.mode | Specifies where to find the primary key for the records that are inserted | record_key |
Name | Description | Values |
auto.create | Allows automatic table creation if tables do not exist | true / false |
auto.evolve | Allows automatic evolution of tables if the schema changes | true / false |
Name | Description | Values |
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter |
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter |
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<ip_schema_registry>:<port> |
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<ip_schema_registry>:<port> |
Name | Description | Values |
transforms | Transformations applied to the data | ExtractTimestamp, InsertTimezone |
transforms.ExtractTimestamp.type | Type of transformation to add the timestamp to records | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.ExtractTimestamp.timestamp.field | Field where the event timestamp will be inserted | timestamp |
transforms.InsertTimezone.type | Type of transformation to add the time zone | org.apache.kafka.connect.transforms.InsertField$Value |
transforms.InsertTimezone.static.field | Static field where the time zone will be inserted | timezone |
transforms.InsertTimezone.static.value | Time zone field value | America/Mexico_City |
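Putting the properties from the tables above together, a complete connector configuration might look like the following. This is a sketch: every value in angle brackets is a placeholder you must replace, the host portion of connection.url and the values for tasks.max, batch.size, insert.mode, and delete.enabled are assumed examples, and private-key authentication is shown in place of connection.password.

```json
{
  "name": "<name_connector>",
  "config": {
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexSnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "<Topic_Name>",
    "connection.url": "jdbc:snowflake://<account_host>:443/?db=<db>&warehouse=<warehouse>&schema=<schema>",
    "connection.user": "<User_snow>",
    "connection.privateKey": "<PrivateKey>",
    "connection.privateKeyPassphrase": "<PrivateKeyPassphrase>",
    "batch.size": "500",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "table.name.format": "${topic}",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
    "transforms": "ExtractTimestamp,InsertTimezone",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field": "timestamp",
    "transforms.InsertTimezone.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTimezone.static.field": "timezone",
    "transforms.InsertTimezone.static.value": "America/Mexico_City"
  }
}
```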
When sending String-type topic messages from Confluent to Snowflake, it is essential to structure the target Snowflake table to align with the properties of the message. Below is the structure and explanation of each field as it appears in Snowflake after ingestion.

Field | Type | Ordinal | Description |
raw_message_info | Varchar | 1 | Additional information or metadata about the message; it usually contains identification data or the type of data |
raw_message_key | Varchar | 2 | Stores the topic message key in text format, helping in record searches and joins |
raw_message_value | Varchar | 3 | Main content of the message in String format, representing the body of the message |
raw_message_header | Varchar | 4 | Represents the topic message headers in text format |
raw_message_timestamp | Number | 5 | Timestamp of the message, stored as a number to facilitate temporal ordering and filtering |
1. raw_message_header (Varchar): Stores any header information accompanying the topic message. This may include contextual data such as event type, partition identifier, or additional metadata needed to interpret the message.
2. raw_message_info (Varchar): Holds additional information or metadata related to the message. Depending on the connector settings, this could include application-specific identifiers or additional tags that classify the message.
3. raw_message_key (Varchar): Contains the message key in String format. This field is essential for the unique identification of messages in operations such as upsert, because it serves to uniquely identify each record in the Snowflake table.
4. raw_message_timestamp (Number): Stores the message's timestamp in numerical format. Useful for audits, ordering, and temporal filtering in Snowflake, allowing queries based on when the message was sent or processed.
5. raw_message_value (Varchar): The main field that stores the content or value of the message in String format. It represents the body of the message and contains the data transmitted from Confluent Platform.
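The field layout above can be expressed as a Snowflake table definition. The sketch below is hypothetical: the table name RAW_MESSAGES is an assumption, and columns are listed in ordinal order.

```sql
-- Hypothetical target table matching the field layout described above
CREATE TABLE IF NOT EXISTS RAW_MESSAGES (
    raw_message_info      VARCHAR,  -- ordinal 1: metadata about the message
    raw_message_key       VARCHAR,  -- ordinal 2: message key, used for lookups and joins
    raw_message_value     VARCHAR,  -- ordinal 3: body of the message
    raw_message_header    VARCHAR,  -- ordinal 4: topic message headers
    raw_message_timestamp NUMBER    -- ordinal 5: numeric timestamp for ordering/filtering
);
```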
This section provides a step-by-step guide on how to set up authentication using a private key instead of a password for secure Snowflake connections. Follow these instructions to generate and configure a private key for your Snowflake user.
a. Generate a pair of private/public keys
Generate a new pair of private/public keys (RSA 2048) on your local machine. If you are using a Unix-based terminal, you can use the following command:
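For example, a minimal sketch using OpenSSL (file names are illustrative; if you plan to set connection.privateKeyPassphrase, generate an encrypted key by replacing -nocrypt with a cipher option such as -v2 aes256):

```shell
# Generate a 2048-bit RSA private key in PKCS#8 PEM format (unencrypted here).
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out rsa_key.p8
# Derive the matching public key; its body is what you assign to the Snowflake user.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```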