The JDBC Onibex connector for Databricks writes real-time data from Kafka into live Delta Lake tables. Idempotent writes can be achieved using upserts. Automatic table creation and schema evolution are supported through schema registration.

Idempotent writes: The default insert.mode is INSERT. If configured as UPSERT, the connector uses upsert semantics instead of simple insert statements. Upsert semantics atomically add a new row or update the existing row when there is a primary key constraint violation, providing idempotency (see the example fragment below).

Automatic table creation and schema evolution: Auto-creation (auto.create) and auto-evolution are supported; missing tables or columns can be created automatically, and table names are generated from Kafka topic names. By default, insert.mode is set to INSERT and pk.mode is set to none.
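For example, a minimal configuration fragment (shown in isolation, with all other properties omitted) that switches the connector to idempotent upsert writes, assuming the Kafka record key carries the primary key, might look like this:

```json
{
  "insert.mode": "UPSERT",
  "pk.mode": "record_key"
}
```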
Limitations: Review the limitations and capabilities of the Databricks JDBC driver.

Table creation: Automatic table creation by the connector supports the inclusion of "PARTITIONED BY" or "PRIMARY KEY" clauses in the table definition.

Column creation: The connector does not support automatic column creation with the "GENERATED ALWAYS AS" expression. By default, automatically created columns will have their default value set to NULL.
| Name | Description | Values |
|------|-------------|--------|
| name | Name of the connector instance. | <name_connector> |
| connector.class | The connector class that handles the integration with Databricks. | com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector |
| topics | List of Kafka topics to be consumed by this connector. | <topic_name> |
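Taken together, these base properties form the skeleton of a connector definition; a minimal sketch, with the placeholders left to be substituted, could look like:

```json
{
  "name": "<name_connector>",
  "config": {
    "connector.class": "com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector",
    "topics": "<topic_name>"
  }
}
```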
| Name | Description | Values |
|------|-------------|--------|
| connection.host_name | Databricks server hostname. | <Databricks_Server_Hostname> |
| connection.Auth_AccessToken | Databricks access token. | <Databricks_AccessToken> |
| connection.httpPath | The HTTP path provided in the JDBC connection details. | <Databricks_Http_Path> |
| connection.ConnCatalog | The name of the catalog in Unity Catalog. | <Databricks_Catalog> |
| connection.ConnSchema | The name of the schema. | <Databricks_schema> |
| onibex.license | Onibex license key. | <onibex_license> |
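As an illustration, a filled-in connection block might resemble the fragment below. The hostname and HTTP path follow the general shape of the values Databricks shows in its JDBC connection details, and the catalog and schema names are invented examples, not values taken from this documentation:

```json
{
  "connection.host_name": "adb-1234567890123456.7.azuredatabricks.net",
  "connection.Auth_AccessToken": "<Databricks_AccessToken>",
  "connection.httpPath": "/sql/1.0/warehouses/abcdef1234567890",
  "connection.ConnCatalog": "main",
  "connection.ConnSchema": "kafka_sink",
  "onibex.license": "<onibex_license>"
}
```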
| Name | Description | Values |
|------|-------------|--------|
| insert.mode | The SQL operation used to write data to the target table. | insert/upsert/update |
| batch.size | The number of records to group into a single SQL transaction, when possible. | Positive integer value > 1 |
| delete.enabled | Whether null record values should be treated as deletes. Requires pk.mode set to record_key. | true/false |
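For instance, the fragment below groups up to 500 records per transaction and treats null record values as deletes; the batch size is illustrative, and delete handling assumes pk.mode is set to record_key elsewhere in the configuration:

```json
{
  "insert.mode": "UPSERT",
  "batch.size": "500",
  "delete.enabled": "true"
}
```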
| Name | Description | Values |
|------|-------------|--------|
| table.name.format | Format string used to define the name of the target table. Use ${topic} as a placeholder for the original topic name. | ${topic} |
| pk.mode | Specifies where to find the primary key for inserted records. | none/record_key/record_value |
| pk.fields | A comma-separated list of field names representing the primary key. | record_key |
| fields.whitelist | A comma-separated list of field names to include from the record value. If left empty, all fields in the record are included. | (optional) |
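The sketch below combines these naming and key properties; the table prefix and the field names id, amount, and updated_at are purely illustrative:

```json
{
  "table.name.format": "kafka_${topic}",
  "pk.mode": "record_key",
  "pk.fields": "id",
  "fields.whitelist": "id,amount,updated_at"
}
```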
| Name | Description | Values |
|------|-------------|--------|
| auto.create | Whether the connector should automatically create the target table based on the record schema. | true/false |
| auto.evolve | Whether new columns are automatically added to the target table schema when the record schema evolves. | true/false |
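Enabling both properties lets the connector create missing tables and add new columns as the record schema evolves, for example:

```json
{
  "auto.create": "true",
  "auto.evolve": "true"
}
```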
| Name | Description | Values |
|------|-------------|--------|
| max.retries | The maximum number of retry attempts the connector makes in the event of a failure. | Positive integer value >= 1 |
| retry.backoff.ms | Time in milliseconds to wait after an error before retrying. | Positive integer value >= 1 |
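As an example, the fragment below retries a failed operation up to 10 times, waiting 3 seconds between attempts; the values are illustrative rather than recommendations:

```json
{
  "max.retries": "10",
  "retry.backoff.ms": "3000"
}
```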
| Name | Description | Values |
|------|-------------|--------|
| key.converter | Key converter in Avro format. | io.confluent.connect.avro.AvroConverter |
| value.converter | Value converter in Avro format. | io.confluent.connect.avro.AvroConverter |
| key.converter.schema.registry.url | Confluent Schema Registry URL for keys. | http://<schema_registry_ip>:<port> |
| value.converter.schema.registry.url | Confluent Schema Registry URL for values. | http://<schema_registry_ip>:<port> |
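A converter fragment that points both the key and value converters at the same Schema Registry instance (host and port are placeholders) might look like:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://<schema_registry_ip>:<port>",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://<schema_registry_ip>:<port>"
}
```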
{ "name": "<name_connector>", "config": { "value.converter.schema.registry.url": "http://<ip_schema_registry>:<port>", "key.converter.schema.registry.url": "http://<ip_schema_registry>:<port>", "connector.class": "com.onibex.connect.datalake.jdbc.OnibexDataLakeSinkConnector", "tasks.max": "1", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "topics": "<topic_name>", "connection.host_name": "<databricks_host_name>", "connection.httpPath": "<databricks_htppPath>", "connection.Auth_AccessToken": "<databricks_AccessToken", "connection.ConnCatalog": "<databricks_catalog>", "connection.ConnSchema": "<databricks_schema", "connection.timeout": "10000", "connection.idleTimeout": "30000", "connection.maximumPoolSize": "1", "insert.mode": "UPSERT", "delete.enabled": "true", "table.name.format": "${topic}", "pk.mode": "record_key", "pk.fields": "", "auto.create": "true", "auto.evolve": "true", "onibex.license": "<onibex_license>" } }
For the sink connector to correctly create, modify, and manage tables in Databricks, the user account or service account authenticated through OAuth2 must have the following minimum permissions:
CREATE: Permission to create new tables in the target database or schema.
ALTER: Permission to modify the schema of existing tables (for example, to add new columns).
INSERT: Permission to insert data into existing or newly created tables.
UPDATE: Permission to update records within the table.
DELETE: Permission to delete records from the table.
MERGE: Permission to carry out MERGE operations, which combine INSERT, UPDATE and DELETE.
SELECT: Permission to read from existing tables and schemas, as this may be necessary for schema evolution and verification.
USAGE: Permission to access the catalog and schema where the target tables are located.
Make sure that the user or service account has sufficient privileges at the database or schema level to execute these operations. This includes:
Catalog: Permissions to list and access the relevant catalogs.
Database: Permissions to list and access databases within catalogs.