Onibex Databricks Delta Lake Sink Connector for Confluent Cloud

The Onibex Databricks JDBC connector transfers data from Kafka to Delta Lake tables in real time, based on topic subscriptions. It enables idempotent writes with upserts and supports automatic table creation and schema evolution using the Schema Registry.

Features

  1. Idempotent writes: The default insert.mode is INSERT. If it is configured as UPSERT, the connector uses upsert semantics rather than plain INSERT statements. Upsert semantics means atomically adding a new row or updating the existing row when a primary key constraint would be violated, which provides idempotence (see the sketch after this list).
  2. Schemas: The connector supports the Avro input format for the record key and value. Schema Registry must be enabled to use a Schema Registry-based format.
  3. Table and column auto-creation: auto.create and auto.evolve are supported. If tables or columns are missing, they can be created automatically. Table names are derived from Kafka topic names.
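
Conceptually, an upsert into a Delta table corresponds to a MERGE statement. The sketch below is illustrative only; the table, key column, and staging source names are hypothetical, and it is not necessarily the exact statement the connector issues.

    -- Illustrative upsert semantics on a Delta table (names are hypothetical)
    MERGE INTO my_schema.product AS target
    USING incoming_batch AS source
      ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;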

Raw Data

The connector supports sinking raw data to Databricks when insert.mode is INSERT and pk.mode is none (see the Onibex Databricks Raw Data Connector example below).

Limitations

  1. Review the limitations and capabilities of the Databricks JDBC driver.
  2. Table auto-creation does not support "PARTITIONED BY" and "PRIMARY KEY" clauses. If partitions and a primary key are necessary for improving table performance, ALTER TABLE must be executed manually (see the sketch below).
  3. Column auto-creation does not support GENERATED ALWAYS AS expressions. The default value for nullable columns is NULL.
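
For example, a primary key could be added manually after the connector has auto-created the table. The statements below are a sketch using hypothetical catalog, schema, table, and column names; in Databricks, primary key columns must be NOT NULL before the constraint can be added.

    -- Hypothetical names: my_catalog.my_schema.product with key column id
    ALTER TABLE my_catalog.my_schema.product ALTER COLUMN id SET NOT NULL;
    ALTER TABLE my_catalog.my_schema.product ADD CONSTRAINT product_pk PRIMARY KEY (id);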

Configuration Properties


Connection

connection.url
  Description: JDBC URL used to connect to the Databricks database.
  Values: jdbc:databricks://<Server Hostname>:443;HttpPath=<Http Path>[;property=value[;property=value]]
  Example: jdbc:databricks://xxxxx.cloud.databricks.com:443/MY_SCHEMA_NAME;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/xxxxxxx;ConnCatalog=MY_CATALOG_NAME;ConnSchema=MY_SCHEMA_NAME

connection.user
  Description: Databricks user name. Optional if the connection URL already contains the credentials.
  Values: String value

connection.password
  Description: Databricks password. Optional if the connection URL contains the PWD parameter.
  Values: String value

Transaction

insert.mode
  Description: The SQL statement type used to write data to the destination table.
  Values: insert, upsert, update

batch.size
  Description: Specifies how many records to attempt to batch together into a single SQL transaction against the destination table, when possible.
  Values: Positive integer value > 1

delete.enabled
  Description: Whether to treat null record values as deletes. Requires pk.mode to be set to record_key.
  Values: true, false

Table Mapping

table.name.format
  Description: A format string for the destination table name, which may contain '${topic}' as a placeholder for the originating topic name. For example, my_table_${topic} for the topic 'product' maps to the table name 'my_table_product'.
  Values: String value

pk.mode
  Description: Where to find the primary key.
  Values: none, record_key, record_value

pk.fields
  Description: List of comma-separated primary key field names.
  Values: Comma-separated list of field names

fields.whitelist
  Description: List of comma-separated record value field names. If empty, all fields from the record value are used.
  Values: Comma-separated list of field names

db.timezone
  Description: Time zone used when writing time-based values.
  Values: UTC

Schema Evolution Support

auto.create
  Description: Whether to automatically create the destination table based on the record schema.
  Values: true, false

auto.evolve
  Description: Whether to automatically add columns to the table schema after a successful schema evolution.
  Values: true, false
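
Conceptually, when auto.create is enabled the connector derives a CREATE TABLE statement from the record schema, and when auto.evolve is enabled it adds newly appearing fields as nullable columns. The sketch below is illustrative only; the table and column names are hypothetical, and the exact DDL generated by the connector may differ.

    -- Hypothetical table derived from the record schema (auto.create)
    CREATE TABLE IF NOT EXISTS my_catalog.my_schema.product (
      id    BIGINT,
      name  STRING,
      price DOUBLE
    );

    -- Hypothetical nullable column added after the record schema evolves (auto.evolve)
    ALTER TABLE my_catalog.my_schema.product ADD COLUMNS (discount DOUBLE);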

Connector Recovery

max.retries
  Description: The maximum number of times to retry on errors before failing the task.
  Values: Positive integer value

retry.backoff.ms
  Description: The time in milliseconds to wait following an error before a retry attempt is made.
  Values: Positive integer value (milliseconds)

Converters

key.converter
  Description: Converter class for the record key.
  Values: io.confluent.connect.avro.AvroConverter

header.converter
  Description: Converter class for record headers.
  Values: io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter

value.converter
  Description: Converter class for the record value.
  Values: io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter

Examples:

Confluent Platform. Onibex Databricks Avro Connector.

    {
      "name":"ONIBEX_DATABRICKS_AVRO_CONNECTOR_CP",
      "config":{
         "connector.class":"com.onibex.connect.delta.jdbc.OnibexDeltaSinkConnector",
         "key.converter":"io.confluent.connect.avro.AvroConverter",
         "value.converter":"io.confluent.connect.avro.AvroConverter",
         "topics":"YOUR_AVRO_TOPIC",
         "connection.url":"jdbc:databricks://xxxxxxxxx.cloud.databricks.com:443/YOUR_SCHEMA;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/yyyy;ConnCatalog=YOUR_CATALOG;ConnSchema=YOUR_SCHEMA",
         "connection.user":"YOUR_USER",
         "connection.password":"YOUR_PASSWORD",
         "insert.mode":"upsert",
         "delete.enabled":"true",
         "table.name.format":"${topic}",
         "pk.mode":"record_key",
         "pk.fields":"id",
         "auto.create":"true",
         "auto.evolve":"true",
         "schema.registry.url":"http://schema-registry:8081",
         "value.converter.schema.registry.url":"http://schema-registry:8081",
         "key.converter.schema.registry.url":"http://schema-registry:8081"
      }
    }


Confluent Platform. Onibex Databricks Raw Data Connector.

    {
      "name":"ONIBEX_DATABRICKS_RAW_DATA_CONNECTOR_CP",
      "config":{
         "connector.class":"com.onibex.connect.delta.jdbc.OnibexDeltaSinkConnector",
         "key.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
         "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
         "topics":"YOUR_AVRO_TOPIC",
         "connection.url":"jdbc:databricks://xxxxxxxxx.cloud.databricks.com:443/YOUR_SCHEMA;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/yyyyyy;ConnCatalog=YOUR_CATALOG;ConnSchema=YOUR_SCHEMA",
         "connection.user":"YOUR_USER",
         "connection.password":"YOUR_PASSWORD",
         "insert.mode":"insert",
         "delete.enabled":"false",
         "table.name.format":"RAW_DATA_${topic}",
         "pk.mode":"none",
         "auto.create":"true",
         "auto.evolve":"false",
         "schema.registry.url":"http://schema-registry:8081",
         "value.converter.schema.registry.url":"http://schema-registry:8081",
         "key.converter.schema.registry.url":"http://schema-registry:8081"
      }
    }
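
After the connector has written data, the destination table can be checked with a Databricks SQL query. The catalog, schema, and table names below are hypothetical; RAW_DATA_product follows the table.name.format RAW_DATA_${topic} from the example above for a topic named 'product'.

    -- Hypothetical names; verify that rows have landed in the auto-created table
    SELECT * FROM my_catalog.my_schema.RAW_DATA_product LIMIT 10;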

Installation in Confluent Cloud


1) Add a new connector plugin.

2) Fill out the connector details.

3) Add your connection to Databricks using the new Onibex Databricks Delta Lake Sink Connector plugin.

Related Articles

    • Cloud Connector Connection Manual
    • 3. One Connect - Connection
    • 2. One Connect - Setup
    • One Connect Idoc Configuration Manual
    • 1. One Connect - Installation