Onibex Databricks JDBC Connector for Confluent Cloud

JDBC Onibex Connector for Databricks

The Onibex JDBC connector for Databricks sends real-time data from Kafka and writes it into live Delta Lake tables. Idempotent writes can be achieved using upserts. Automatic table creation and schema evolution are supported through schema registration.

Features

  • Idempotent Writes: The insert.mode defaults to INSERT. If configured as UPSERT, the connector uses upsert semantics instead of plain insert statements. Upsert semantics atomically add a new row, or update the existing row when a key constraint violation occurs, providing idempotency (see the SQL sketch after this list).
  • Schemas: The connector supports the Avro input format for both key and value. Schema registration must be enabled to use a Schema Registry-based format.
  • Automatic Table and Column Creation: Auto-creation (auto.create) and auto-evolution (auto.evolve) are supported. Missing tables or columns can be created automatically. Table names are generated from Kafka topic names.
  • Raw Data Support: The connector supports raw data ingestion into Databricks when insert.mode is set to INSERT and pk.mode is set to none.
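
As an illustration of the upsert semantics described above, the following is a minimal Databricks SQL sketch of a MERGE statement that atomically inserts a new row or updates an existing one by key. The table and column names (target_table, incoming, id, payload) are hypothetical placeholders; the statements the connector actually generates may differ.

  MERGE INTO my_schema.target_table AS t
  USING incoming AS s
    ON t.id = s.id                                   -- key columns taken from pk.fields
  WHEN MATCHED THEN
    UPDATE SET t.payload = s.payload                 -- existing key: update the row
  WHEN NOT MATCHED THEN
    INSERT (id, payload) VALUES (s.id, s.payload);   -- new key: insert the row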

Limitations

Review the following limitations and capabilities of the Databricks JDBC driver.

  1. Catalog and Schema Management: The connector cannot create catalogs or schemas. These must be created manually before the connector is used (see the SQL sketch after this list).

  2. Table creation: Automatic table creation by the connector does not support including "PARTITIONED BY" or "PRIMARY KEY" clauses in the table definition. If partitioning or a primary key is required for performance optimization, run the necessary ALTER commands after the table has been created.

  3. Column creation: The connector does not support automatic column creation with the "GENERATED ALWAYS AS" expression. By default, omitted column values are set to NULL.
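
The following Databricks SQL sketch illustrates the manual steps implied by limitations 1 and 2: creating the catalog and schema up front, then adding a primary key constraint and clustering after the connector has auto-created a table. The object names (my_catalog, my_schema, my_topic, id) are placeholders, and the exact statements depend on your workspace and runtime (for example, CLUSTER BY requires liquid clustering support).

  -- Limitation 1: create the catalog and schema manually before starting the connector
  CREATE CATALOG IF NOT EXISTS my_catalog;
  CREATE SCHEMA IF NOT EXISTS my_catalog.my_schema;

  -- Limitation 2: add a primary key and clustering after the table has been auto-created
  ALTER TABLE my_catalog.my_schema.my_topic ALTER COLUMN id SET NOT NULL;
  ALTER TABLE my_catalog.my_schema.my_topic ADD CONSTRAINT pk_my_topic PRIMARY KEY (id);
  ALTER TABLE my_catalog.my_schema.my_topic CLUSTER BY (id);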

Configuration properties

Connection

Name | Description | Values
connection.host_name | The Databricks server hostname. | String value
connection.user | Optional if the URL contains the PWD parameter. | String value
connection.httpPath | The HTTP path provided in the JDBC connection details. | String value
connection.Auth_AccessToken | The OAuth 2.0 access token used to connect to the server. | String value
connection.ConnCatalog | The name of the catalog in Unity Catalog. | String value
connection.ConnSchema | The name of the schema within the catalog. | String value
connection.[PROPERTY] | Any additional connection configuration property. | String value

Transaction


Name | Description | Values
insert.mode | The SQL operation used to write data to the target table. | insert, upsert, update
batch.size | The number of records to group into a single SQL transaction, when possible. | Positive integer value > 1
delete.enabled | Whether null record values (tombstones) are treated as deletes. Requires pk.mode=record_key. | true, false
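
For example, when delete.enabled is true and pk.mode is record_key, a Kafka tombstone (a record with a key and a null value) is treated as a delete of the corresponding row. The SQL below is only a conceptual sketch of that behavior, assuming a hypothetical table keyed on an id column; the statement the connector actually issues may differ.

  -- A tombstone whose key is id = 42 conceptually maps to:
  DELETE FROM my_catalog.my_schema.my_topic WHERE id = 42;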

Mapping tables

Name | Description | Values
table.name.format | A format string used to define the name of the target table. Include ${topic} as a placeholder for the originating topic name. | String value
pk.mode | Specifies where to find the primary key for inserted records. | none, record_key, record_value
pk.fields | A comma-separated list of field names representing the primary key. | String value
fields.whitelist | A comma-separated list of field names from the record value to include. If left empty, all fields in the record are included. | String value (optional)

Schema Evolution Support

Name | Description | Values
auto.create | Specifies whether the connector should automatically create the target table based on the record schema. | true, false
auto.evolve | Specifies whether new columns are automatically added to the target table schema when the record schema evolves. | true, false
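
As a rough illustration of what these options imply on the Databricks side, the sketch below shows the kind of DDL that corresponds to auto-creating a table from a record schema and later evolving it when a new field appears. The table and column names are hypothetical, and the DDL the connector actually generates (for example, the exact column types) may differ.

  -- auto.create = true: a table is created from the record schema on the first write
  CREATE TABLE IF NOT EXISTS my_catalog.my_schema.my_topic (
    id   BIGINT,
    name STRING
  );

  -- auto.evolve = true: a new field in the record schema becomes a new nullable column
  ALTER TABLE my_catalog.my_schema.my_topic ADD COLUMNS (created_at TIMESTAMP);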

Connector recovery

Name | Description | Values
max.retries | The maximum number of retry attempts the connector makes when a write fails. | Positive integer value
retry.backoff.ms | The time in milliseconds to wait after an error before retrying. | Positive integer value

Converters

Name | Description | Values
key.converter | The converter used for the record key. | io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter
header.converter | The converter used for the record headers. | io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter
value.converter | The converter used for the record value. | io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.converters.ByteArrayConverter

Example

Confluent Cloud connector configuration with Schema Registry (replace the YOUR_* placeholders with your own values):

  {
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.offset.reset": "earliest",
    "confluent.custom.schema.registry.auto": "true",
    "connection.host_name": "DATABRICKS_HOST.cloud.databricks.com",
    "connection.httpPath": "/sql/1.0/warehouses/",
    "connection.Auth_AccessToken": "YOUR_TOKEN",
    "connection.ConnCatalog": "YOUR_CATALOG",
    "connection.ConnSchema": "YOUR_SCHEMA",
    "consumer.request.timeout.ms": "20000",
    "consumer.retry.backoff.ms": "500",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_API_KEY' password='YOUR_API_SECRET';",
    "consumer.sasl.mechanism": "PLAIN",
    "consumer.security.protocol": "SASL_SSL",
    "consumer.ssl.endpoint.identification.algorithm": "https",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "offset.flush.interval.ms": "10000",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "topics": "YOUR_TOPIC",
    "value.converter": "io.confluent.connect.avro.AvroConverter"
  }

Confluent Cloud connector configuration (raw bytes):

  {
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.offset.reset": "earliest",
    "connection.host_name": "DATABRICKS_HOST.cloud.databricks.com",
    "connection.httpPath": "/sql/1.0/warehouses/",
    "connection.Auth_AccessToken": "YOUR_TOKEN",
    "connection.ConnCatalog": "YOUR_CATALOG",
    "connection.ConnSchema": "YOUR_SCHEMA",
    "consumer.request.timeout.ms": "20000",
    "consumer.retry.backoff.ms": "500",
    "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='YOUR_API_KEY' password='YOUR_API_SECRET';",
    "consumer.sasl.mechanism": "PLAIN",
    "consumer.security.protocol": "SASL_SSL",
    "consumer.ssl.endpoint.identification.algorithm": "https",
    "delete.enabled": "false",
    "insert.mode": "insert",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "offset.flush.interval.ms": "10000",
    "pk.mode": "none",
    "request.timeout.ms": "20000",
    "retry.backoff.ms": "500",
    "table.name.format": "${topic}",
    "topics": "YOUR_TOPIC",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }

Authentication configuration for Databricks JDBC Driver

Example URL using a Databricks personal access token:

  jdbc:databricks://<server-hostname>:443;httpPath=<http-path>;AuthMech=3;UID=token;PWD=<personal-access-token>

Example URL using an OAuth 2.0 access token:

  jdbc:databricks://<server-hostname>:443;httpPath=<http-path>;AuthMech=11;Auth_Flow=0;Auth_AccessToken=<oauth-access-token>

Permissions required for the user or service account

For the sink connector to correctly create, modify, and manage tables in Databricks, the user account or service account authenticated through OAuth2 must have the following minimum permissions:

1. Table management permissions:

CREATE: Permission to create new tables in the target database or schema.

ALTER: Permission to modify the schema of existing tables (for example, to add new columns).

INSERT: Permission to insert data into existing or newly created tables.

UPDATE: Permission to update records within the table.

DELETE: Permission to delete records from the table.

MERGE: Permission to carry out MERGE operations, which combine INSERT, UPDATE, and DELETE.

2. Schema and catalog permissions:

SELECT: Permission to read from existing tables and schemas, as this may be necessary for schema evolution and verification.

USAGE: Permission to access the catalog and schema where the target tables are located.

3. OAuth2 scope requirements:

The OAuth2 token must be issued with the appropriate scopes to allow table and schema operations (for example, databricks:catalog:read).

4. Specific database permissions:

Ensure that the user or service account has sufficient privileges at the database or schema level to execute these operations. This includes: Catalog: permissions to list and access the relevant catalogs; Database: permissions to list and access databases within catalogs.

Example of SQL Permissions to Grant:

  GRANT CREATE, INSERT, UPDATE, ALTER, SELECT, MERGE ON my_database.my_table TO 'service_account';
  GRANT USAGE ON DATABASE my_database TO 'service_account';
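
If the target tables live in Unity Catalog, the catalog- and schema-level access described in section 2 is usually granted separately from the table-level permissions above. The following is a minimal sketch using standard Unity Catalog GRANT statements; the catalog, schema, and principal names (my_catalog, my_schema, sp-connector@example.com) are placeholders, and your environment may require additional privileges.

  -- Allow the connector's principal to see and use the catalog and schema
  GRANT USE CATALOG ON CATALOG my_catalog TO `sp-connector@example.com`;
  GRANT USE SCHEMA ON SCHEMA my_catalog.my_schema TO `sp-connector@example.com`;

  -- Allow it to create tables and to read and write data in the schema
  GRANT CREATE TABLE ON SCHEMA my_catalog.my_schema TO `sp-connector@example.com`;
  GRANT SELECT, MODIFY ON SCHEMA my_catalog.my_schema TO `sp-connector@example.com`;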



