The JDBC Onibex connector for Databricks writes real-time data from Kafka into live Delta Lake tables. Idempotent writes can be achieved using upserts, and automatic table creation and schema evolution are supported through schema registration.

Idempotent writes: the default insert.mode is INSERT. If it is configured as UPSERT, the connector uses upsert semantics instead of plain insert statements. Upsert semantics atomically add a new row, or update the existing row when there is a key constraint violation, which provides idempotency.

Auto-creation and auto-evolution: automatic table creation (auto.create) and auto-evolution are supported, so missing tables or columns can be created automatically. Table names are generated from the Kafka topic names.

By default, insert.mode is set to INSERT and pk.mode is set to none.
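For illustration only, the fragment below sketches how a sink instance might override those defaults to get idempotent, upsert-based writes. The connector name, class, and topic are placeholder assumptions, not values defined by this document.

```properties
# Hypothetical sink configuration sketch; name, class, and topic are placeholders.
name=databricks-delta-sink
connector.class=<onibex-databricks-jdbc-sink-class>
topics=orders

# Defaults: plain INSERTs with no primary-key handling.
# insert.mode=insert
# pk.mode=none

# Idempotent writes: upsert keyed on the Kafka record key.
insert.mode=upsert
pk.mode=record_key
```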
Limitations

Review the limitations and capabilities of the Databricks JDBC driver:
Table creation: automatic table creation by the connector does not support including PARTITIONED BY or PRIMARY KEY clauses in the table definition. If partitioning or a primary key is required for performance optimization, users must run the necessary ALTER commands after the table is created.
Column creation: the connector does not support automatic column creation with the GENERATED ALWAYS AS expression. By default, nullable columns have their default value set to NULL.
| Name | Description | Values |
|------|-------------|--------|
| connection.host_name | Server host name. | String value |
| connection.user | Optional if the URL contains the PWD parameter. | String value |
| connection.httppath | The HTTP path provided in the JDBC connection details. | String value |
| connection.Auth_AccessToken | OAuth 2.0 access token used to connect to the server. | String value |
| connection.ConnCatalog | The name of the catalog in Unity Catalog. | String value |
| connection.ConnSchema | The name of the schema within the catalog. | String value |
| connection.[PROPERTY] | Any additional connection configuration property. | String value |
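As a rough sketch, the connection block of a connector configuration could look like the following; every value shown (host, HTTP path, token, catalog, schema, and the extra pass-through property) is a placeholder assumption, not a value taken from this document.

```properties
# Illustrative connection settings; all values are placeholders.
connection.host_name=<workspace>.cloud.databricks.com
connection.httppath=/sql/1.0/warehouses/<warehouse-id>
connection.Auth_AccessToken=<oauth-2.0-access-token>
connection.ConnCatalog=main
connection.ConnSchema=kafka_sink
# Additional driver options can be passed via the connection.[PROPERTY] pattern,
# for example (assumed for illustration):
connection.UserAgentEntry=onibex-kafka-sink
```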
| Name | Description | Values |
|------|-------------|--------|
| insert.mode | Defines the SQL operation used to write data into the target table. | insert, upsert, update |
| batch.size | Specifies the number of records to group into a single SQL transaction, when possible. | Positive integer value > 1 |
| delete.enabled | Indicates whether null record values should be treated as deletes. Requires pk.mode to be set to record_key. | true, false |
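A short sketch of how these writer properties combine; the values are illustrative only, and note that delete.enabled only takes effect together with pk.mode=record_key.

```properties
# Illustrative writer settings.
insert.mode=upsert
batch.size=500
# Null-value (tombstone) records become DELETEs; requires pk.mode=record_key.
delete.enabled=true
pk.mode=record_key
```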
| Name | Description | Values |
|------|-------------|--------|
| table.name.format | Format string used to define the name of the target table. Include ${topic} as a placeholder for the originating topic name. | String value |
| pk.mode | Specifies where to find the primary key for the records being inserted. | none, record_key, record_value |
| pk.fields | A comma-separated list of field names representing the primary key. | String value |
| fields.whitelist | A comma-separated list of field names to include from the record value. If left empty, all fields in the record are included. | String value (optional) |
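For example, the data-mapping properties might be combined as below; the table prefix and the field names (order_id, customer_id, total) are invented for illustration.

```properties
# Illustrative data-mapping settings.
# Topic "orders" would be written to a table named "kafka_orders".
table.name.format=kafka_${topic}
pk.mode=record_value
pk.fields=order_id
# Only these value fields are written; omit the property to include every field.
fields.whitelist=order_id,customer_id,total
```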
| Name | Description | Values |
|------|-------------|--------|
| auto.create | Specifies whether the connector should automatically create the target table based on the record schema. | true, false |
| auto.evolve | Defines whether new columns are automatically added to the target table schema when the record schema evolves. | true, false |
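A minimal sketch enabling both behaviors:

```properties
# Create missing target tables and add new columns as the record schema evolves.
auto.create=true
auto.evolve=true
```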
| Name | Description | Values |
|------|-------------|--------|
| max.retries | Specifies the maximum number of retry attempts made by the connector in the event of a failure. | Positive integer value |
| retry.backoff.ms | Time in milliseconds to wait after an error occurs before retrying. | Positive integer value |
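For example, to retry a failed write up to 10 times with a 3-second pause between attempts (values chosen purely for illustration):

```properties
# Illustrative retry settings.
max.retries=10
retry.backoff.ms=3000
```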
| Name | Description | Values |
|------|-------------|--------|
| key.converter | The converter used to serialize the record key. | io.confluent.connect.avro.AvroConverter |
| header.converter | The converter used to serialize the record headers. | io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter |
| value.converter | The converter used to serialize the record value. | io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter |
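Putting the converter options together, a typical Avro-based setup might look like this; the Schema Registry URL is a placeholder, and the schema.registry.url sub-properties follow the standard Confluent converter convention rather than anything specific to this connector.

```properties
# Illustrative converter settings; the Schema Registry URL is a placeholder.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# Headers passed through as raw bytes.
header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```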
In order for the sink connector to correctly create, modify, and manage tables in Databricks, the user account or service account authenticated through OAuth2 must have the following minimum permissions:
CREATE: Permission to create new tables in the target database or schema.
ALTER: Permission to modify the schema of existing tables (for example, to add new columns).
INSERT: Permission to insert data into existing or newly created tables.
UPDATE: Permission to update records within the table.
DELETE: Permission to delete records from the table.
MERGE: Permission to carry out MERGE operations, which combine INSERT, UPDATE, and DELETE.
SELECT: Permission to read from existing tables and schemas, as this may be necessary for schema evolution and verification.
USAGE: Permission to access the catalog and schema where the target tables are located.
The OAuth2 token must be issued with the appropriate scopes that allow table and schema operations, which normally include:
- databricks:catalog:read
- databricks:table…
Make sure that the user or service account has sufficient privileges at the database or schema level to execute these operations. This includes:
- Catalog: permissions to list and access the relevant catalogs.
- Database: permissions to list and access databases within those catalogs.
Example of SQL permissions to grant:
GRANT CREATE, INSERT, UPDATE, ALTER, SELECT, MERGE ON my_database.my_table TO 'service_account';
GRANT USAGE ON DATABASE my_database TO 'service_account';