Onibex Databricks JDBC Connector for Confluent Cloud

JDBC Onibex Connector for Databricks

The Onibex JDBC connector for Databricks sends real-time data from Kafka into live Delta Lake tables. Idempotent writes can be achieved using upserts. Automatic table creation and schema evolution are supported through schema registration.

Prerequisites

  1. Confluent Platform or Confluent Cloud with support for custom connectors.
  2. Confluent Schema Registry running.
  3. Access to a Databricks data store.
  4. Valid Databricks credentials.
  5. A valid Onibex license.
  6. The Onibex connector installed on Confluent Platform / Cloud.

Features

  • Idempotent Writes: With insert.mode set to INSERT, the connector issues plain insert statements. If configured as UPSERT, the connector uses upsert semantics instead: a new row is inserted, or the existing row is updated when a key constraint violation occurs, atomically, providing idempotency.
  • Schemas: The connector supports the Avro input format for both keys and values. Schema registration must be enabled to use a Schema Registry-based format; an illustrative schema is sketched after this list.
  • Automatic Table and Column Creation: Auto-creation (auto.create) and auto-evolution (auto.evolve) are supported. Missing tables or columns can be created automatically, and table names are derived from the Kafka topic names.
  • Raw Data Support: The connector supports raw data ingestion into Databricks when insert.mode is set to INSERT and pk.mode is set to none.
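
For illustration, consider a hypothetical Avro value schema like the one below (the record name, namespace, and fields are example assumptions, not part of the connector). With auto.create enabled, the connector would create a matching Delta table with one column per field; with pk.mode set to record_key, the fields of the key schema become the upsert key.

    {
      "type": "record",
      "name": "Order",
      "namespace": "com.example.kafka",
      "doc": "Illustrative value schema only; the record and field names are assumptions for this example.",
      "fields": [
        { "name": "order_id", "type": "string", "doc": "Example field; also present in the key schema when used as the upsert key" },
        { "name": "amount", "type": "double" },
        { "name": "status", "type": ["null", "string"], "default": null, "doc": "Nullable field; maps to a nullable column" }
      ]
    }

Registering this value schema (and a corresponding key schema) for the topic in the Schema Registry is what allows the Avro converters listed below to deserialize records for the connector.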

Limitations

Review the limitations and capabilities of the Databricks JDBC driver.

  1. Catalog and Schema Management: The connector cannot create catalogs or schemas. These must be created manually before the connector can be used.

  2. Table Creation: Automatic table creation by the connector does not support including "PARTITIONED BY" or "PRIMARY KEY" clauses in the table definition.

  3. Column Creation: The connector does not support automatic column creation with the "GENERATED ALWAYS AS" expression. By default, columns without a value are set to NULL.

Configuration properties

Common
Name | Description | Values
name | Name of the connector | <name_connector>
connector.class | The connector class that handles the integration with Databricks | com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector
topics | List of topics to be consumed by this connector | <topic_name>


Connection

Name | Description | Values
connection.host_name | Databricks server hostname | <Databricks_Server_Hostname>
connection.Auth_AccessToken | Databricks access token | <Databricks_AccessToken>
connection.httpPath | The HTTP path provided in the JDBC connection details | <Databricks_Http_Path>
connection.ConnCatalog | The name of the catalog in Unity Catalog | <Databricks_Catalog>
connection.ConnSchema | The name of the schema | <Databricks_schema>
onibex.license | Onibex license | <onibex_license>
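
These properties correspond to the Server Hostname and HTTP Path values shown in the JDBC/ODBC connection details of your Databricks SQL warehouse or cluster. A minimal sketch of the connection portion of a connector configuration, using the same placeholders as above:

    {
      "connection.host_name": "<Databricks_Server_Hostname>",
      "connection.httpPath": "<Databricks_Http_Path>",
      "connection.Auth_AccessToken": "<Databricks_AccessToken>",
      "connection.ConnCatalog": "<Databricks_Catalog>",
      "connection.ConnSchema": "<Databricks_schema>",
      "onibex.license": "<onibex_license>"
    }

The full examples further below additionally set connection.timeout, connection.idleTimeout, and connection.maximumPoolSize, which appear to tune the underlying connection pool.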



Transaction

Name | Description | Values
insert.mode | The SQL operation used to write data to the target table | insert/upsert/update
batch.size | The number of records to group into a single SQL transaction, when possible | Positive integer > 1
delete.enabled | Whether null record values (tombstones) should be treated as deletes. Requires pk.mode set to record_key | true/false
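
For example, a minimal sketch (the values are illustrative, not recommendations) that combines these properties for idempotent upserts with tombstone-based deletes; note that delete.enabled requires pk.mode to be record_key, as described above:

    {
      "insert.mode": "UPSERT",
      "batch.size": "3000",
      "delete.enabled": "true",
      "pk.mode": "record_key"
    }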

Mapping tables

Name | Description | Values
table.name.format | Format string used to define the name of the target table. Use ${topic} as a placeholder for the originating topic name | ${topic}
pk.mode | Specifies where the primary key for inserted records is taken from | none/record_key/record_value
pk.fields | A comma-separated list of field names that make up the primary key | (comma-separated field names)
fields.whitelist | A comma-separated list of field names to include from the record value. If empty, all fields in the record are included | (optional)
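
As an illustrative sketch (the table-name prefix and field names below are hypothetical), these properties could write a topic to a prefixed table, take the primary key from the record key, and restrict which value fields are written:

    {
      "table.name.format": "kafka_${topic}",
      "pk.mode": "record_key",
      "pk.fields": "order_id",
      "fields.whitelist": "order_id,amount,status"
    }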

Schema Evolution Support

Name | Description | Values
auto.create | Specifies whether the connector should automatically create the target table based on the record schema | true/false
auto.evolve | Specifies whether new columns are automatically added to the target table schema when the record schema evolves | true/false
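
A typical sketch enabling both behaviors, so the connector creates missing tables and adds new columns as the record schema evolves (the catalog and schema themselves must still exist beforehand, per the limitations above):

    {
      "auto.create": "true",
      "auto.evolve": "true"
    }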

Connector recovery

Name | Description | Values
max.retries | The maximum number of retry attempts the connector makes after a failure | Positive integer >= 1
retry.backoff.ms | Time in milliseconds to wait after an error before retrying | Positive integer >= 1
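
For instance, a sketch (the values are illustrative) that retries up to 10 times, waiting 3 seconds between attempts:

    {
      "max.retries": "10",
      "retry.backoff.ms": "3000"
    }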

Converters

Name | Description | Values
key.converter | Key converter in Avro format | io.confluent.connect.avro.AvroConverter
value.converter | Value converter in Avro format | io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url | Confluent Schema Registry URL for keys | http://<schema_registry_ip>:<port>
value.converter.schema.registry.url | Confluent Schema Registry URL for values | http://<schema_registry_ip>:<port>


Example

Connector configuration with schema registry (Confluent Platform)

    {
      "name": "<name_connector>",
      "config": {
        "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
        "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>",
        "connector.class": "com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector",
        "tasks.max": "1",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "topics": "<topic_name>",
        "connection.host_name": "<databricks_host_name>",
        "connection.httpPath": "<databricks_httpPath>",
        "connection.Auth_AccessToken": "<databricks_AccessToken>",
        "connection.ConnCatalog": "<databricks_catalog>",
        "connection.ConnSchema": "<databricks_schema>",
        "connection.timeout": "10000",
        "connection.idleTimeout": "30000",
        "connection.maximumPoolSize": "1",
        "insert.mode": "UPSERT",
        "delete.enabled": "true",
        "table.name.format": "${topic}",
        "pk.mode": "record_key",
        "pk.fields": "",
        "auto.create": "true",
        "auto.evolve": "true",
        "onibex.license": "<onibex_license>"
      }
    }

Connector configuration with schema registry (Confluent Cloud)


    {
      "tasks.max": "1",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "confluent.custom.schema.registry.auto": "true",
      "topics": "<topic_name>",
      "connection.host_name": "<databricks_host_name>",
      "connection.httpPath": "<databricks_httpPath>",
      "connection.Auth_AccessToken": "<databricks_AccessToken>",
      "connection.ConnCatalog": "<databricks_catalog>",
      "connection.ConnSchema": "<databricks_schema>",
      "connection.timeout": "10000",
      "connection.idleTimeout": "30000",
      "connection.maximumPoolSize": "1",
      "insert.mode": "UPSERT",
      "delete.enabled": "true",
      "table.name.format": "${topic}",
      "pk.mode": "record_key",
      "pk.fields": "",
      "auto.create": "true",
      "auto.evolve": "true",
      "onibex.license": "<onibex_license>"
    }


    Permissions required for the user or service account

    For the sink connector to correctly create, modify, and administer tables in Databricks, the user or service account authenticated through OAuth2 must have the following minimum permissions:

    1. Table management permissions:

    CREATE: Permission to create new tables in the target catalog or schema.

    ALTER: Permission to modify the schema of existing tables (for example, to add new columns).

    INSERT: Permission to insert data into existing or newly created tables.

    UPDATE: Permission to update records within the table.

    DELETE: Permission to delete records from the table.

    MERGE: Permission to carry out MERGE operations, which combine INSERT, UPDATE, and DELETE.


    2. Schema and catalog permissions:

    SELECT: Permission to read from existing tables and schemas, as this may be necessary for schema evolution and verification.

    USAGE: Permission to access the catalog and schema where the target tables are located.


    3. Specific database permissions:

    Make sure that the user or service account has sufficient privileges at the database or schema level to execute these operations. This includes:
    - Catalog: permissions to list and access the relevant catalogs.
    - Database: permissions to list and access databases within catalogs.




