22 Oracle SQL Access to Kafka

Starting with Oracle AI Database 26ai, you can query Kafka topics dynamically using Oracle SQL.

Oracle SQL Access to Kafka integrates Kafka and OCI Streaming Service streams with Oracle AI Database in several important ways. First, it enables you to connect Oracle AI Database to one or more Kafka topics. After the database is connected, you can query those topics dynamically using Oracle SQL, without persisting the Kafka data in Oracle AI Database. This enables you to analyze real-time data in combination with data captured in your database. In addition, Oracle SQL Access to Kafka enables fast, scalable, and lossless loading of Kafka topics into Oracle AI Database. The DBMS_KAFKA APIs simplify the management of this entire process.

22.1 About Oracle SQL Access to Kafka Version 2

Oracle SQL Access to Kafka (OSaK) is a native Oracle AI Database feature that enables you to query Kafka topics directly using Oracle SQL.

Starting with Oracle AI Database 26ai, version 2 of Oracle SQL Access to Kafka is installed with the database. It provides a native Oracle AI Database connector service to Kafka clusters, with a rich set of features accessed through the DBMS_KAFKA and DBMS_KAFKA_ADM PL/SQL packages.

Starting with Oracle AI Database 26ai (23.26.2), Oracle SQL Access to Kafka supports Oracle Wallet-based SSL configuration for secured Kafka connections. For Oracle AI Database 26ai and later, SSL certificates and keys are managed through Oracle wallets created with the orapki tool. The legacy PEM-based SSL properties are desupported. Replace PEM-based SSL properties with wallet-based configuration.

What You Can Do with OSaK

With OSaK, you can query, process, and integrate Kafka streaming data directly within Oracle AI Database using familiar Oracle tools and SQL semantics. This capability eliminates the need for external client connectors, and enables you to relate streaming data to operational, transactional, or analytical tables within Oracle AI Database. OSaK can scale the processing of data streams in the database in the same way that Kafka applications scale.

Oracle SQL access to Kafka enables you to do the following:

  • Create and use a streaming application to process unread Kafka records exactly once, and then discard these records after processing.
  • Create and use a loading application to capture unread Kafka records permanently in an Oracle AI Database table, for access by various Oracle applications. Kafka records are captured and persisted in user tables in Oracle AI Database for both analytics and data warehousing.
  • Re-read or seek records in Kafka topics based on offset or a user-supplied timestamp interval for auditing or reconciliation.
  • Join data from multiple Kafka topics, or between Kafka and Oracle tables, all within a single SQL statement.

How OSaK Works

OSaK exposes system-generated views and external tables over Kafka topics using the DBMS_KAFKA package. All operations are part of a database transaction, and meet full ACID (Atomicity, Consistency, Isolation, Durability) standards. Both data changes and Kafka partition offset management are performed together in database transactions, ensuring consistent state. If a transaction fails, you can use the transaction ID and timestamp to identify and easily roll back unwanted changes or errors, all without data loss or duplication.

Without Oracle SQL Access to Kafka, the Kafka partition offsets must be managed either by the application or by Kafka, neither of which supports transaction semantics. This means that after a system failure, Kafka records can be lost or reprocessed by an application.

Managing Kafka partition offsets in the Oracle AI Database transaction provides better isolation and durability than application- or Kafka-managed offsets. ACID requirements for the database ensure that either all parts of the transaction are committed, or all rolled back, with a unique identifier (a transaction ID) for each transaction. This transaction ID includes timestamps that you can use to identify and roll back errors. The ACID feature of Oracle AI Database transactions provides support for data recovery in case of a failure, without losing or repeating records.

Because Oracle SQL Access to Kafka is available with Oracle AI Database, and is used with PL/SQL and SQL queries, no external client application is required to provide a connector to the database.

The DBMS_KAFKA and DBMS_KAFKA_ADM PL/SQL packages have functions and procedures to register a Kafka cluster in a database schema, query Kafka topics, query data from specified offsets or specified timestamps, and more. You can choose either to use global temporary tables without storing the data, or to store the data in user tables in the target Oracle AI Database.

Leveraging Global Temporary Tables (GTT) Best Practice

You can use OSaK to populate global temporary tables (GTTs) or user tables in Oracle AI Database, from which your application obtains data. That data can be streams of data, or snapshots of data from other databases. Your application can access this data directly, or load it into database tables for use within the application.

The corresponding global temporary table receives a snapshot from an Oracle SQL access to Kafka view. Applications use this temporary table for one or more queries within a transaction: a GTT is loaded once, and used. The application can use standard Oracle SQL with the GTT. The Kafka offsets are advanced, and then the application commits, indicating that it is finished with the Kafka records loaded in the GTT.
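The load, query, advance, and commit cycle described above can be sketched as a single PL/SQL block. This is a minimal sketch under stated assumptions, not a definitive implementation: the cluster ALPHA1, application MYAPP, view ORA$DKV_ALPHA1_MYAPP_0, temporary table ORA$DKVGTT_ALPHA1_MYAPP_0, and the user table KAFKA_BATCH_AUDIT are hypothetical, and it assumes that DBMS_KAFKA.LOAD_TEMP_TABLE is a function returning the number of rows loaded and that DBMS_KAFKA.UPDATE_OFFSET takes the view name. Check the DBMS_KAFKA reference for the exact signatures in your release.

```sql
-- Minimal sketch of one streaming cycle (hypothetical names, assumed signatures).
DECLARE
  v_rows INTEGER;
BEGIN
  -- Fetch the next batch of unread Kafka records (from the committed offsets
  -- up to the current high watermark) into the global temporary table, once.
  v_rows := SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_ALPHA1_MYAPP_0');

  IF v_rows > 0 THEN
    -- Any number of queries can read the same GTT snapshot. Here the batch is
    -- summarized into a hypothetical user table.
    INSERT INTO kafka_batch_audit (partition_id, max_offset, row_count)
      SELECT kafka_partition, MAX(kafka_offset), COUNT(*)
        FROM ORA$DKVGTT_ALPHA1_MYAPP_0
       GROUP BY kafka_partition;

    -- Advance the Kafka offsets past the records just loaded.
    SYS.DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_ALPHA1_MYAPP_0');
  END IF;

  -- Commit the application's changes and the new offsets together.
  COMMIT;
EXCEPTION
  WHEN OTHERS THEN
    -- Roll back both the data changes and the offset update, so that the same
    -- records are loaded again on the next run.
    ROLLBACK;
    RAISE;
END;
/
```

Because the offset update and the INSERT are in the same transaction, a failure before COMMIT leaves neither change in place, which is the exactly-once behavior described in this section.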

Key Benefits of OSaK Global Temporary Tables

The key benefits of using OSaK GTTs are as follows:

  • Batch-Optimized Loading: Each Kafka batch is fetched only once into the GTT, minimizing network round-trips and Kafka cluster load.
  • Transaction-Based Caching: The GTT holds a consistent snapshot of the Kafka data for the duration of the transaction. With GTTs, you can issue multiple queries and joins with standard Oracle tables. Reads are repeatable without any additional Kafka queries. You can leverage the mature optimization and processing strategies in Oracle AI Database to minimize code paths needed to join tables efficiently.
  • Atomic Offset Management: Upon commit, both database changes and the associated Kafka offsets are atomically persisted, ensuring reliable, exactly-once processing.
  • Incremental Data Movement: On subsequent executions, only new records (since the last commit) are fetched, making incremental, efficient data processing seamless.

Benefits for Database Administrators

  • Centralized Management: Kafka integration, offset tracking, and data operations are managed within Oracle’s security and compliance framework. You do not need to administer separate middleware.
  • Resilient Operations: ACID transactions extend to streaming data: after a failure, no data is lost or reprocessed unnecessarily.
  • Efficient Resource Utilization: Each batch requires just one Kafka fetch per transaction, which reduces overhead, network load, and latency.
  • Compliant and Secure: All access, auditing, and governance are handled natively in Oracle Database.

Benefits for Application Developers

  • Simple, Powerful SQL Interface: Access Kafka topics with standard SQL and PL/SQL. You can apply familiar SQL techniques, which simplifies development and enables rapid prototyping. Because statistics can be gathered on GTT data, the Oracle optimizer can choose efficient execution plans and processing strategies, for maximum performance and maintainability.
  • Consistent Snapshots: The GTT holds a static data set for each transaction. This enables repeatable reads, making multi-step queries and complex joins predictable and reliable.
  • High Performance: There is no need to handle offset logic, idempotency, or error recovery in application code; the database manages these tasks for you.
  • Integration Flexibility: You can join Kafka data with operational or historical data within Oracle for holistic analytics and insights.

22.2 Global Tables and Views for Oracle SQL Access to Kafka

Learn how Oracle SQL Access to Kafka (OSAK) supports Kafka STREAMING, SEEKING, and LOAD applications, and how it uses unique ORA$ prefixes for the views and tables it generates.

For STREAMING and SEEKING applications, OSAK provides views that represent Kafka topic data. Applications use PL/SQL to call an OSAK procedure, which queries the appropriate OSAK view and loads the results into global temporary tables.

LOAD applications do not require global temporary tables. Instead, LOAD applications perform incremental loads into an existing Oracle Database table using the EXECUTE_LOAD_APP procedure.
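A LOAD application can be sketched as follows. This is a minimal sketch under stated assumptions: the cluster ALPHA1, application LOADAPP, topic ExampleTopic, and target table KAFKA_EVENTS are hypothetical; the target table is assumed to already exist with a shape compatible with the application's Kafka data; and the EXECUTE_LOAD_APP parameter order (cluster, application, target table, OUT count of records loaded) is an assumption to check against the DBMS_KAFKA reference for your release.

```sql
-- One-time setup: create the LOAD application (hypothetical names).
BEGIN
  SYS.DBMS_KAFKA.CREATE_LOAD_APP(
    'ALPHA1',              -- registered Kafka cluster
    'LOADAPP',             -- application name
    'ExampleTopic',        -- Kafka topic
    '{"fmt" : "JSON"}');   -- record format option
END;
/

-- Each run: append the records that arrived since the previous load.
DECLARE
  v_loaded INTEGER;
BEGIN
  -- Assumed signature: (cluster, application, target table, OUT records loaded).
  SYS.DBMS_KAFKA.EXECUTE_LOAD_APP('ALPHA1', 'LOADAPP', 'KAFKA_EVENTS', v_loaded);
  DBMS_OUTPUT.PUT_LINE('Records loaded: ' || v_loaded);
  -- Make the new rows and the advanced offsets durable together.
  COMMIT;
END;
/
```

Under the transactional offset handling described in this chapter, a run that fails before COMMIT leaves the offsets unchanged, so the next run picks up the same records.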

For all three application types (STREAMING, SEEKING and LOAD), OSAK automatically creates the views and external tables.

Both the views and temporary tables created by OSAK have unique ORA$ prefixes that identify them as generated by OSAK. OSAK-generated views and tables use the following unique prefixes:

  • ORA$DKV: Views created by OSAK for Kafka access.
  • ORA$DKX: External tables created by OSAK.
  • ORA$DKVGTT: Global temporary tables for streaming or seeking applications, which are loaded using OSAK procedures.

OSAK-generated views and external tables serve calls to DBMS_KAFKA that load data from Kafka into a user-owned table or into a global temporary table. Typically, these views and external tables are treated as internal objects that an Oracle application does not manipulate directly.
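Because the generated objects carry fixed prefixes, you can list them with an ordinary data dictionary query. This sketch assumes only that OSAK-generated objects appear in USER_OBJECTS like any other schema object; the CASE expression tests ORA$DKVGTT before ORA$DKV, because every ORA$DKVGTT name also matches the shorter prefix.

```sql
-- List OSAK-generated objects in the current schema and classify them by prefix.
SELECT object_name,
       object_type,
       CASE
         WHEN object_name LIKE 'ORA$DKVGTT%' THEN 'global temporary table'
         WHEN object_name LIKE 'ORA$DKV%'    THEN 'view'
         WHEN object_name LIKE 'ORA$DKX%'    THEN 'external table'
       END AS osak_object_kind
  FROM user_objects
 WHERE object_name LIKE 'ORA$DK%'
 ORDER BY object_name;
```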

Global temporary tables are loaded transparently when calling DBMS_KAFKA.LOAD_TEMP_TABLE.

22.3 Understanding how Oracle SQL Access to Kafka Queries are Performed

Oracle SQL Access to Kafka accesses Kafka streaming data, but queries are performed on Oracle Database global temporary tables, which provides several advantages.

A typical application does not query Oracle SQL Access to Kafka views directly, for the following reasons:

  • Each query from an Oracle SQL Access to Kafka view fetches data directly from Kafka, from the current offset to the current high water mark. Because rows are continually being added, each query from a view will likely retrieve more rows. Therefore, Oracle SQL Access to Kafka views do not support repeatable reads, either explicitly from multiple queries or implicitly within a join.
  • There are no reliable statistics gathered from Oracle SQL Access to Kafka views for the query optimizer.
  • Each query from an Oracle SQL Access to Kafka view results in a trip to the Kafka cluster, re-retrieving the same rows and perhaps additional rows. These query retrievals can affect performance.

The corresponding temporary table receives a snapshot from an Oracle SQL Access to Kafka view. Applications use this temporary table for one or more queries within a transaction. Reading from the temporary table is beneficial for the following reasons:

  • Repeatable reads are supported, either explicitly from multiple queries or implicitly within a join
  • Reliable statistics are gathered for the query optimizer
  • Only one read is made to Kafka when loading the temporary table. Subsequent queries do not require returning to the Kafka cluster to access the data.
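Whether LOAD_TEMP_TABLE gathers optimizer statistics itself is not stated here, so the following sketch gathers them explicitly with DBMS_STATS after loading, then checks what the optimizer will see. The temporary table name is hypothetical, and LOAD_TEMP_TABLE is assumed to be a function that returns the number of rows loaded.

```sql
-- Hypothetical temporary table name; LOAD_TEMP_TABLE assumed to return a row count.
DECLARE
  v_rows INTEGER;
BEGIN
  -- Single fetch from Kafka into the temporary table.
  v_rows := SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_ALPHA1_MYAPP_0');

  -- Gather optimizer statistics on the loaded snapshot. For global temporary
  -- tables, the default GLOBAL_TEMP_TABLE_STATS preference (SESSION) keeps
  -- these statistics private to the current session.
  DBMS_STATS.GATHER_TABLE_STATS(
    ownname => USER,
    tabname => 'ORA$DKVGTT_ALPHA1_MYAPP_0');

  -- Queries and joins against the temporary table would follow here,
  -- before the offsets are advanced and the transaction is committed.
END;
/

-- Check which statistics exist for the table (SCOPE is SESSION or SHARED).
SELECT table_name, num_rows, scope, last_analyzed
  FROM user_tab_statistics
 WHERE table_name = 'ORA$DKVGTT_ALPHA1_MYAPP_0';
```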

The global temporary tables can be joined with standard Oracle tables (for example, Oracle customer relationship management (CRM) tables).

By joining Oracle SQL access to Kafka temporary tables with Oracle Database tables, you obtain the following advantages:

  • Leveraging the mature optimization and execution strategies in Oracle Database to minimize code path required to join tables efficiently
  • Obtaining Oracle Database transaction semantics, with the security of Oracle Database ACID transaction processing (atomicity, consistency, isolation, and durability), ensuring that all changes to data are performed as if they are a single operation, controlled by the application
  • Managing the Kafka partition offsets and committing them to database metadata tables in Oracle Database, so that after a system failure, Kafka records processed in these transactions are neither lost nor reprocessed by an application.
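Once the temporary table is loaded, a join with a CRM table is ordinary SQL. This sketch assumes a JSON-format application whose temporary table is ORA$DKVGTT_ALPHA1_MYAPP_0, a hypothetical CRM table CRM_CUSTOMERS(customer_id, customer_name), and hypothetical JSON fields customerId and amount in each Kafka record value; all of these names are illustrative only.

```sql
-- Join one Kafka snapshot with a hypothetical CRM table.
SELECT c.customer_name,
       COUNT(*)                                              AS events,
       SUM(JSON_VALUE(k.value, '$.amount' RETURNING NUMBER)) AS total_amount
  FROM ORA$DKVGTT_ALPHA1_MYAPP_0 k
  JOIN crm_customers c
    ON c.customer_id = JSON_VALUE(k.value, '$.customerId' RETURNING NUMBER)
 GROUP BY c.customer_name
 ORDER BY total_amount DESC;
```

Because the join reads the temporary table rather than the OSAK view, running this query several times in the same transaction returns the same result, and it does not trigger another round trip to the Kafka cluster.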

22.4 Streaming Kafka Data Into Oracle Databases

Oracle SQL Access to Kafka enables Kafka streaming data to be processed with Oracle AI Database tables using standard SQL semantics.

Apache Kafka is commonly used to capture and consolidate data from many streaming sources, so that analytics can be performed on this data. Typically, this requires loading all the Kafka records into the database, and then combining the data with database tables for analytics, either for short-term study or for longer analysis.

With Oracle SQL access to Kafka, you can use standard SQL, PL/SQL, and other database development tools to load data from Kafka into an Oracle Database, and process that data using standard Oracle application logic, such as JDBC applications. Oracle SQL access to Kafka can create a view that maps to all partitions of the Kafka topic that you want to load. Each Oracle SQL access to Kafka call to load more data queries this view, which in turn queries all partitions of the Kafka topic from the point last read to the current high watermark offset (for each partition, the offset of the last message that was fully replicated to all in-sync replicas). Data retrieved from the Kafka partitions is loaded into a temporary Oracle Database table.

These Oracle SQL Access to Kafka views behave much like a Kafka application instance: they read records from Kafka starting at a given offset until they reach the high watermark offset.

When Oracle SQL Access to Kafka creates a view, it also creates a corresponding global temporary table. The application calls an Oracle SQL Access to Kafka PL/SQL procedure to load this global temporary table with the results of a query from the corresponding Oracle SQL Access to Kafka view.

The global temporary tables can be joined with standard Oracle tables (for example, Oracle customer relationship management (CRM) tables).

By joining Oracle SQL access to Kafka temporary tables with Oracle Database tables, you obtain the following advantages:

  • Leveraging the mature optimization and execution strategies in Oracle AI Database to minimize code path required to join tables efficiently
  • Obtaining Oracle AI Database transaction semantics, with the security of Oracle AI Database ACID transaction processing (atomicity, consistency, isolation, and durability), ensuring that all changes to data are performed as if they are a single operation, controlled by the application
  • Managing the Kafka partition offsets and committing them to database metadata tables in Oracle AI Database, so that after a system failure, Kafka records processed in these transactions are neither lost nor reprocessed by an application.

22.5 Querying Kafka Data Records by Timestamp

Oracle SQL Access to Kafka in Seekable mode helps you query older data stored in Kafka, based on the timestamps associated with the Kafka data.

In the event of anomalies, you can use Oracle SQL access to Kafka to assist with identifying Kafka data associated with the anomaly in a specified window of time.

For example, suppose a computer company has multiple sites. Each site has labs, and all access to the buildings and labs is protected by key card access. The company has a wide range of employees: some just need office space, some maintain the machines in the labs, and some monitor the buildings for issues such as ventilation problems, unpermitted access, and general usage of the sites. In this scenario, Kafka topics can consist of the following:

  • Key card usage (KCdata)
  • Facility monitoring (Fdata)
  • System monitoring, such as uptime, access, intrusion detection (Sdata)

If an unusual event is detected while reading through Kafka data and combining it with Oracle data, the application can log the anomaly along with the timestamp of the record containing the unusual event. A second application can then read through these errors and process them. For each unusual event, the application might seek to a window of timestamps 10 seconds before and after the event. This is similar to analyzing exceptions in log files. It is common to look at log entries before and after the event to see if the exception was caused by an earlier issue, or if the exception led to downstream problems.

To evaluate a site issue, you can load the key card reader data (KCdata) to a permanent table. For example, if multiple applications use this data, then it makes sense to load that data into an Oracle Database table that multiple applications can use, such as one that helps the real estate team track building and office usage. The IT department uses the data to determine who is on site to handle issues.

Using a Streaming query, you can scan the facility data (Fdata) to determine if there are any atypical or unusual events in the data. This could be a spike in lab temperature, a door that did not close and is raising an alarm, the fire detection system sounding an alarm, or other data points associated with the time frame, such as a door that was left ajar.

The security team is given an alert of a door in a building that did not close. They use the streaming data to determine the door was left ajar at 3:17 AM. They can then use a Seeking query to seek multiple other data points (KCdata, Fdata, Sdata) in a 30-minute window (3:02 to 3:32) to determine who accessed the building, what doors or labs were accessed, what machines went offline or were directly accessed, and other data records, so that they can take the proper response to the developing situation.

In this scenario, you can use Oracle SQL Access to Kafka to create a single view that maps to all partitions of the Kafka topic. When Oracle SQL access to Kafka creates a view, it also creates a corresponding global temporary table. The application first specifies a starting and ending timestamp and then calls Oracle SQL access to Kafka to load the global temporary table with the rows in the specified window of time. You can leverage standard Oracle Database SQL transaction processing to parse large volumes of data to identify relevant device data around the anomalous event.
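For the door alarm in the scenario above, loading the 30-minute window (3:02 to 3:32 around a 3:17 AM event) might look like the following. This is a sketch under loudly stated assumptions: the procedure name DBMS_KAFKA.SEEK_OFFSET_TS and its (view name, window start, window end) parameter list with TIMESTAMP WITH TIME ZONE arguments are assumed, the seekable application SEEKAPP and its view and temporary table names are hypothetical, and the event date is an arbitrary example value.

```sql
-- Sketch: load the records in a 30-minute window centered on an event.
DECLARE
  -- Hypothetical event time (3:17 AM UTC on an example date).
  v_event TIMESTAMP WITH TIME ZONE := TIMESTAMP '2025-01-15 03:17:00 +00:00';
  v_rows  INTEGER;
BEGIN
  -- Position the seekable view at the window (assumed procedure and signature).
  SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
    'ORA$DKV_ALPHA1_SEEKAPP_0',
    v_event - INTERVAL '15' MINUTE,   -- 3:02
    v_event + INTERVAL '15' MINUTE);  -- 3:32

  -- Load the records in that window into the matching temporary table.
  v_rows := SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_ALPHA1_SEEKAPP_0');
  DBMS_OUTPUT.PUT_LINE(v_rows || ' records in window');
END;
/
```

For the log-style analysis of 10 seconds either side of an event, the same block would use INTERVAL '10' SECOND for both bounds.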

22.6 About the Kafka Database Administrator Role

To administer Oracle SQL access to Kafka, grant the Oracle AI Database role OSAK_ADMIN_ROLE to the administrator user. This role provides the administration privileges required to configure Kafka clusters and to use the Kafka administration API package.

Starting with Oracle AI Database 26ai, Oracle provides the OSAK_ADMIN_ROLE role, which you can grant to a user to provide Oracle SQL access to Kafka administration privileges through role-based authorization. This role grants the privileges that users you designate as Oracle SQL access to Kafka administrators need to configure, register, and manage Kafka clusters. The privileges granted by this role are as follows:

  • CREATE CREDENTIAL, to create a Kafka SASL-SSL (Simple Authentication and Security Layer) password or OSS (Oracle Streaming Service) authToken
  • CREATE ANY DIRECTORY, to create cluster access and cluster configuration directories
  • DROP ANY DIRECTORY, to drop cluster access and cluster configuration directories
  • READ privileges to sys.dbms_kafka_clusters
  • READ privileges to sys.dbms_kafka_applications
  • READ privileges to sys.dbms_kafka_messages
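To confirm what OSAK_ADMIN_ROLE grants in a particular database, a DBA can query the standard data dictionary views. This sketch uses only DBA_SYS_PRIVS and DBA_TAB_PRIVS, which list system privileges and object privileges, respectively, by grantee.

```sql
-- System privileges granted to the role (run as a DBA).
SELECT privilege
  FROM dba_sys_privs
 WHERE grantee = 'OSAK_ADMIN_ROLE'
 ORDER BY privilege;

-- Object privileges granted to the role, such as READ on the DBMS_KAFKA views.
SELECT owner, table_name, privilege
  FROM dba_tab_privs
 WHERE grantee = 'OSAK_ADMIN_ROLE'
 ORDER BY owner, table_name;
```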

22.7 Enable Kafka Database Access to Users

The application user accounts are granted the DBMS_KAFKA database privileges required to access OSAK.

As a DBA, you create and grant users privileges to administer and use Oracle SQL access to Kafka. There are two categories of users:

  • Oracle SQL Access to Kafka administrators are privileged users. To simplify management of Oracle SQL access to Kafka, Oracle recommends that the Oracle DBA grant the OSAK_ADMIN_ROLE to designated Kafka administrators. This role is precreated in the database starting with Oracle AI Database 26ai.

    Administrators run the DBMS_KAFKA_ADM package methods to configure and manage the Kafka cluster information. Either users granted OSAK_ADMIN_ROLE or the Oracle DBA can create the operating system level Kafka Configuration Directory, and populate that directory with configuration files. Oracle SQL Access to Kafka administrators create the Oracle directory objects for Kafka cluster access directories.

  • Application users of Kafka topic data are granted access to the DBMS_KAFKA package, so that they can use data from Kafka cluster topics.
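Creating the Oracle directory object for a cluster access directory uses the standard CREATE DIRECTORY statement. In this minimal sketch, the directory object name ALPHA1_ACCESS_DIR and the operating system path are hypothetical placeholders, and the files that Oracle SQL Access to Kafka expects in that directory are not shown.

```sql
-- Run as a user granted OSAK_ADMIN_ROLE (which includes CREATE ANY DIRECTORY).
-- Hypothetical directory object name and operating system path.
CREATE DIRECTORY alpha1_access_dir AS '/u01/app/osak/clusters/alpha1';

-- Confirm that the directory object exists and points where you expect.
SELECT directory_name, directory_path
  FROM all_directories
 WHERE directory_name = 'ALPHA1_ACCESS_DIR';
```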

Example 22-1 Grant OSAK_ADMIN_ROLE to Kafka Administrator Users

In this example, the OSAK_ADMIN_ROLE is granted to user kafka_admin:

GRANT OSAK_ADMIN_ROLE
   TO kafka_admin;

Example 22-2 Grant User Access to Kafka Users

To enable applications to use Oracle SQL access to Kafka, you grant DBMS_KAFKA access. These application users must already have the following privileges and resources in the target Oracle AI Database:

  • CREATE SESSION
  • CREATE TABLE
  • CREATE VIEW
  • Available quota on the tablespace where they access Kafka data
  • Read access on the cluster access directory of a registered Kafka cluster
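The grants listed above can be sketched as standard Oracle statements. APP_USER, the USERS tablespace, the 500M quota, and the directory object ALPHA1_ACCESS_DIR are hypothetical, and granting EXECUTE on SYS.DBMS_KAFKA is an assumption about how DBMS_KAFKA access is provided; adjust to your site's conventions.

```sql
-- Run as a DBA. All names and sizes here are hypothetical.
GRANT CREATE SESSION, CREATE TABLE, CREATE VIEW TO app_user;

-- Quota on the tablespace where the user's Kafka data is stored.
ALTER USER app_user QUOTA 500M ON users;

-- Read access on the registered cluster's access directory object.
GRANT READ ON DIRECTORY alpha1_access_dir TO app_user;

-- Access to the DBMS_KAFKA package (assumed to be EXECUTE).
GRANT EXECUTE ON SYS.DBMS_KAFKA TO app_user;
```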

22.8 Data Formats Supported with Oracle SQL Access to Kafka

Oracle SQL access to Kafka supports Kafka records represented in three formats: delimited text data (for example, csv), JSON, and Avro.

Kafka is schema-less and format-neutral: application data is stored as opaque (uninterpreted) byte arrays in the key and value fields of a Kafka record. Because the Kafka key is used mainly for hashing data into Kafka partitions, only the value field of a Kafka record is retrieved and rendered as an Oracle row. The application is responsible for serialization and deserialization of the data, and for supplying a schema that defines the structure of the data format. In Oracle SQL Access to Kafka, the data format and schema are specified in the options argument to the DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP() procedures.

Note:

Regardless of the format type, the tables and views created contain three additional columns: KAFKA_PARTITION, KAFKA_OFFSET, and KAFKA_EPOCH_TIMESTAMP.

22.8.1 JSON Format and Oracle SQL Access to Kafka

For JSON, Oracle SQL access to Kafka determines the columns for the table or view.

The following example specifies options for a JSON streaming application:

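DECLARE
 v_options VARCHAR2(100);  -- PL/SQL requires a length for VARCHAR2 variables
BEGIN
 -- Kafka record values are JSON documents
 v_options := '{"fmt" : "JSON"}';
 SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
                'ALPHA1',          -- registered Kafka cluster
                'MYAPP',           -- application name
                'ExampleTopic',    -- Kafka topic
                v_options);
END;
/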

With JavaScript Object Notation (JSON) data, Oracle SQL Access to Kafka creates views and global temporary tables in the user schema over Kafka data. These views are prefixed by ORA$DKV_. The temporary tables are prefixed by ORA$DKVGTT_. The DBMS_KAFKA.CREATE_xxx_APP procedures use a fixed schema to return JSON data from a Kafka record.

For example:

SQL> describe ORA$DKVGTT_ALPHA1_MYAPP_0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 VALUE                                              VARCHAR2(4000) 

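With the VARCHAR2 type, the length of the VALUE column is restricted by the maximum VARCHAR2 length of your database (4000 bytes by default, or 32767 bytes when the MAX_STRING_SIZE initialization parameter is set to EXTENDED). Optionally, the VALUE column can be created as a CLOB instead.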

The KAFKA_ columns identify the partition id, the offset, and the timestamp of the Kafka record. (The underlying timestamp representation is an integer representing the number of milliseconds since Unix epoch.)

The data in the value portion of the Kafka record is returned as text in the VALUE column. The character encoding of the external text is fixed as AL32UTF8. Oracle SQL access to Kafka logic does not check for valid JSON syntax in the VALUE column. However, faulty JSON is discovered when JSON operators in a SQL query attempt to parse the VALUE data.
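The VALUE column can be parsed with the standard SQL/JSON functions, and KAFKA_EPOCH_TIMESTAMP can be converted from epoch milliseconds to a timestamp. In this sketch, the temporary table name matches the describe output above, and the JSON fields sensor and temp are hypothetical. The IS JSON condition keeps only records whose VALUE is well-formed JSON.

```sql
-- Parse JSON fields and convert the Kafka timestamp (milliseconds since the
-- Unix epoch) to a TIMESTAMP WITH TIME ZONE in UTC.
SELECT kafka_partition,
       kafka_offset,
       TIMESTAMP '1970-01-01 00:00:00 +00:00'
         + NUMTODSINTERVAL(kafka_epoch_timestamp / 1000, 'SECOND') AS record_ts,
       JSON_VALUE(value, '$.sensor')                AS sensor,
       JSON_VALUE(value, '$.temp' RETURNING NUMBER) AS temperature
  FROM ORA$DKVGTT_ALPHA1_MYAPP_0
 WHERE value IS JSON;   -- skip records whose VALUE is not well-formed JSON
```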

22.8.2 Delimited Text Format and Oracle SQL Access to Kafka

For delimited text formats, Oracle SQL access to Kafka creates views and temporary tables in the user schema with Kafka data.

With delimited data, such as comma-separated values (CSV), Oracle SQL Access to Kafka creates views and global temporary tables in the user schema over Kafka data. These views are prefixed by ORA$DKV_. The temporary tables are prefixed by ORA$DKVGTT_. With DSV (delimiter-separated values) format, the data columns are based on the reference table passed in the options, plus the three metadata columns.

The temporary tables and views created with Oracle SQL access to Kafka delimited text format data have columns that reflect the shape of the delimited text data in the value field of a Kafka record. Oracle SQL access to Kafka converts text data into the native Oracle data types expressed in the table and view definition. The character encoding of the external text is fixed as AL32UTF8.

When a Kafka record is retrieved, a canonical layout is created, starting with the Kafka partition identifier (INTEGER), Kafka record offset (INTEGER), and Kafka record timestamp (INTEGER), followed by delimited text data in the Kafka value. In other words, the Kafka data is flattened out and streamed as rows of pure delimited text fields, using the order of the view schema definition.

The following Oracle data types are supported:

  • INTEGER, INT, NUMBER
  • CHAR, VARCHAR2
  • NCHAR, NVARCHAR2
  • CLOB, NCLOB, BLOB
  • FLOAT, BINARY_FLOAT, BINARY_DOUBLE
  • TIMESTAMP, DATE
  • TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE
  • INTERVAL
  • RAW
  • BOOLEAN

To simplify the specification of delimited text at application creation time, you provide the name of a table that describes the columns of the user data in the order in which they are physically ordered in the Kafka record value field. Oracle SQL Access to Kafka uses the columns of that table to define the data columns of its views and temporary tables.

The following example shows the shape of the delimited text data table (a reference table, or reftable) provided when you create an Oracle SQL Access to Kafka application. The fields in the Kafka value field must appear in the same physical order as the reftable columns, and each field is converted from delimited text to the data type of the corresponding column.

Preserve reftables after you use them in a CREATE_xxx_APP call to create Oracle SQL Access to Kafka views and temporary tables reflecting their shape. You need the reftable to re-create the views.

SQL> describe FIVDTI_SHAPE; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 F1                                                 NUMBER 
 I2                                                 NUMBER 
 V3                                                 VARCHAR2(50) 
 D4                                                 DATE 
 T5                                                 TIMESTAMP(6) 
 V6                                                 VARCHAR2(200) 
 I7                                                 NUMBER 

The reference table describes the fields in the Kafka record value only. For example, the reftable FIVDTI_SHAPE could support Kafka records where F1, I2, V3, D4, T5, V6, I7 are fields in the Kafka record value. The fields in the Kafka record value must be separated by delimiters (for example, comma delimiters).

Note:

The reference table cannot include invisible (hidden) columns. The ordering of the columns must match the order of the data values from the Kafka record. An invisible column has a COLUMN_ID of NULL, so its position in the column list cannot be determined.
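Before using a table as a reftable, you can check its column order and look for invisible columns with the data dictionary. This sketch uses the FIVDTI_SHAPE table shown above. USER_TAB_COLUMNS omits invisible columns, while USER_TAB_COLS includes them with a NULL COLUMN_ID; the USER_GENERATED filter excludes system-generated hidden columns.

```sql
-- Column order that maps to the delimited fields in each Kafka record value.
SELECT column_id, column_name, data_type
  FROM user_tab_columns
 WHERE table_name = 'FIVDTI_SHAPE'
 ORDER BY column_id;

-- Any rows returned here are invisible user columns (COLUMN_ID is NULL),
-- so the table cannot be used as a reftable as it stands.
SELECT column_name
  FROM user_tab_cols
 WHERE table_name = 'FIVDTI_SHAPE'
   AND column_id IS NULL
   AND user_generated = 'YES';
```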

Oracle SQL Access to Kafka temporary tables created for data described by the FIVDTI_SHAPE table will have the following schema:

SQL> describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F1                                                 NUMBER 
 I2                                                 NUMBER 
 V3                                                 VARCHAR2(50) 
 D4                                                 DATE 
 T5                                                 TIMESTAMP(6) 
 V6                                                 VARCHAR2(200) 
 I7                                                 NUMBER 
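Creating the reftable and a streaming application that uses it might look like the following. The table definition matches the FIVDTI_SHAPE description above, and the cluster, application, and topic names follow the JSON example in this chapter. The "fmt" value "DSV" and the "reftable" option key are assumptions to verify against the DBMS_KAFKA options documentation for your release.

```sql
-- Reference table describing the delimited fields, in physical order.
CREATE TABLE fivdti_shape (
  f1 NUMBER,
  i2 NUMBER,
  v3 VARCHAR2(50),
  d4 DATE,
  t5 TIMESTAMP(6),
  v6 VARCHAR2(200),
  i7 NUMBER
);

-- Create a streaming application whose views and temporary tables take their
-- data columns from FIVDTI_SHAPE (assumed option keys).
BEGIN
  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ALPHA1',
    'MYAPP',
    'ExampleTopic',
    '{"fmt" : "DSV", "reftable" : "FIVDTI_SHAPE"}');
END;
/
```

Keep FIVDTI_SHAPE after the call succeeds, because the views can only be re-created from it.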

22.8.3 Avro Formats and Oracle SQL Access to Kafka

For Avro formats, Oracle SQL access to Kafka uses the Avro schema to determine the data columns and the three metadata columns.

22.8.3.1 About Using Avro Format with Oracle SQL Access to Kafka

Learn how Oracle SQL access to Kafka makes Kafka data in the Avro format available for use in Oracle database tables and views.

To enable applications to use Apache Avro formatted data in Oracle Database tables and views, Oracle SQL Access to Kafka converts the data based on the Avro schema specified in the options argument to the DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP() procedures. As a result, an Oracle SQL Access to Kafka application supports only a single Avro schema for a Kafka topic; messages with different Avro schemas in the same topic stream are not supported. If the schema evolves, then you must create a new Oracle SQL Access to Kafka application.

22.8.3.2 Primitive Avro Types Supported with Oracle SQL Access to Kafka

To use Apache Avro Schema primitive type names in the database, Oracle converts these types to SQL data types.

Table 22-1 Avro Primitive types and Oracle Type Conversions for Oracle SQL Access to Kafka

Type Description                Avro Primitive Type   Oracle Type
------------------------------  -------------------   -------------
null/no value                   null                  VARCHAR2(1)
(not applicable)                boolean               NUMBER(1)
32-bit signed integer           int                   INTEGER
64-bit signed integer           long                  INTEGER
IEEE 32-bit floating point      float                 BINARY_FLOAT
IEEE 64-bit floating point      double                BINARY_DOUBLE
byte array/binary               bytes                 BLOB
UTF-8 encoded character string  string                VARCHAR2

The following example Avro schema defines a record that uses all Avro primitive types:

{
  "type": "record",
  "name": "primitives",
  "fields": [
    { "name": "f_null", "type": "null" },
    { "name": "f_boolean", "type": "boolean" },
    { "name": "f_int", "type": "int" },
    { "name": "f_long", "type": "long" },
    { "name": "f_float", "type": "float" },
    { "name": "f_double", "type": "double" },
    { "name": "f_bytes", "type": "bytes" },
    { "name": "f_string", "type": "string" }
  ]
}

If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_NULL                                             CHAR(1)
 F_BOOLEAN                                          NUMBER(1) 
 F_INT                                              NUMBER(38)
 F_LONG                                             NUMBER(38) 
 F_FLOAT                                            BINARY_FLOAT
 F_DOUBLE                                           BINARY_DOUBLE 
 F_BYTES                                            BLOB 
 F_STRING                                           VARCHAR2(4000)

The VARCHAR2 type length (in this example, for the F_STRING column) is determined by the maximum VARCHAR2 length of your database.

22.8.3.3 Complex Avro Types Supported with Oracle SQL Access to Kafka

To use Apache Avro Schema complex type names in the database, Oracle converts these types to supported SQL data types.

Description

The Apache Avro complex data types take specified attributes. To use the Avro complex types, Oracle SQL access to Kafka converts them to Oracle types, as specified in the following table.

Table 22-2 Avro Complex types and Oracle Type Conversions for Oracle SQL Access to Kafka

Avro Complex Type  Oracle Type  Type Description
-----------------  -----------  ----------------------------------------------------
fixed              BLOB         A fixed type declares a fixed-length field that can
                                be used for storing binary data. It has two required
                                attributes: the field's name and the size in bytes.
enum               VARCHAR2     An Avro enum field. An Avro enum is an enumerated
                                type, declared with the type name enum. It requires
                                a name and a list of symbols (JSON strings), and can
                                take additional optional attributes.
record             VARCHAR2     Struct field. The struct field corresponds to a
                                field in the input Avro records. A record represents
                                an encapsulation of attributes that, all combined,
                                describe a single thing.
map                VARCHAR2     A map is an associative array, or dictionary, that
                                organizes data as key-value pairs. The key for an
                                Avro map must be a string. Avro maps support only
                                one attribute: values. This attribute is required,
                                and it defines the type for the value portion of
                                the map. Values can be of any type.
array              VARCHAR2     An array of any type. The array type defines an
                                array field. It supports only the items attribute,
                                which is required. The items attribute identifies
                                the type of the items in the array.

Note:

The Avro complex types record, map, and array are converted to a JSON format string before conversion to a VARCHAR2 type.
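The following Python sketch illustrates the note above: decoded record, map, and array values are rendered as JSON text, and an enum value (already a symbol string) is passed through unchanged. It is an illustration under assumptions, not the Oracle SQL access to Kafka implementation; the helper name to_varchar2_text is hypothetical, and the exact JSON formatting (key order, spacing) that Oracle SQL access to Kafka produces is not specified in this section.

```python
import json


def to_varchar2_text(avro_type, value):
    """Hypothetical helper: text stored in a VARCHAR2 column for a complex value."""
    if avro_type in ("record", "map", "array"):
        # record, map, and array values are converted to a JSON format string.
        return json.dumps(value)
    if avro_type == "enum":
        # An enum value is one of its symbols, which is already a string.
        return value
    raise ValueError(f"not a VARCHAR2-mapped complex type: {avro_type}")


if __name__ == "__main__":
    print(to_varchar2_text("record", {"first_name": "Ada", "last_name": "Lovelace"}))
    print(to_varchar2_text("map", {"a": 1, "b": 2}))
    print(to_varchar2_text("array", ["x", "y"]))
    print(to_varchar2_text("enum", "green"))
```

Using JSON keeps nested structure readable from SQL, at the cost of storing the whole value as text in a single column.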

The following example Avro schema defines a record that uses all Avro complex types:

{
 "type" : "record",
 "name" : "complex",
 "fields" : [
  { "name" : "f_fixed", 
    "type" : { "type" : "fixed", "name" : "ten", "size" : 10} 
  },
  { "name" : "f_enum",
    "type" : { "type" : "enum", "name" : "colors",
               "symbols" : [ "red", "green", "blue" ]   }  
  },
  { "name" : "f_record", 
    "type" : {"type" : "record", "name" : "person",
              "fields" : [ { "name" : "first_name", "type" : "string" },
                           { "name" : "last_name",  "type" : "string"} ] }
  }, 
  { "name" : "f_map",
    "type" : { "type" : "map", "values" : "int" }
  },
  { "name" : "f_array",
    "type" : {"type" : "array", "items" : "string"  }
  }]
}

If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_FIXED                                            BLOB
 F_ENUM                                             VARCHAR2(4000)
 F_RECORD                                           VARCHAR2(4000)
 F_MAP                                              VARCHAR2(4000) 
 F_ARRAY                                            VARCHAR2(4000)

The VARCHAR2 type length (in this example, for the F_ENUM, F_RECORD, F_MAP, and F_ARRAY columns) is determined by the maximum VARCHAR2 length of your database.
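To show how the Avro schema determines the data columns and the three metadata columns, the following Python sketch derives the expected temporary table columns from an Avro record schema. The function expected_columns is hypothetical (it is not part of DBMS_KAFKA); it mirrors the DESCRIBE output in the examples above, including CHAR(1) for null as shown there rather than the VARCHAR2(1) listed in Table 22-1, and it assumes a maximum VARCHAR2 length of 4000. Only the primitive and complex types shown are handled; logical types, unions, and named type references are not.

```python
import json

# Column types copied from the DESCRIBE output in the examples above.
# Assumption: the database's maximum VARCHAR2 length is 4000.
PRIMITIVE_TO_ORACLE = {
    "null": "CHAR(1)",
    "boolean": "NUMBER(1)",
    "int": "NUMBER(38)",
    "long": "NUMBER(38)",
    "float": "BINARY_FLOAT",
    "double": "BINARY_DOUBLE",
    "bytes": "BLOB",
    "string": "VARCHAR2(4000)",
}

COMPLEX_TO_ORACLE = {
    "fixed": "BLOB",
    "enum": "VARCHAR2(4000)",
    "record": "VARCHAR2(4000)",  # stored as JSON text
    "map": "VARCHAR2(4000)",     # stored as JSON text
    "array": "VARCHAR2(4000)",   # stored as JSON text
}

# Every temporary table in the examples starts with these metadata columns.
METADATA_COLUMNS = [
    ("KAFKA_PARTITION", "NUMBER(38)"),
    ("KAFKA_OFFSET", "NUMBER(38)"),
    ("KAFKA_EPOCH_TIMESTAMP", "NUMBER(38)"),
]


def expected_columns(schema_json):
    """Hypothetical helper: (column, Oracle type) pairs for a record schema."""
    schema = json.loads(schema_json)
    columns = list(METADATA_COLUMNS)
    for field in schema["fields"]:
        ftype = field["type"]
        if isinstance(ftype, str):
            # Primitive types are written as a bare type name.
            oracle_type = PRIMITIVE_TO_ORACLE[ftype]
        else:
            # Complex types are written as a JSON object with a "type" key.
            oracle_type = COMPLEX_TO_ORACLE[ftype["type"]]
        columns.append((field["name"].upper(), oracle_type))
    return columns


if __name__ == "__main__":
    schema = '''{"type": "record", "name": "primitives",
                 "fields": [{"name": "f_int", "type": "int"},
                            {"name": "f_string", "type": "string"}]}'''
    for name, oracle_type in expected_columns(schema):
        print(f"{name:<30} {oracle_type}")
```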

22.8.3.4 Avro Logical Types Supported with Oracle SQL Access to Kafka

To use Apache Avro Schema logical type names in the database, Oracle converts these types to supported SQL data types.

Description

An Avro logical type is an Avro primitive or complex type with extra attributes to represent a derived type. Logical types are converted to Oracle types as specified in the following table.

Table 22-3 Avro Logical Types and Oracle Type Conversions for Oracle SQL Access to Kafka

Avro Logical Type          Oracle Type    Type Description
-------------------------  -------------  ---------------------------------------------
decimal (bytes, fixed)     NUMBER         decimal: an arbitrary-precision signed decimal
                                          number of the form unscaled × 10^-scale.
uuid (string)              Not supported  UUIDs (Universally Unique Identifiers), also
                                          known as GUIDs (Globally Unique Identifiers),
                                          randomly generated in conformity with RFC 4122.
date (int)                 DATE           date: a date within the calendar, with no
                                          reference to a particular time zone or time of
                                          day, represented as the number of days from the
                                          Unix epoch, 1 January 1970.
time-millis (int)          TIMESTAMP      time (millis): a time of day, with no reference
                                          to a particular calendar, time zone, or date,
                                          represented as the number of milliseconds after
                                          midnight, 00:00:00.000.
time-micros (long)         TIMESTAMP      time (micros): a time of day, with no reference
                                          to a particular calendar, time zone, or date,
                                          represented as the number of microseconds after
                                          midnight, 00:00:00.000000.
timestamp-millis (long)    TIMESTAMP      timestamp (millis) UTC: an instant on the global
                                          timeline, independent of a particular time zone
                                          or calendar, represented as the number of
                                          milliseconds from the Unix epoch, 1 January
                                          1970 00:00:00.000 UTC.
timestamp-micros (long)    TIMESTAMP      timestamp (micros) UTC: an instant on the global
                                          timeline, independent of a particular time zone
                                          or calendar, represented as the number of
                                          microseconds from the Unix epoch, 1 January
                                          1970 00:00:00.000000 UTC.
duration (fixed, size 12)  Not supported  duration: an amount of time defined by a number
                                          of months, days, and milliseconds.

Note:

Decimal values are internally stored as byte arrays (fixed or not). Depending on the Avro writer, some of these arrays store the string representation of the decimal, while others store the unscaled value. To avoid presenting ambiguous data, Oracle recommends that you use the avrodecimaltype option to declare explicitly which representation is used. If you do not specify this option, then Oracle SQL access to Kafka assumes that decimal fields store the unscaled representation.
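The ambiguity described in the note can be seen in a small Python sketch that decodes decimal bytes under each convention. The unscaled form follows the Avro specification: a big-endian two's-complement integer, interpreted as unscaled × 10^-scale. The function names are hypothetical, and the values accepted by the avrodecimaltype option are not shown here.

```python
from decimal import Decimal


def decode_unscaled_decimal(raw: bytes, scale: int) -> Decimal:
    """Unscaled form: big-endian two's-complement integer times 10^-scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def decode_string_decimal(raw: bytes) -> Decimal:
    """String form: the bytes hold the decimal's text representation."""
    return Decimal(raw.decode("utf-8"))


if __name__ == "__main__":
    # 12.34 with scale 2 has unscaled value 1234 = 0x04D2.
    print(decode_unscaled_decimal(b"\x04\xd2", scale=2))
    # -1234 as a 16-bit two's-complement value is 0xFB2E.
    print(decode_unscaled_decimal(b"\xfb\x2e", scale=2))
    print(decode_string_decimal(b"12.34"))
    # Reading string-form bytes as unscaled yields a very different number,
    # which is why the representation must be declared explicitly.
    print(decode_unscaled_decimal(b"12.34", scale=2))
```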

The following example Avro schema defines a record that uses all Avro logical types:

{
  "type" : "record",
  "name" : "logical",
  "fields" : [ {
  "name" : "f_decimal",
  "type" : {
      "type" : "bytes",
      "logicalType" : "decimal",
      "precision" : 4,
      "scale" : 2
    }
  }, {
  "name" : "f_date",
  "type" : {
      "type" : "int",
      "logicalType" : "date"
    }
  }, {
  "name" : "f_time_millis",
  "type" : {
      "type" : "int",
      "logicalType" : "time-millis"
    }
  }, {
  "name" : "f_time_micros",
  "type" : {
      "type" : "long",
      "logicalType" : "time-micros"
    }
  }, {
  "name" : "f_timestamp_millis",
  "type" : {
      "type" : "long",
      "logicalType" : "timestamp-millis"
    }
  }, {
  "name" : "f_timestamp_micros",
  "type" : {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    }
  } ]
}

If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:

describe ORA$DKVGTT_ALPHA1_MYAPP__0; 
 Name                                      Null?    Type 
 ----------------------------------------- -------- ---------------------------- 
 KAFKA_PARTITION                                    NUMBER(38) 
 KAFKA_OFFSET                                       NUMBER(38) 
 KAFKA_EPOCH_TIMESTAMP                              NUMBER(38) 
 F_DECIMAL                                          NUMBER
 F_DATE                                             DATE
 F_TIME_MILLIS                                      TIMESTAMP(3)
 F_TIME_MICROS                                      TIMESTAMP(6)
 F_TIMESTAMP_MILLIS                                 TIMESTAMP(3)
 F_TIMESTAMP_MICROS                                 TIMESTAMP(6)
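For reference, the following Python sketch shows how the Avro logical-type encodings in Table 22-3 translate to calendar values, using only the standard datetime module. It illustrates the encodings themselves, not how Oracle SQL access to Kafka populates the DATE and TIMESTAMP columns above; in particular, this section does not describe how the date portion of a TIMESTAMP column is filled for time-millis and time-micros values. The function names are hypothetical.

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def avro_date(days: int) -> date:
    # date (int): number of days from the Unix epoch.
    return EPOCH_DATE + timedelta(days=days)


def avro_time_millis(ms: int) -> time:
    # time-millis (int): milliseconds after midnight, no date or time zone.
    return (datetime.min + timedelta(milliseconds=ms)).time()


def avro_time_micros(us: int) -> time:
    # time-micros (long): microseconds after midnight, no date or time zone.
    return (datetime.min + timedelta(microseconds=us)).time()


def avro_timestamp_millis(ms: int) -> datetime:
    # timestamp-millis (long): milliseconds from 1970-01-01 00:00:00 UTC.
    return EPOCH_UTC + timedelta(milliseconds=ms)


def avro_timestamp_micros(us: int) -> datetime:
    # timestamp-micros (long): microseconds from 1970-01-01 00:00:00 UTC.
    return EPOCH_UTC + timedelta(microseconds=us)


if __name__ == "__main__":
    print(avro_date(19723))
    print(avro_time_millis(45296789))
    print(avro_timestamp_micros(1704067200000001))
```

The millisecond and microsecond variants differ only in resolution, which matches the TIMESTAMP(3) and TIMESTAMP(6) column types in the DESCRIBE output.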

22.9 Configuring Access to a Kafka Cluster

You can configure access to secured Kafka clusters, or non-secured Kafka clusters.

22.9.1 Create a Kafka Access Directory

The Oracle SQL access to Kafka administrator must create a Kafka Access Directory for each Kafka cluster to control database user access to the cluster.

The Kafka Access Directory is an Oracle directory object that is used to determine which Oracle users are allowed to access the associated Kafka cluster. Each Kafka cluster requires its own dedicated Kafka Access Directory. As the administrator for Oracle SQL access to Kafka, you manage access by first creating the Kafka Access Directory object for each cluster. You then grant READ privileges on this directory to database users who need to access the Kafka cluster. The Kafka Access Directory must be created before you call the DBMS_KAFKA_ADM.REGISTER_CLUSTER() procedure.

Example 22-3 Creating a Kafka Access Directory and Granting READ Access

First, create a Kafka Access Directory object. In this example, the object is osak_kafkaclus1_access:

CREATE DIRECTORY osak_kafkaclus1_access AS '';

After the Kafka Cluster is successfully registered, the Oracle SQL access to Kafka administrator grants READ access on this directory to users.

In this example, the user example_user is granted access to osak_kafkaclus1_access:

 GRANT READ ON DIRECTORY osak_kafkaclus1_access TO example_user;

22.9.2 The Kafka Configuration File (osakafka.properties)

To access secured Kafka clusters, you must create a configuration file (osakafka.properties) in the Kafka Configuration Directory that contains the information required to access the Kafka cluster.

22.9.2.1 About the Kafka Configuration File

The osakafka.properties file contains configuration information required to access secured Kafka Clusters, as well as additional information about Oracle SQL access to Kafka.

The Kafka Configuration File, osakafka.properties, is created in the Kafka Configuration Directory. The Oracle SQL access to Kafka administrator (granted OSAK_ADMIN_ROLE) creates the osakafka.properties file. This file is used by the DBMS_KAFKA_ADM package to make connections to an Apache Kafka cluster.

The Oracle SQL access to Kafka administrator creates a Kafka Configuration Directory in which to store the configuration files for each Kafka Cluster. Each Kafka Configuration Directory has its own Kafka Configuration File. To manage access to Apache Kafka clusters, only an Oracle SQL access to Kafka administrator has read and write access to Kafka Configuration Directories for Kafka clusters. No other users are granted any privileges on Kafka Configuration Directories or Kafka Configuration Files.

Functions of the Kafka Configuration File

The osakafka.properties file is similar to the consumer properties file used by a Kafka consumer that uses librdkafka. Secure Apache Kafka clusters require credential files, which are also similar to the ones required by a librdkafka-based Kafka consumer. The osakafka.properties file has the following characteristics:

  • It is created and managed by the Oracle SQL access to Kafka administrator as part of the setup and configuration needed to access a Kafka cluster.
  • It consists of a text file of key-value pairs. Each line has the format key=value describing the key and the value, and is terminated with a new line. The new line character cannot be part of the key or value.
  • It contains Oracle SQL access to Kafka parameters, which are identified with the osak prefix.
  • It contains debugging properties for Oracle SQL access to Kafka.
  • It is used by the DBMS_KAFKA_ADM package to make connections to a Kafka cluster using librdkafka APIs.
  • It is required for secure Kafka clusters. It stores the security configuration properties required to connect to Kafka clusters using librdkafka interfaces, Oracle SQL access to Kafka tuning properties (identified with the osak prefix), and debugging properties. For secure cluster access, the key-value pairs reference the security files in the cluster configuration directory, such as the Oracle Wallet.
  • For SSL connections, the Kafka configuration file no longer stores separate PEM file references for ssl.ca.location, ssl.key.location, and ssl.certificate.location. Instead, the file stores the Oracle Wallet location through ssl.wallet.location. The wallet password is stored in a database credential, and the credential name is provided as a parameter so the password can be retrieved and passed to the connection layer at runtime.
  • It is optional for non-secure Kafka clusters, and can be used to specify the tuning and debugging properties for cluster connections.

The osakafka.properties file is stored in the Kafka Configuration Directory, in the path Oracle_base/osak/cluster-name/config, where Oracle_base is the Oracle base directory of the target Oracle Database, and cluster-name is the name of the Kafka cluster whose access information is stored in the configuration file.

Guidelines for Creating Kafka Configuration Files

As part of the setup and configuration required to access an Apache Kafka cluster, you must create an Oracle SQL access to Kafka administrator. This administrator owns the Kafka Configuration File. The information in this file is used to set the session context in the C interfaces that connect to a Kafka cluster using librdkafka APIs.

The SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES system table contains a pre-populated list of supported consumer configuration properties, including security properties. For extensibility, SYS can add more properties to this table with certain restrictions.

The DBMS_KAFKA_ADM.REGISTER_CLUSTER() procedure reads only those properties from the osakafka.properties file that are also listed in the SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES system table. Any extra properties are ignored.
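The key=value format and the filtering described above can be sketched in Python as follows. This is an assumption-based illustration, not the DBMS_KAFKA_ADM implementation: ALLOWED is a hypothetical stand-in for the contents of SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES (query that table for the real list), skipping lines that start with # follows the #-- comment in the OSS example file later in this chapter, and trimming whitespace around keys and values is an assumption.

```python
# Hypothetical subset of SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES, for illustration.
ALLOWED = {
    "debug",
    "max.partition.fetch.bytes",
    "security.protocol",
    "sasl.mechanism",
    "sasl.username",
    "ssl.wallet.location",
    "ssl.endpoint.identification.algorithm",
}


def parse_properties(text):
    """Parse key=value lines; blank lines and # comment lines are skipped."""
    props = {}
    for lineno, line in enumerate(text.splitlines(), start=1):
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"line {lineno}: expected key=value")
        props[key.strip()] = value.strip()
    return props


def filter_allowed(props, allowed):
    """Split properties into those that would be read and those ignored."""
    kept = {k: v for k, v in props.items() if k in allowed}
    ignored = sorted(k for k in props if k not in allowed)
    return kept, ignored


if __name__ == "__main__":
    sample = "security.protocol=SSL\n#-- comment\nssl.wallet.location=ewallet.p12\nfoo.bar=1\n"
    kept, ignored = filter_allowed(parse_properties(sample), ALLOWED)
    print("read:", kept)
    print("ignored:", ignored)
```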

22.9.2.2 Oracle SQL Access to Kafka Configuration File Properties

To create an osakafka.properties file for the Kafka Configuration Directory, review and specify the properties as described here.

osakafka.properties File Processing

The properties that you specify in the osakafka.properties file must be those described in this section. If you provide any other key-value pairs, then they are ignored.

Note the following:

  • Property names with the osak prefix are internal tuning properties or debugging properties.
  • Property names without the osak prefix are Kafka consumer properties, which are used by librdkafka. For a complete list of properties, refer to the documentation for the Apache Kafka C/C++ client library (librdkafka).

The following properties are supported:

debug
  Allowed values: all
  Description: Used to debug connectivity issues.

max.partition.fetch.bytes
  Allowed values: 1024 * 1024 (1048576)
  Description: For librdkafka SDK clients, Oracle Streaming Service (OSS) recommends that you allocate 1 MB for each partition.

security.protocol
  Allowed values: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
  Description: Security protocol used to communicate with Kafka brokers.

sasl.mechanisms
  Allowed values: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
  Description: SASL mechanism to use for authentication. Despite the plural name, you must configure only one mechanism. This property is allowed to provide backward compatibility for older Kafka clusters. Where possible, Oracle recommends that you use the property sasl.mechanism instead.

sasl.mechanism
  Allowed values: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
  Description: Simple Authentication and Security Layer (SASL) mechanism to use for authentication.

sasl.username
  Allowed values: User name
  Description: The user name required for authenticating to the Kafka cluster. The corresponding password for this user name must be stored as a database credential, using the DBMS_CREDENTIAL.CREATE_CREDENTIAL() procedure.

sasl.kerberos.principal
  Allowed values: Client Kafka Kerberos principal name
  Description: The client Kerberos principal name.

sasl.kerberos.ccname
  Allowed values: Kerberos ticket cache file name
  Description: The Kerberos ticket cache file (for example, krb5ccname_osak). This file must exist in the cluster configuration directory.

sasl.kerberos.config
  Allowed values: Kerberos configuration file name
  Description: The Kerberos configuration of the Kafka cluster (for example, krb5.conf). This file must exist in the cluster configuration directory.

sasl.kerberos.service.name
  Allowed values: Kerberos principal name (Kafka primary name)
  Description: The primary name of the Kerberos principal, which is the name that appears before the slash (/). For example, kafka is the primary name of the Kerberos principal kafka/broker1.example.com@EXAMPLE.

ssl.endpoint.identification.algorithm
  Allowed values: https, none (default: none)
  Description: Endpoint identification algorithm used to validate the Kafka broker hostname by using the Kafka broker certificate. Values are as follows:
    https: Server (Kafka broker) hostname verification, as specified in RFC 2818.
    none: No endpoint verification.

ssl.wallet.location
  Allowed values: Path to the Oracle Wallet directory or wallet file used for SSL authentication
  Description: The wallet must contain the trusted certificate and, if required, the client certificate and private key. Provide the wallet path as an absolute path or as a file:// URL, depending on the deployment environment.

walletcredname
  Allowed values: The database credential name that stores the Oracle Wallet password
  Description: The wallet password is stored in a database credential, and the credential name is provided as a parameter so that the password can be retrieved and passed to the connection layer at runtime. Specify it in the options argument of DBMS_KAFKA_ADM.REGISTER_CLUSTER, for example '{"walletcredname":"CRED_SSL_WALLET"}'.

Example

The following is an example osakafka.properties file that specifies the SSL security protocol and provides authentication by using a Certificate Authority (CA) certificate stored in an Oracle Wallet on the client:

security.protocol=SSL
ssl.wallet.location=/u01/app/oracle/osak/kafkaclus1/config/wallet/ewallet.p12
ssl.endpoint.identification.algorithm=https

When Oracle Wallet is used, the CA certificate, client certificate, and private key are imported into the wallet before the cluster is registered.

22.9.2.3 Creating the Kafka Configuration Directory

To access secure Kafka clusters, you must create a Kafka Configuration Directory for each Kafka cluster.

The Oracle SQL access to Kafka administrator creates the Kafka Configuration Directory, which is an operating system directory in the Oracle base directory. The path is Oracle-base/osak/cluster_name/config, where Oracle-base is the Oracle base directory, and cluster_name is the value of the cluster name parameter passed to the SYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER call. Each Kafka cluster requires its own dedicated Kafka Configuration Directory.

This directory must contain all the configuration files and security files needed to access the Kafka cluster:

  • The osakafka.properties file.
  • The security files listed in the osakafka.properties file, such as the Oracle Wallet files.

In the following example, the Oracle base directory is /u01/app/oracle, and the cluster name is kafkaclus1. First create the operating system directory, and then create the Oracle directory object that points to it:

 mkdir -p /u01/app/oracle/osak/kafkaclus1/config

 CREATE DIRECTORY osak_kafkaclus1_config AS
 '/u01/app/oracle/osak/kafkaclus1/config';

22.9.3 Security Configuration Access for the Kafka Configuration Directory

Configure OSAK to access security-enabled Apache Kafka clusters.

To configure access to a secure Kafka Cluster, the administrator must add several configuration files to the Kafka Configuration Directory, and prepare a DBMS_CREDENTIAL object. The specific configuration depends on which security protocol is used to configure security on the Kafka cluster.

For secured Kafka clusters, Oracle SQL Access to Kafka uses Oracle Wallet-based configuration instead of separate PEM files for CA certificates, client certificates, and private keys. The cluster configuration directory contains osakafka.properties and the wallet files needed for the connection. Legacy PEM-based configurations are no longer supported.
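When migrating an existing configuration, a quick check for leftover PEM-based properties can help. The following Python sketch is a hypothetical pre-registration check (not an Oracle tool) that takes the osakafka.properties key-value pairs as a dictionary and reports legacy PEM properties, SSL protocols without ssl.wallet.location, a wallet location on a non-SSL protocol, and any key that looks like an embedded password. Treating a missing security.protocol as PLAINTEXT reflects the librdkafka default and is an assumption here.

```python
LEGACY_PEM_PROPERTIES = ("ssl.ca.location", "ssl.key.location", "ssl.certificate.location")
SSL_PROTOCOLS = {"SSL", "SASL_SSL"}


def check_ssl_config(props):
    """Hypothetical pre-registration check; returns a list of problem messages."""
    problems = []
    for key in LEGACY_PEM_PROPERTIES:
        if key in props:
            problems.append(f"{key}: legacy PEM property; use ssl.wallet.location")
    # Assumption: an absent security.protocol means PLAINTEXT (librdkafka default).
    protocol = props.get("security.protocol", "PLAINTEXT").upper()
    has_wallet = "ssl.wallet.location" in props
    if protocol in SSL_PROTOCOLS and not has_wallet:
        problems.append(f"security.protocol={protocol} requires ssl.wallet.location")
    if protocol not in SSL_PROTOCOLS and has_wallet:
        problems.append(f"ssl.wallet.location is not used with security.protocol={protocol}")
    # Passwords belong in database credentials, never in the file.
    for key in props:
        if "password" in key.lower():
            problems.append(f"{key}: passwords must not be embedded in configuration files")
    return problems


if __name__ == "__main__":
    legacy = {"security.protocol": "SSL", "ssl.ca.location": "ca.pem"}
    for problem in check_ssl_config(legacy):
        print(problem)
```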

Caution:

Passwords must not be embedded in configuration files. Wallet passwords are stored in database credentials and resolved during cluster registration.

Note:

The Kerberos keytab file is not required, because Kerberos ticket management is handled outside of Oracle SQL access to Kafka.

22.9.3.1 SASL_SSL/GSSAPI

OSAK configuration settings for SASL_SSL using GSSAPI authentication protocol.

Description

The SASL_SSL/GSSAPI protocol specifies Kerberos authentication with encryption. The Kerberos tickets must be managed externally (outside Oracle SQL access to Kafka).

DBMS_CREDENTIAL

Not required, because Kerberos tickets are managed externally.

Required Files in the Kafka Configuration Directory

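  1. The osakafka.properties file.
  2. The Oracle Wallet files (for example, ewallet.p12) containing the trusted certificate authority (CA) certificate.
  3. Kerberos configuration files, including krb5.conf and the Kerberos ticket cache (for example, krb5ccname_osak).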

In the following example, the property security.protocol specifies SASL_SSL. The property sasl.mechanism specifies GSSAPI. SSL certificates are managed using an Oracle Wallet. The wallet location is specified using ssl.wallet.location.

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.config=krb5.conf 
sasl.kerberos.ccname=krb5ccname_osak 
sasl.kerberos.principal=kafkaclient/<FQDN-hostname>@<Realm>
ssl.wallet.location=/path/to/wallet/ewallet.p12
ssl.endpoint.identification.algorithm=https

22.9.3.2 SASL_PLAINTEXT/GSSAPI

OSAK configuration settings for SASL_PLAINTEXT using GSSAPI authentication protocol.

Description

The SASL_PLAINTEXT/GSSAPI protocol specifies Kerberos authentication without SSL encryption. Kerberos tickets must be managed externally (outside Oracle SQL Access to Kafka).

Because this protocol does not use SSL, Oracle Wallet is not required for this configuration.

Caution:

With SASL_PLAINTEXT, data is sent in clear text over the network. This configuration is insecure and is not recommended for production systems.

DBMS_CREDENTIAL

Not required, because Kerberos tickets are managed externally.

Required Files in the Kafka Configuration Directory

  1. The osakafka.properties file.
  2. Kerberos configuration files, including krb5.conf and the Kerberos ticket cache (for example, krb5ccname_osak).

In the following example, the property security.protocol specifies SASL_PLAINTEXT, and the property sasl.mechanism specifies GSSAPI.

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafkaclient/FQDN-hostname@Realm
sasl.kerberos.config=krb5.conf 
sasl.kerberos.ccname=krb5ccname_osak

22.9.3.3 SASL_PLAINTEXT/SCRAM-SHA-256

OSAK configuration settings for SASL_PLAINTEXT using SCRAM-SHA-256 authentication protocol.

Description

The SASL_PLAINTEXT/SCRAM-SHA-256 protocol specifies SASL SCRAM authentication without SSL encryption.

Because this protocol does not use SSL, Oracle Wallet is not required for this configuration. Oracle Wallet is only applicable for SSL-enabled protocols such as SSL and SASL_SSL.

Caution:

With SASL_PLAINTEXT, data is sent in clear text over the network. This configuration is insecure and is not recommended for production systems.

DBMS_CREDENTIAL

Required, to store the password for the SASL user name.

Required Files in the Kafka Configuration Directory

  1. The osakafka.properties file.

Oracle Wallet is not used for SASL_PLAINTEXT configurations because SSL/TLS is not enabled.

In the following example, the property security.protocol specifies SASL_PLAINTEXT, and the property sasl.mechanism specifies SCRAM-SHA-256.

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.username=testuser 

Example 22-4 Credential Setup

Create a database credential to store the SASL password:

BEGIN
   DBMS_CREDENTIAL.CREATE_CREDENTIAL(
      credential_name => 'CRED_SASL_USER',
      username        => 'testuser',
      password        => 'user_password'
   );
END;
/

22.9.3.4 SASL_SSL/PLAIN

OSAK configuration settings for SASL_SSL using PLAIN authentication protocol.

Description

The SASL_SSL/PLAIN protocol specifies SASL authentication using the PLAIN mechanism over SSL encryption. This configuration is commonly used for Oracle Streaming Service (OSS) Kafka clusters and other secured Kafka environments. Starting with Oracle AI Database 26ai (23.26.2), SSL configuration requires the use of Oracle Wallet. All trusted certificates must be stored in an Oracle Wallet created using the orapki tool. Legacy PEM-based SSL configuration using ssl.ca.location is no longer supported.

DBMS_CREDENTIAL

Required, to store the SASL password and the Oracle Wallet password.

Required Files in the Kafka Configuration Directory

  1. The osakafka.properties file.
  2. The Oracle Wallet files (for example, ewallet.p12) containing trusted CA certificates.

Example 22-5 OSS Cluster osakafka.properties File

In the following example, the property security.protocol specifies SASL_SSL, and the property sasl.mechanism specifies PLAIN. SSL certificates are managed using an Oracle Wallet:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<tenancyName>/<username>/<streamPoolID>
#-- limit request size to 1 MB per partition
max.partition.fetch.bytes=1048576

ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https

Example 22-6 Non-OSS Cluster osakafka.properties File

In the following example, the property security.protocol specifies SASL_SSL, and the property sasl.mechanism specifies PLAIN. The Oracle Wallet replaces the legacy CA file configuration:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=kafkauser

ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https

22.9.3.5 SSL with Client Authentication

OSAK configuration settings for SSL authentication protocol.

Description

The Secure Socket Layer (SSL) protocol specifies SSL with client authentication.

Note:

Starting with Oracle AI Database 26ai (23.26.2), SSL configuration for Oracle SQL Access to Kafka requires the use of Oracle Wallet. All certificates and keys must be stored in an Oracle Wallet created using the orapki tool. Legacy PEM-based configurations are no longer supported.

DBMS_CREDENTIAL

Required, to store the password for the Oracle Wallet.

Required Files in the Kafka Configuration Directory

  1. The osakafka.properties file.
  2. The Oracle Wallet files (for example, ewallet.p12), which contain the trusted certificate and the client credentials:
    • Trusted CA certificates
    • Client certificate (if required)
    • Client private key (if required)

Example 22-7 SSL osakafka.properties File

In the following example, the property security.protocol specifies SSL, and the property ssl.wallet.location specifies the Oracle Wallet location.

security.protocol=SSL
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https

Example 22-8 Credential Setup

Create a database credential to store the Oracle Wallet password:

BEGIN
   DBMS_CREDENTIAL.CREATE_CREDENTIAL(
      credential_name => 'CRED_SSL_WALLET',
      username        => 'wallet_user',
      password        => 'wallet_password'
   );
END;
/

Example 22-9 Cluster Registration

Specify the wallet credential name during cluster registration:

SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
   cluster_name        => 'KAFKACLUS1',
   bootstrap_servers   => 'mykafkabootstrap-host:9092',
   kafka_provider      => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
   cluster_access_dir  => 'OSAK_KAFKACLUS1_ACCESS',
   credential_name     => NULL,
   cluster_config_dir  => 'OSAK_KAFKACLUS1_CONFIG',
   cluster_description => 'My test cluster kafkaclus1',
   options             => '{"walletcredname":"CRED_SSL_WALLET"}'
)
FROM dual;

22.9.3.6 SSL without Client Authentication

OSAK configuration settings for SSL without client authentication.

Description

The Secure Socket Layer (SSL) protocol specifies SSL without client authentication.

Starting with Oracle AI Database 26ai (23.26.2), SSL configuration uses Oracle Wallet to store trusted certificates. Even when client authentication is not required, the trusted CA certificate must be stored in the wallet. The legacy PEM-based SSL properties are desupported. Replace PEM-based SSL properties with wallet-based configuration.

DBMS_CREDENTIAL

Required, to store the password for the Oracle Wallet.

Required Files in the Kafka Configuration Directory

  1. The osakafka.properties file.
  2. The Oracle Wallet files (for example, ewallet.p12) containing trusted certificates.

Example 22-10 SSL osakafka.properties File

In the following example, the property security.protocol specifies SSL, and the property ssl.wallet.location specifies the Oracle Wallet location.

security.protocol=SSL
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https

Credential Setup

BEGIN
   DBMS_CREDENTIAL.CREATE_CREDENTIAL(
      credential_name => 'CRED_SSL_WALLET',
      username        => 'wallet_user',
      password        => 'wallet_password'
   );
END;
/

Cluster Registration

Specify the wallet credential name during cluster registration:

SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
   cluster_name        => 'KAFKACLUS2',
   bootstrap_servers   => 'mykafkabootstrap-host:9092',
   kafka_provider      => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
   cluster_access_dir  => 'OSAK_KAFKACLUS2_ACCESS',
   credential_name     => NULL,
   cluster_config_dir  => 'OSAK_KAFKACLUS2_CONFIG',
   cluster_description => 'My test cluster kafkaclus2',
   options             => '{"walletcredname":"CRED_SSL_WALLET"}'
)
FROM dual;

22.10 Creating Oracle SQL Access to Kafka Applications

Oracle SQL can access Kafka data using loading, streaming, or seekable applications created with DBMS_KAFKA.

Oracle SQL access to Kafka provides the following application modes that you can use to attach to the Apache Kafka cluster:

  • Loading: Use to load data from a Kafka topic into an Oracle Database table.
  • Streaming: Use to read sequentially through a Kafka topic.
  • Seekable: Use to gain random access to a Kafka topic between starting and ending timestamps that you designate.

Choose the type of application that you want to create, depending on the kind of access to Kafka topics that you require:

  • DBMS_KAFKA.CREATE_LOAD_APP creates an application that can be used in Loading mode.
  • DBMS_KAFKA.CREATE_STREAMING_APP creates an application that can be used in Streaming mode.
  • DBMS_KAFKA.CREATE_SEEKABLE_APP creates an application that can be used in Seekable mode.

Example 22-11 Creating a Streaming Application with Four Views for a Kafka Topic

In the following example, a streaming application is created to use a set of four views with temporary tables for a Kafka topic that has four (4) partitions. Each view creates a temporary table. Each view (and temporary table) is associated with one partition of the Kafka topic:

DECLARE
  v_options VARCHAR2(4000);
BEGIN
  v_options := '{"fmt":"DSV","reftable":"user_shape_table_name"}';

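  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ExampleCluster',  -- registered Kafka cluster name
    'ExampleApp',      -- application name
    'ExampleTopic',    -- Kafka topic to read
    v_options,         -- JSON options (DSV format, reference table)
    4                  -- view count: four views for four partitions
  );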
END;
/

Example 22-12 Creating a Streaming Application with One View for a Kafka Topic

In the following example, a streaming application is created with a single view (view count 1) and its temporary table for a Kafka topic that has four partitions. The view (and its temporary table) is associated with all partitions of the Kafka topic:

DECLARE
  v_options VARCHAR2(4000);
BEGIN
  v_options := '{"fmt":"DSV","reftable":"user_shape_table_name"}';

  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ExampleCluster',
    'ExampleApp',
    'ExampleTopic',
    v_options,
    1
  );
END;
/

22.11 Security for Kafka Cluster Connections

Oracle SQL Access to Kafka supports access to Kafka and Oracle Streaming Service (OSS), using various security mechanisms, such as SSL, SASL, and Kerberos.

Note:

The credentials used to access the Kafka cluster must have access to both the Kafka broker metadata and any topics that will be part of an Oracle SQL access to Kafka application. If access control lists (ACLs) are enabled for the credentials, then ensure that access is granted to both the brokers and the Kafka topics. In a shared Oracle Real Application Clusters (Oracle RAC) environment, security credentials should be in a shared location, not local to a cluster member node.

Secure Kafka Clusters

To keep data transmission between Oracle Database and Kafka clusters securely encrypted, Oracle SQL access to Kafka employs several security protocols. Access to secure Kafka clusters and Oracle Streaming Service (OSS) clusters uses security configuration files. These operating system files must exist in the cluster configuration directory, and a cluster configuration Oracle directory object is created to access them. Only the osak_admin_role is granted READ access to this directory object, so the cluster configuration files, including the osakafka.properties file, are readable only by the osak_admin_role. Keys and certificates for SSL are stored in the Oracle keystore.

Note:

The secure access method is now Oracle Wallet-based SSL configuration. Secure Kafka clusters should use a wallet created with orapki, and the wallet should contain the CA certificate and client credentials required by the connection.

The wallet password must be stored as a database credential and referenced during cluster registration.

The cluster access Oracle directory object is used to control access to the Kafka cluster. This directory object does not contain any configuration files. In a multitenant environment, Kafka sessions are exclusive to individual PDBs: you must create a separate application in each PDB that needs to connect to a Kafka broker.

Do not embed passwords in files. Any password properties embedded in the osakafka.properties file are ignored. Store all passwords as database credentials using the DBMS_CREDENTIAL package.

Kafka Clusters Using Kerberos Authentication

For Kafka clusters using Kerberos Authentication, the Kerberos ticket for the Kafka principal specified in the osakafka.properties file must be acquired on the database system, and renewed periodically outside of Oracle SQL access to Kafka.

The Kafka cluster configuration directory object, the Kafka cluster access directory object, and the database credential name must be supplied as input parameters to the DBMS_KAFKA_ADM.REGISTER_CLUSTER() call.

The Oracle SQL Access to Kafka administrator (a user with the osak_admin_role, the OSAK_ADMIN) performs the cluster registration and administration tasks.

22.12 Configuring Access to Unsecured Kafka Clusters

To configure access to non-secure Kafka clusters, the OSAK administrator (Oracle Database user with osak_admin_role) must complete this procedure.

Access to non-secure Kafka clusters requires that you create a cluster access database directory object to control access to the Kafka cluster. The grants on this database directory are used to control which Oracle Database users can access the Kafka cluster. This database directory has an empty path: it does not need a corresponding operating system directory, and it does not contain any files. Oracle recommends that the name of a cluster access database directory object take the form OSAK_CLUSTER_NAME_ACCESS, where CLUSTER_NAME is the name of the Kafka cluster.
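The naming recommendation above, together with the configuration path convention used in the procedure below (Oracle_base/osak/cluster_name/config), can be captured in a small helper. This is a hypothetical Python sketch that only formats strings: `osak_names` is not part of DBMS_KAFKA_ADM, using the lowercase cluster name in the path simply mirrors this chapter's examples, and OSAK_<CLUSTER_NAME>_CONFIG is the pattern the examples use rather than a stated requirement.

```python
# Hypothetical helper (not an Oracle API): formats the recommended OSAK
# directory object names and the cluster configuration path.
import posixpath


def osak_names(cluster_name: str, oracle_base: str) -> dict:
    if not posixpath.isabs(oracle_base):
        # CREATE DIRECTORY expects a full (absolute) operating system path.
        raise ValueError("oracle_base must be an absolute path")
    upper = cluster_name.upper()
    # Assumed: the OS path uses the lowercase cluster name, as in the examples.
    config_path = posixpath.join(oracle_base, "osak", cluster_name.lower(), "config")
    access_dir = f"OSAK_{upper}_ACCESS"
    config_dir = f"OSAK_{upper}_CONFIG"  # assumed pattern, taken from the examples
    return {
        "access_dir": access_dir,
        "config_dir": config_dir,
        "config_path": config_path,
        # The access directory has an empty path; it exists only for GRANT READ.
        "access_ddl": f"CREATE DIRECTORY {access_dir} AS '';",
        "config_ddl": f"CREATE DIRECTORY {config_dir} AS '{config_path}';",
    }


if __name__ == "__main__":
    for key, value in osak_names("KAFKACLUS2", "/u01/app/oracle").items():
        print(f"{key}: {value}")
```

The generated DDL is meant for review by the OSAK administrator, not for automatic execution.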

Procedure:

  1. Create the cluster access database directory object with an empty path. This directory object is used to control which Oracle users can access the Kafka cluster.

    For example, create a cluster access database directory object called OSAK_KAFKACLUS2_ACCESS with an empty path:

    SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
  2. On the target Oracle Database server, create the cluster configuration operating system directory in the Oracle base path directory, using the path Oracle_base/osak/cluster_name/config where Oracle_base is the Oracle base directory, and cluster_name is the Kafka cluster name. For example:

    mkdir /u01/app/oracle/osak/kafkaclus2/config

    Log in to the database as SYSDBA and create the corresponding Oracle directory object. In this example, the Kafka cluster name is KAFKACLUS2:

    SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS '/u01/app/oracle/osak/kafkaclus2/config';
  3. In the cluster configuration directory, create an empty osakafka.properties file, or an osakafka.properties file that contains OSAK tuning or debugging properties.

  4. In SQL, register the Kafka cluster using DBMS_KAFKA_ADM.REGISTER_CLUSTER(). For example, using the server hostname mykafkabootstrap-host, port 9092, for Kafka cluster KAFKACLUS2:

    SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
             cluster_name         => 'KAFKACLUS2',
             bootstrap_servers    => 'mykafkabootstrap-host:9092',
             kafka_provider       => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
             cluster_access_dir   => 'OSAK_KAFKACLUS2_ACCESS',
             credential_name      => NULL,
             cluster_config_dir   => 'OSAK_KAFKACLUS2_CONFIG',
             cluster_description  => 'My test cluster kafkaclus2',
             options              => NULL
           )
    FROM dual;

    If configuration is successful, then the registration return is 0 (zero):

    SQL> DBMS_KAFKA_ADM_RE…..
                 0
    
  5. Grant read access on the cluster access directory to a Kafka user. In the following example, user app2_usr is granted access to the Kafka cluster named KAFKACLUS2:

    SQL> GRANT READ ON DIRECTORY OSAK_KAFKACLUS2_ACCESS TO app2_usr;

22.13 Configuring Access to Secure Kafka Clusters Using Oracle Wallet

Configure secure Kafka access by replacing PEM-based SSL with Oracle Wallet and registering the cluster using wallet credentials.

In previous releases, access to secure Kafka clusters required configuration files, such as osakafka.properties, and additional security files such as SSL/TLS PEM files and certificates. These files were stored in a cluster configuration database directory object. The configuration files and directory were accordingly protected by the operating system directory and file access privileges.

Starting with Oracle AI Database 23.26.2, Oracle is replacing the SSL/TLS PEM files and certificates procedure with Oracle Wallet configuration. Instead of multiple files, the cluster configuration database directory object points to a single wallet (ewallet.p12 in this example). Instead of file-based secrets, access is enabled with DBMS_CREDENTIAL.

With the Oracle Wallet method, Kafka access is controlled in three layers:

  • Access control: This is maintained by creating a DIRECTORY object for access and using GRANT READ to control user access.
  • Configuration: The configuration files are in an operating system directory, and the location of these files is referenced in a separate DIRECTORY object.
  • Authentication: Authentication is provided by an Oracle wallet together with a database credential created with DBMS_CREDENTIAL.

Procedure:

  1. Create a cluster access directory object for the Kafka cluster.

    CREATE DIRECTORY OSAK_KAFKACLUS1_ACCESS AS '';

    Note:

    In this example, the directory object OSAK_KAFKACLUS1_ACCESS is a logical object created to attach database privileges (GRANT READ). The directory object does not point to an actual file system location. Instead, it is used as a security access control mechanism inside the database.

  2. Create the Cluster Configuration operating system directory for the configuration file location.

    mkdir -p /u01/app/oracle/osak/kafkaclus1/config

    Set permissions as follows on this directory:

    • Directory: 750 (rwxr-x---)
    • osakafka.properties: 640 (rw-r-----)
    • Other files: 440 (r--r-----)
  3. Create the Oracle Directory Object for the config directory location:

    CREATE DIRECTORY OSAK_KAFKACLUS1_CONFIG
       AS '/u01/app/oracle/osak/kafkaclus1/config';

    Note:

    The OSAK_KAFKACLUS1_CONFIG directory object points to the location where the configuration and wallet files for this Kafka cluster are placed.

  4. Create the Oracle wallet (this replaces the PEM files).

    WALLET_DIR=/u01/app/oracle/osak/kafkaclus1/config/wallet
    WALLET_PWD=MyStrongPassword123
    
    orapki wallet create -wallet $WALLET_DIR -pwd $WALLET_PWD -auto_login

    The wallet password used here is stored later as a database credential (step 8), which ensures that the database can open the wallet during Kafka operations.

  5. Import certificates into the wallet.

    If starting from PEM files:

    # Convert PEM to PKCS12
    openssl pkcs12 -export -passout pass:Welcome1 \
     -in client.pem -inkey client.key -out client.p12
    
    # Import into wallet
    orapki wallet import_pkcs12 \
     -wallet $WALLET_DIR \
     -pwd $WALLET_PWD \
     -pkcs12file client.p12 \
     -pkcs12pwd Welcome1

    (Optional) Import the CA certificate, if needed:

    orapki wallet add \
      -wallet $WALLET_DIR \
      -trusted_cert \
      -cert ca-cert.pem \
      -pwd $WALLET_PWD
  6. Verify the wallet.

    orapki wallet display -wallet $WALLET_DIR -pwd $WALLET_PWD
  7. Create the osakafka.properties file (wallet-based SSL) in the cluster configuration directory:

    security.protocol=SSL
    ssl.wallet.location=/u01/app/oracle/osak/kafkaclus1/config/wallet/ewallet.p12
    ssl.endpoint.identification.algorithm=https

    Note:

    Remove the desupported properties:

    • ssl.ca.location
    • ssl.certificate.location
    • ssl.key.location
    • ssl.key.password

    If these properties are not removed, you will receive an error.

  8. Store the wallet password in a database credential.

    begin
        dbms_credential.create_credential(
            credential_name => 'KAFKACLUS1_WALLET_CRED',
            username        => 'wallet',
            password        => 'MyStrongPassword123'
        );
    end;
    /
  9. Register the Kafka cluster with the wallet.

    SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
             cluster_name        => 'KAFKACLUS1',
             bootstrap_servers   => 'mykafkabootstrap-host:9092',
             kafka_provider      => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
             cluster_access_dir  => 'OSAK_KAFKACLUS1_ACCESS',
             credential_name     => NULL,
             cluster_config_dir  => 'OSAK_KAFKACLUS1_CONFIG',
             cluster_description => 'My test cluster kafkaclus1',
             options             => '{"walletcredname":"KAFKACLUS1_WALLET_CRED"}'
           )
    FROM dual;

    If registration succeeds, the call returns 0 (zero).

  10. Grant access to Kafka users.

    GRANT READ ON DIRECTORY OSAK_KAFKACLUS1_ACCESS TO app1_usr;

As a result of this process:

  • Kafka cluster registration points to a DIRECTORY object called OSAK_KAFKACLUS1_ACCESS.
  • Only users with READ privilege on that DIRECTORY can access the specified Kafka cluster.
  • The administrator can easily grant or revoke access to specific users.
  • The administrator can audit access using database mechanisms.
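Before registering the cluster, an administrator may want to check the configuration directory against the rules in this procedure: no desupported PEM properties, no passwords in files, and the recommended permission modes. The following Python sketch is a hypothetical pre-flight check, not an Oracle-supplied tool. It uses only the property names and modes listed in this section (the symbolic form rw-r----- corresponds to octal 640), and treating `security.protocol=SSL` without `ssl.wallet.location` as a problem is an assumption drawn from the wallet-based example in step 7.

```python
# Hypothetical pre-flight check (not an Oracle tool) for a wallet-based
# OSAK cluster configuration directory.
import stat

# PEM-based properties that this section lists as desupported.
DESUPPORTED = {
    "ssl.ca.location",
    "ssl.certificate.location",
    "ssl.key.location",
    "ssl.key.password",
}

# Permission modes recommended in step 2 of the procedure (symbolic forms).
EXPECTED_MODES = {"dir": 0o750, "osakafka.properties": 0o640, "other": 0o440}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_properties(text: str) -> list:
    """Return the problems found in osakafka.properties content."""
    props = parse_properties(text)
    problems = [f"desupported property: {k}" for k in sorted(DESUPPORTED & set(props))]
    # Passwords belong in DBMS_CREDENTIAL; embedded ones are ignored by OSAK.
    problems += [
        f"password in file: {k}"
        for k in sorted(props)
        if "password" in k.lower() and k not in DESUPPORTED
    ]
    # Assumption drawn from the wallet-based example in step 7.
    if props.get("security.protocol") == "SSL" and "ssl.wallet.location" not in props:
        problems.append("SSL without ssl.wallet.location")
    return problems


def mode_string(mode: int) -> str:
    """Render permission bits as ls does, e.g. 0o640 -> 'rw-r-----'."""
    return stat.filemode(stat.S_IMODE(mode))[1:]


def check_mode(kind: str, mode: int):
    """Compare permission bits with the recommended mode; None if they match."""
    expected = EXPECTED_MODES[kind]
    actual = stat.S_IMODE(mode)
    if actual != expected:
        return f"{kind}: {mode_string(actual)} (expected {mode_string(expected)})"
    return None
```

On a real system, pass `os.stat(path).st_mode` to `check_mode` and the file contents to `check_properties`.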

22.14 Administering Oracle SQL Access to Kafka Clusters

See how to update, temporarily disable, and delete Kafka cluster definitions with Oracle SQL access to Kafka.

22.14.1 Updating Access to Kafka Clusters

If the Kafka cluster environment changes, you can update the cluster definition and configuration for those changes.

During the lifetime of the Kafka cluster definition, if you need to update the cluster definition, then you can use DBMS_KAFKA_ADM.UPDATE_CLUSTER_INFO and DBMS_KAFKA_ADM.CHECK_CLUSTER.

22.14.2 Disabling or Deleting Access to Kafka Clusters

You can temporarily disable an Oracle SQL access to a Kafka cluster, or delete the connection if it is no longer required.

Example 22-13 Disabling a Kafka Cluster

During temporary outages of the Kafka environment, you can temporarily disable access to the Kafka cluster by calling:

  • DBMS_KAFKA_ADM.DISABLE_CLUSTER followed by
  • DBMS_KAFKA_ADM.ENABLE_CLUSTER when the Kafka environment is back up

Example 22-14 Deleting a Kafka Cluster

When a cluster definition is no longer needed, the OSAK administrator can remove the cluster definition by calling:

  • DBMS_KAFKA_ADM.DEREGISTER_CLUSTER

22.15 Guidelines for Using Kafka Data with Oracle SQL Access to Kafka

Review guidelines, restrictions, and recommendations as part of your application development plan.

22.15.1 Kafka Temporary Tables and Applications

Oracle SQL access to Kafka views and their corresponding temporary tables are bound to a unique Kafka application (a group ID), and must exclusively access one or more partitions in a topic on behalf of that application.

Use these guidelines to assist you with constructing your applications.

Kafka Group IDs and Oracle SQL Access to Kafka Temporary Tables

Unlike standard Oracle tables and views, and in accordance with the rules for consuming Apache Kafka data, Kafka temporary tables cannot be shared across multiple applications. Each temporary table is a snapshot of data fetched directly from Kafka at a particular point in time. Its canonical name identifies the Kafka cluster, the application name, and a view ID: an integer that identifies a particular view accessing one or more partitions in the topic on behalf of an application associated with a Kafka consumer group ID (groupID).

The temporary views and tables created in Oracle AI Database are bound to a unique Kafka application (identified by groupID), and must exclusively access one or more partitions in a topic on behalf of that application. They cannot share access to these partitions simultaneously with other applications. This restriction extends to an Oracle application instance: an Oracle SQL Access to Kafka view and its associated temporary table must be exclusive to that application. If you want multiple applications to query the same Kafka topic or partition data, then each application must identify itself as a different application (that is, with a different, unique Kafka group ID), and create its own Oracle SQL access to Kafka application, reflecting its own group ID, application identity, and set of offsets to track.
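The exclusivity rule can be modeled as a registry keyed by group ID: within one group, a partition belongs to at most one view, while an application with a different group ID can read the same partitions with its own offsets. This Python sketch is a toy model for reasoning about application layout; `PartitionClaims` and its methods are hypothetical and do not correspond to DBMS_KAFKA internals.

```python
# Toy model (hypothetical, not DBMS_KAFKA): partition ownership and offsets
# are scoped by Kafka group ID, as described above.
class PartitionClaims:
    def __init__(self):
        self._owner = {}    # (group_id, topic, partition) -> view name
        self._offsets = {}  # (group_id, topic, partition) -> next offset

    def claim(self, group_id, view, topic, partitions):
        """Bind partitions to a view; fail if another view in the group has them."""
        clashes = [
            p for p in partitions
            if self._owner.get((group_id, topic, p), view) != view
        ]
        if clashes:
            raise ValueError(
                f"group {group_id}: partitions {clashes} of {topic} already claimed"
            )
        for p in partitions:
            self._owner[(group_id, topic, p)] = view
            self._offsets.setdefault((group_id, topic, p), 0)

    def advance(self, group_id, topic, partition, count):
        """Advance one group's offset; other groups are unaffected."""
        self._offsets[(group_id, topic, partition)] += count

    def offset(self, group_id, topic, partition):
        return self._offsets[(group_id, topic, partition)]
```

For example, views for groups app1 and app2 can both claim partitions 0 and 1 of a topic, but a second app1 view cannot claim partition 1 again.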

Guidelines for Using Views and Tables with Oracle SQL Access to Kafka

Create views and tables for your applications in accordance with the kinds of analytics you want to perform with that data.

If you want your application to use Oracle SQL for analytics, then Oracle recommends that you create an Oracle SQL access to Kafka view for that application that captures all partitions of the data that you want to query. Each time a single application instance queries the view, it captures all new Kafka data in the topic and generates aggregate information that the application can then store or display.

If you do not want to perform analytics using Oracle SQL, but instead use complex logic in the application itself, then Oracle recommends that you scale out the application instances, and have each Oracle SQL access to Kafka view access a single partition on behalf of a single application instance. For this case, typically the Kafka data is joined with standard Oracle tables to enrich the data returned to the application.

In cases where some SQL analytics and joins are performed before more analysis is done by the application, views mapping to some subset of the partitions in a topic can be a good option to choose.

22.15.2 Sharing Kafka Data with Multiple Applications Using Streaming

To enable multiple applications to use Kafka data, use Oracle SQL access to Kafka to load Kafka data into a user table.

To share Kafka data with multiple Oracle users, so that the data is not tied to a specific group ID, Oracle recommends that an application user run Oracle SQL access to Kafka in Loading mode, with the PL/SQL procedure DBMS_KAFKA.EXECUTE_LOAD_APP, to create a table owned by that user. With this option, a single application instance runs the Loading PL/SQL procedure on a regular basis to load all new data incrementally from a Kafka topic into an Oracle AI Database table. After the data is loaded into the table, it can be made accessible to standard Oracle Database applications granted access to that table, without the restrictions that apply to temporary tables.

22.15.3 Dropping and Recreating Kafka Tables

Because the Kafka offsets are managed by the DBMS_KAFKA metadata tables, changes to a Kafka topic configuration can require manual updates to Oracle SQL access to Kafka applications.

To ensure that Oracle application instances can identify which Kafka records have been read, and up to which point, the partition offsets of a Kafka topic must be tracked on a per-application-instance basis.

Kafka supports three models for committing offsets:

  • Auto-commit, where Kafka commits the last offset fetched on a short time schedule
  • Manual commit, where applications send a request for Kafka to commit an offset
  • Application-managed commits, where Kafka commits are entirely managed by the applications.

Oracle uses application-managed commits. From Kafka's point of view, the application declares manual commits but never explicitly commits to Kafka. Instead, offsets are recorded and maintained exclusively in DBMS_KAFKA metadata tables, which are protected by the ACID transaction properties of Oracle Database. To ensure the integrity of transactions, Oracle SQL Access to Kafka does not support Kafka auto-commit or Kafka manual commit.

If a Kafka topic is dropped and recreated, then you must manually update the Oracle SQL access to Kafka applications that use that topic, depending on the scenario:

Example 22-15 Dropping and Resetting a View with the Same Partitions

If the number of partitions remains the same as in the original Kafka topic configuration, then you must reset the Oracle SQL access to Kafka view so that it begins processing from the beginning of the Kafka partitions within the recreated topic. To reset the view, call the procedure DBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML'), where view_name is the name of the view.

Example 22-16 Dropping and Resetting a View with Fewer Partitions

This option is not available. If the number of partitions is less than the original Kafka topic configuration, then the Oracle SQL access to Kafka applications associated with this topic must be dropped and recreated.

Example 22-17 Dropping and Resetting a View with More Partitions

If the number of partitions is greater than the original Kafka topic configuration, then you must reset the Oracle SQL Access to Kafka view by calling the procedure DBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML'), where view_name is the name of the view, and then call the procedure DBMS_KAFKA.ADD_PARTITIONS for each Oracle SQL Access to Kafka application using this topic.
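The three scenarios above reduce to a comparison of partition counts. The following Python sketch returns the manual steps for each case as plain strings; the function name is hypothetical, and it does not call DBMS_KAFKA. It only restates the rules in Examples 22-15 through 22-17.

```python
def actions_after_recreate(old_partitions: int, new_partitions: int) -> list:
    """Manual OSAK steps after a Kafka topic is dropped and recreated (hypothetical helper)."""
    if old_partitions < 1 or new_partitions < 1:
        raise ValueError("partition counts must be positive")
    reset = "DBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML') for each view"
    if new_partitions == old_partitions:
        # Same shape: just restart each view from the beginning.
        return [reset]
    if new_partitions < old_partitions:
        # Resetting is not available: the applications must be rebuilt.
        return ["drop the OSAK applications for this topic",
                "recreate the OSAK applications"]
    # More partitions: reset first, then register the new partitions.
    return [reset, "DBMS_KAFKA.ADD_PARTITIONS for each OSAK application using this topic"]
```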

22.16 Choosing a Kafka Cluster Access Mode for Applications

To use Oracle SQL access to Kafka, decide what mode of data access you require for your applications.

22.16.1 Configuring Incremental Loads of Kafka Records Into an Oracle Database Table

To enable applications to load data incrementally from a Kafka topic into an Oracle Database table, you use Oracle SQL Access to Kafka in Loading mode.

Configuring Oracle SQL Access to Kafka to perform incremental loads using the EXECUTE_LOAD_APP procedure enables you to move Kafka data into standard Oracle tables, which are accessible by multiple applications without the one-reader constraint imposed when using Oracle SQL access to Kafka temporary tables.

To load Kafka data incrementally into an Oracle Database table, an application declares that it is a loading application by calling the PL/SQL procedure DBMS_KAFKA.CREATE_LOAD_APP to initialize a state for subsequent calls to DBMS_KAFKA.EXECUTE_LOAD_APP. The DBMS_KAFKA.CREATE_LOAD_APP procedure creates a single view over all partitions of the topic.

If you do not require data from the entire topic, then you also have the option to configure the application to call the DBMS_KAFKA.INIT_OFFSET[_TS] procedure to set the starting point in Kafka topic partitions for loading the Kafka data.

The DBMS_KAFKA.EXECUTE_LOAD_APP procedure is called in an application loop to load data from where the previous call left off to the current high water mark of the Kafka topic. This procedure runs in an autonomous transaction.

To load data into an Oracle Database table from a Kafka topic:

  • DBMS_KAFKA.CREATE_LOAD_APP to create an Oracle SQL Access to Kafka Load application
  • Optionally, DBMS_KAFKA.INIT_OFFSET_TS or DBMS_KAFKA.INIT_OFFSET to set the first Kafka record to be read
  • LOOP until done
    • DBMS_KAFKA.EXECUTE_LOAD_APP to load Kafka data starting from where we left off to the current high water mark
  • DBMS_KAFKA.DROP_LOAD_APP to drop the load application
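The load loop above can be modeled in a few lines: each call starts from the position saved by the previous call and stops at the high water mark observed at call time, so records written later are picked up by the next call. This Python sketch is a toy model with hypothetical names. In Oracle SQL Access to Kafka, the positions live in DBMS_KAFKA metadata tables and EXECUTE_LOAD_APP runs in an autonomous transaction, which the sketch does not reproduce. Presetting an entry in `positions` plays the role of INIT_OFFSET.

```python
def execute_load(partitions: dict, positions: dict, target: list) -> int:
    """Load records from each partition's saved position up to its high water mark.

    partitions: {partition_id: list of records}, where list index == offset.
    positions:  {partition_id: next offset to read}; updated in place.
    target:     list standing in for the Oracle table; extended in place.
    """
    loaded = 0
    for p, records in partitions.items():
        hwm = len(records)              # high water mark observed at call time
        start = positions.get(p, 0)     # default: beginning of the partition
        batch = records[start:hwm]
        target.extend(batch)
        loaded += len(batch)
        positions[p] = start + len(batch)  # next call resumes here
    return loaded
```

Calling `execute_load` repeatedly corresponds to the "LOOP until done" step: a call made when nothing new has arrived loads zero records.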

22.16.2 Streaming Access to Kafka Records in Oracle SQL Queries

To access Kafka topics in a sequential manner from the beginning of the topic, or from a specific starting point in a Kafka topic, you can use Oracle SQL Access to Kafka in Streaming mode.

If your application requires access to Kafka topics in a sequential manner, you can configure Oracle SQL Access to Kafka in Streaming mode. This mode enables a SQL query using an Oracle SQL access to Kafka temporary table to access Kafka records sequentially in an application processing loop. With this use case, the application declares that it is a streaming application by calling the PL/SQL procedure DBMS_KAFKA.CREATE_STREAMING_APP to initialize the state for subsequent queries of Oracle SQL access to Kafka views. In addition to creating views, this procedure also creates a global temporary table for each view. You also have the option to use the INIT_OFFSET[_TS] procedure to set the starting point in Kafka topic partitions for your application. When you set a starting point, the initial query reads the Kafka data from that starting point. The application can then perform the following steps in a processing loop:

  1. Call DBMS_KAFKA.CREATE_STREAMING_APP to create the Oracle SQL access to Kafka streaming application.
  2. (Optional) Call DBMS_KAFKA.INIT_OFFSET_TS or DBMS_KAFKA.INIT_OFFSET to set the first Kafka record that you want to be read.
  3. LOOP until done:
    1. Call DBMS_KAFKA.LOAD_TEMP_TABLE to load the global temporary table with the next set of rows from Kafka
    2. SELECT from the OSAK global temporary table and process the retrieved data.
    3. If the processing was successful, call DBMS_KAFKA.UPDATE_OFFSET to update the last Kafka offsets read
    4. Commit the offset tracking information using COMMIT.
  4. When finished, call DBMS_KAFKA.DROP_STREAMING_APP to drop the application.

The PL/SQL procedure DBMS_KAFKA.UPDATE_OFFSET transparently advances Kafka partition offsets of the Kafka group ID for all of the partitions that are identified with the Oracle SQL access to Kafka view, so that for every call to DBMS_KAFKA.LOAD_TEMP_TABLE, a new set of unread Kafka records is retrieved and processed.

Note that UPDATE_OFFSET initiates an Oracle transaction if a transaction is not already started, and records the last offsets in metadata tables. Because of this, to ensure that the transaction does not lose its session information you should configure your application to commit the transaction after every call to UPDATE_OFFSET. After you commit the transaction, because Oracle SQL access to Kafka manages offsets within an Oracle transaction, no records are lost or reread. If the transaction fails to complete, then offsets are not advanced. When the application resumes data reads, it can then restart the data reads of the Kafka data from where it stopped its previous reads.
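The guarantee described above comes from keeping the offsets in the same database transaction as the work that depends on them. The following Python sketch uses an in-memory SQLite database as a stand-in for the DBMS_KAFKA metadata tables: a failure before COMMIT rolls back both the processed rows and the offset update, so the next pass rereads the same records instead of losing them. All names are hypothetical, and writing the processed results in the same transaction is a choice made in the sketch, not something DBMS_KAFKA does for you.

```python
# Stand-in model (hypothetical names): SQLite plays the role of the
# DBMS_KAFKA metadata tables so that offsets and processed rows share
# one transaction.
import sqlite3


def open_tracker():
    """Create an in-memory database with an offsets table and a results table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_off INTEGER NOT NULL)")
    conn.execute("CREATE TABLE results (part INTEGER, val TEXT)")
    conn.commit()
    return conn


def next_offset(conn, part):
    """Next offset to read for a partition, as seen by this connection; 0 if unseen."""
    row = conn.execute("SELECT next_off FROM offsets WHERE part = ?", (part,)).fetchone()
    return row[0] if row else 0


def stream_once(conn, topic, max_per_part=2, fail=False):
    """One loop pass: load a batch, process it, advance offsets, then COMMIT.

    topic: {partition: list of records}, where list index == offset.
    fail:  simulate an error after the first partition is processed, before COMMIT.
    """
    # Like LOAD_TEMP_TABLE: fetch the next unread records of each partition.
    batch = {}
    for p, records in topic.items():
        start = next_offset(conn, p)
        batch[p] = records[start:start + max_per_part]
    try:
        for p, records in batch.items():
            for r in records:
                # "Processing": write results in the same transaction.
                conn.execute("INSERT INTO results VALUES (?, ?)", (p, r))
            if fail:
                raise RuntimeError("simulated failure before COMMIT")
            # Like UPDATE_OFFSET: record the new position transactionally.
            conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?)",
                (p, next_offset(conn, p) + len(records)),
            )
        conn.commit()    # results and offsets become durable together
    except Exception:
        conn.rollback()  # neither results nor offsets advance
        raise
    return sum(len(records) for records in batch.values())
```

After a simulated failure, the next call to `stream_once` returns the same records again, which mirrors "offsets are not advanced" in the text above.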

22.16.3 Seekable Access to Kafka Records in Oracle SQL Queries

To access Kafka records randomly between two timestamps, you use Oracle SQL Access to Kafka in Seekable mode.

The Seekable mode of Oracle SQL access to Kafka enables an application to read Kafka records between timestamps of interest, typically identified by a peer application doing streaming access. In this mode, you specify the start and end timestamps that define a window of time from which the DBMS_KAFKA.LOAD_TEMP_TABLE procedure will populate the temporary table. An application declares that it is a Seekable application by calling the PL/SQL procedure DBMS_KAFKA.CREATE_SEEKABLE_APP to initialize the state for accessing Kafka in Seekable mode. This procedure creates a view and a corresponding global temporary table over all partitions of the topic. The DBMS_KAFKA.SEEK_OFFSET_TS procedure is called to specify the time window from which to query. The application calls SEEK_OFFSET_TS before calling the DBMS_KAFKA.LOAD_TEMP_TABLE procedure to load the temporary table with the next set of rows.

To query Kafka data in Seekable mode to access Kafka records between two timestamps:

  • DBMS_KAFKA.CREATE_SEEKABLE_APP to create the Oracle SQL Access to Kafka seekable application
  • LOOP until done
    • DBMS_KAFKA.SEEK_OFFSET_TS to seek to a user defined window of time in a Kafka topic
    • Call DBMS_KAFKA.LOAD_TEMP_TABLE to load the global temporary table with the set of rows from Kafka
    • SELECT from the OSAK global temporary table
    • Process the data
  • DBMS_KAFKA.DROP_SEEKABLE_APP when done with the application
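Conceptually, a Seekable query selects the records whose timestamps fall inside the requested window, across all partitions of the topic. The Python sketch below models that selection on in-memory data. `seek_window` is hypothetical, timestamps are assumed to be Kafka record timestamps in milliseconds, and the half-open window [start, end) is an assumption, because this section does not state how SEEK_OFFSET_TS treats the window boundaries.

```python
def seek_window(partitions: dict, start_ts: int, end_ts: int) -> list:
    """Records with start_ts <= timestamp < end_ts from all partitions.

    partitions: {partition: [(timestamp_ms, value), ...]}, list index == offset.
    Returns (timestamp_ms, partition, offset, value) tuples in timestamp order.
    """
    if end_ts < start_ts:
        raise ValueError("end_ts must not precede start_ts")
    hits = []
    for p, records in partitions.items():
        for offset, (ts, value) in enumerate(records):
            if start_ts <= ts < end_ts:  # assumed half-open window
                hits.append((ts, p, offset, value))
    return sorted(hits)
```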

22.17 Creating Oracle SQL Access to Kafka Applications

To query Kafka data in a LOAD application, load Kafka data into an Oracle Database table using these procedures.

Typical uses of load procedures include:

DBMS_KAFKA.CREATE_LOAD_APP: This procedure is used to set up loading into an Oracle table

DBMS_KAFKA.INIT_OFFSET[_TS] (OPTIONAL): This procedure is used to set offsets in all topic partitions to control the starting point of a sequence of load operations.

DBMS_KAFKA.EXECUTE_LOAD_APP: This procedure is used to load new, unread records from a Kafka topic up to the high water mark of all topic partitions. You repeat this procedure until you no longer want to load new rows from the Kafka topic.

DBMS_KAFKA.DROP_LOAD_APP: This procedure is used when loading from the Kafka topic is complete.

22.17.1 Creating Load Applications with Oracle SQL Access to Kafka

If you want to load data into an Oracle Database table, then use the Loading mode of DBMS_KAFKA.

An Oracle SQL access to Kafka load application retrieves data from all partitions of a Kafka topic, and places that data into an Oracle Database table for processing. It also creates, if not already present, a metadata view that is used to inspect the Kafka cluster for live topic and partition information regarding the Kafka topic. This view is created once, and serves all applications that are sharing the same cluster. Only one application instance is allowed to call DBMS_KAFKA.EXECUTE_LOAD_APP for the created LOAD application.

Example 22-18 Loading Data Into a Table with DBMS_KAFKA.EXECUTE_LOAD_APP

In this example, you create one view and associated temporary table for a loading application. The Kafka cluster name is ExampleCluster, and the application name is ExampleApp. The Kafka topic is ExampleTopic, which is a topic that has four partitions:

DECLARE
  v_options VARCHAR2(4000);
BEGIN
  v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';

  SYS.DBMS_KAFKA.CREATE_LOAD_APP(
    'ExampleCluster',
    'ExampleApp',
    'ExampleTopic',
    v_options
  );
END;
/

Example 22-19 Loading Data Periodically Into a Table with DBMS_KAFKA.EXECUTE_LOAD_APP

As an alternative to processing Kafka data from a set of application views, you can choose simply to load the data from Kafka into an Oracle Database table, periodically fetching the latest data into the table. The DBMS_KAFKA.EXECUTE_LOAD_APP procedure in this example obtains the latest data from the Kafka cluster, and inserts the data into the table ExampleLoadTable. An application that uses the data in this table has the option to call DBMS_KAFKA.INIT_OFFSET[_TS] to set the starting point for the load.

DECLARE
  v_records_inserted PLS_INTEGER;
BEGIN
  SYS.DBMS_KAFKA.EXECUTE_LOAD_APP(
    'ExampleCluster',
    'ExampleApp',
    'ExampleLoadTable',
    v_records_inserted
  );

  -- Optional: view the output value
  -- DBMS_OUTPUT.PUT_LINE('Records inserted: ' || v_records_inserted);
END;
/

Example 22-20 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_LOAD_APP or DBMS_KAFKA.DROP_ALL_APPS

If the Oracle SQL access to Kafka Load application is no longer needed, then you can drop the views and metadata by calling DBMS_KAFKA.DROP_LOAD_APP. In the following example, the Kafka cluster is ExampleCluster, and the application is ExampleApp.

EXEC SYS.DBMS_KAFKA.DROP_LOAD_APP('ExampleCluster', 'ExampleApp');

If the Kafka cluster for one or more Oracle SQL access to Kafka applications no longer exists, then you can drop all of the applications for a given cluster by calling DBMS_KAFKA.DROP_ALL_APPS:

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');

22.17.2 Creating Streaming Applications with Oracle SQL Access to Kafka

If you want to process Kafka data sequentially with one or more concurrent application instances, then use the Streaming mode of DBMS_KAFKA.

Streaming enables you to process data at scale. You can use Oracle SQL access to Kafka in Streaming mode to create multiple application instances. Multiple instances enable applications to scale out and divide the workload of analyzing Kafka data across application instances running concurrently on one or more threads, processes, or systems.

An Oracle SQL access to Kafka streaming application includes a set of dedicated Oracle SQL access to Kafka global temporary tables and Oracle SQL access to Kafka views. These temporary tables and views can be used for retrieving new, unread records from partitions in a Kafka topic.

It also creates, if not already present, a metadata view that is used to inspect the Kafka cluster for active topic and partition information regarding the Kafka topic. This view is created once, and serves all applications that are sharing the same cluster.

Each Oracle SQL access to Kafka global temporary table and its related view is exclusively used by one instance of an Oracle SQL access to Kafka application.

Each application instance calls LOAD_TEMP_TABLE, which populates the dedicated Oracle SQL access to Kafka global temporary table with Kafka rows retrieved from the associated view. The application then can run one or more SQL queries against the content in the Oracle SQL access to Kafka global temporary table. When the application is done with the current set of Kafka rows, it calls UPDATE_OFFSET and COMMIT.

A STREAMING mode application is different from a LOAD or SEEKABLE application in that you can configure how many Oracle SQL access to Kafka views and temporary tables your application requires. As with other types of Oracle SQL access to Kafka applications, each application instance exclusively queries one unique Oracle SQL access to Kafka temporary table. Each Oracle SQL access to Kafka view and global temporary table name includes the cluster name, the application name, and an application instance identifier (ID).

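When you create your application, be aware that the number of Oracle SQL access to Kafka view and temporary table pairs that you create must be between 1 and N, where N is the number of partitions in the Kafka topic. You specify this number with the view_count argument, which is the last argument of DBMS_KAFKA.CREATE_STREAMING_APP.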

At runtime, each application instance runs in its own user session and processes one Oracle SQL access to Kafka global temporary table and its associated view. Accordingly, to run all application instances concurrently, you must allocate to the user at least as many sessions as the number of views that you create (up to N, the number of partitions in the Kafka topic). If view_count exceeds the maximum number of sessions allowed for the user, then the call to create the application fails with an error indicating that insufficient sessions are allocated to the user. The number of Kafka partitions bound to a specific Oracle SQL access to Kafka view and its associated global temporary table depends on how many views are created and how many partitions exist. Oracle SQL access to Kafka balances the number of partitions assigned to each view.
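To check whether the application user can open enough sessions for the views you plan to create, you can inspect the SESSIONS_PER_USER limit of the user's profile and raise it if needed. This is a minimal sketch: KAFKA_APP_USER and KAFKA_APP_PROFILE are hypothetical names, the sketch assumes that the user is assigned its own profile (so that raising the limit does not affect other users), and the value 4 assumes a topic with four partitions and one view per partition. Run it as a user who can query DBA_USERS and DBA_PROFILES and who has the ALTER PROFILE privilege.

```sql
-- Show the session limit that applies to the (hypothetical) application user
SELECT p.profile, p.limit
FROM   dba_users u
JOIN   dba_profiles p ON p.profile = u.profile
WHERE  u.username = 'KAFKA_APP_USER'
AND    p.resource_name = 'SESSIONS_PER_USER';

-- Allow at least one session per view; 4 assumes four partitions
-- and a view_count of 4 (hypothetical profile name)
ALTER PROFILE kafka_app_profile LIMIT SESSIONS_PER_USER 4;
```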

Example 22-21 Streaming Data Into a Table with DBMS_KAFKA.CREATE_STREAMING_APP

In this example, you create a set of four views and associated temporary tables for a Streaming mode application using data from a topic called ExampleTopic. The topic has four partitions, and each view and temporary table is associated with one partition:

DECLARE
  v_options VARCHAR2(4000);
BEGIN
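  -- DSV format; the reference table defines the shape of the views
  v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';

  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ExampleCluster',  -- Kafka cluster name
    'ExampleApp',      -- application name
    'ExampleTopic',    -- Kafka topic name
    v_options,         -- options (JSON document)
    4                  -- number of view and temporary table pairs
  );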
END;
/

Example 22-22 Loading Data Into a Single Table with DBMS_KAFKA.CREATE_STREAMING_APP

In this example, Streaming mode is used to create one view and associated temporary table for an application that is associated with all four partitions of the topic:

DECLARE
  v_options VARCHAR2(4000);
BEGIN
  v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';

  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ExampleCluster',
    'ExampleApp',
    'ExampleTopic',
    v_options,
    1
  );
END;
/

Example 22-23 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_STREAMING_APP or DBMS_KAFKA.DROP_ALL_APPS

If the Oracle SQL access to Kafka Streaming application is no longer needed, then you can drop the views and metadata by calling DBMS_KAFKA.DROP_STREAMING_APP. In the following example, the Kafka cluster is ExampleCluster, and the application is ExampleApp.

EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP('ExampleCluster', 'ExampleApp');

If the Kafka cluster for one or more Oracle SQL access to Kafka applications no longer exists, then you can drop all of the applications for a given cluster by calling DBMS_KAFKA.DROP_ALL_APPS:

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');

22.17.3 Creating Seekable Applications with Oracle SQL Access to Kafka

If you want to investigate issues that occurred in the past, and randomly access a Kafka topic between starting and ending timestamps, then use the Seekable mode of DBMS_KAFKA.

Before accessing Kafka topics in Seekable mode, you must create an Oracle SQL access to Kafka application with DBMS_KAFKA.CREATE_SEEKABLE_APP. This procedure creates an application that you can use in Seekable mode.

Using Oracle SQL access to Kafka in Seekable mode enables you to use Kafka data to investigate issues that have occurred in the past. Provided that the data is still present in the Kafka stream, you can create a Seekable application by calling DBMS_KAFKA.CREATE_SEEKABLE_APP. After you have created a Seekable mode application, you can call the procedure DBMS_KAFKA.SEEK_OFFSET_TS to request the Oracle SQL access to Kafka view to retrieve the range of data records between a starting and an ending timestamp, load the temporary table, and then query the loaded records.
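A Seekable application can be created in the same way as the Streaming applications shown earlier in this chapter. This sketch assumes that CREATE_SEEKABLE_APP takes the cluster name, application name, topic name, and options as its arguments, like CREATE_STREAMING_APP but without a view count; verify the signature for your release. The application name SeekableApp matches the view and temporary table names (ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0 and ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0) used in the following examples, and user_reftable_name is a placeholder for your DSV reference table.

```sql
DECLARE
  v_options VARCHAR2(4000);
BEGIN
  -- DSV records; the reference table defines the shape of the view
  v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';

  -- Assumed argument order: cluster, application, topic, options
  SYS.DBMS_KAFKA.CREATE_SEEKABLE_APP(
    'ExampleCluster',
    'SeekableApp',
    'ExampleTopic',
    v_options
  );
END;
/
```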

Example 22-24 Searching a Date Range in Kafka Data Using DBMS_KAFKA.CREATE_SEEKABLE_APP

In this example, suppose that an IT consultant was informed that a production issue occurred around 03:00 in the morning, and needed to investigate the cause. The consultant could use the following procedure to seek to the hour between 02:30 and 03:30, load the temporary table, and then select the records for that hour. The Kafka cluster is EXAMPLECLUSTER, and the columns of interest are EventCol and ExceptionCol:

BEGIN
  SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
    'ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0',
    TO_DATE('2022/07/04 02:30:00', 'YYYY/MM/DD HH24:MI:SS'),
    TO_DATE('2022/07/04 03:30:00', 'YYYY/MM/DD HH24:MI:SS')
  );

  SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0');
END;
/

SELECT EventCol, ExceptionCol
FROM ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0;

Example 22-25 Locating Records Associated with Anomalies Using DBMS_KAFKA.CREATE_SEEKABLE_APP

Suppose that when an application using sequential access to a Kafka stream detects a potential anomaly, the application inserts a row into an anomaly table. The anomaly table includes the Kafka timestamp, as well as any other data identified as important to trace. Another application can use this information to retrieve the records around the suspect record, to determine whether any other issues are associated with the anomaly. In this example, the columns that an IT consultant wants to examine for the anomaly are UserCol and RequestCol. To achieve this, run the following procedure, load the temporary table, and then select and apply application logic to the results:

BEGIN
  SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
    'ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0',
    TO_DATE('2020/07/04 02:30:00', 'YYYY/MM/DD HH24:MI:SS'),
    TO_DATE('2020/07/04 03:30:00', 'YYYY/MM/DD HH24:MI:SS')
  );

  SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0');
END;
/

SELECT UserCol, RequestCol
FROM ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0;

-- application logic
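Instead of hard-coding the time window, the second application can derive it from the anomaly table. In this sketch, the table ANOMALY_LOG, its KAFKA_TS column (a DATE holding the Kafka timestamp of the suspect record), its ANOMALY_ID column, and the ID value 42 are all hypothetical. The window of 30 minutes on either side of the anomaly mirrors the hour-long range used above.

```sql
DECLARE
  v_anomaly_ts DATE;
BEGIN
  -- Read the Kafka timestamp recorded for one anomaly
  -- (ANOMALY_LOG, KAFKA_TS, ANOMALY_ID and the value 42 are hypothetical)
  SELECT kafka_ts
  INTO   v_anomaly_ts
  FROM   anomaly_log
  WHERE  anomaly_id = 42;

  -- Seek from 30 minutes before to 30 minutes after the anomaly
  -- (in DATE arithmetic, 30/1440 of a day is 30 minutes)
  SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
    'ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0',
    v_anomaly_ts - 30/1440,
    v_anomaly_ts + 30/1440
  );

  -- Load the records in that window into the temporary table
  SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0');
END;
/

SELECT UserCol, RequestCol
FROM   ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0;
```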

Example 22-26 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_SEEKABLE_APP or DBMS_KAFKA.DROP_ALL_APPS

If the Oracle SQL access to Kafka Seekable application is no longer needed, then you can drop the views and metadata by calling DBMS_KAFKA.DROP_SEEKABLE_APP. In the following example, the Kafka cluster is ExampleCluster, and the application is ExampleApp.

EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP('ExampleCluster', 'ExampleApp');

If the Kafka cluster for one or more Oracle SQL access to Kafka applications no longer exists, then you can drop all of the applications for a given cluster by calling DBMS_KAFKA.DROP_ALL_APPS:

EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');

22.18 Using Kafka Cluster Access for Applications

Learn how to use Kafka cluster data access with your applications.

22.18.1 How to Diagnose Oracle SQL Access to Kafka Issues

If you encounter issues with Oracle SQL access to Kafka, then use these guidelines to determine the cause, and resolve the issue.

The following are the main categories of Oracle SQL access to Kafka issues:

Failures to establish an initial connection

Typical causes of this type of failure are as follows:

  • Incorrect startup server list
  • Incorrect credential information
  • Networking configuration issues

Failures on first access

Failures on first access when calling DBMS_KAFKA.CREATE_LOAD_APP, DBMS_KAFKA.CREATE_STREAMING_APP, or DBMS_KAFKA.CREATE_SEEKABLE_APP typically have the following causes:
  • Missing or incorrect topic
  • Connection issues

Failures during record selection

Failures of this type typically have the following causes:
  • Connection issues
  • Internal metadata or logic issues
  • Missing records
  • Parsing errors where the Oracle SQL access to Kafka view shape does not match the input.

Failure of an Oracle application and its Oracle SQL access to Kafka views to keep up with Kafka data input

Failures of this type require resource tuning. They occur when the rate at which rows are ingested into a topic in the Kafka cluster comes close to or exceeds the ability of Oracle Database to consume Kafka records, so that over time Kafka ages out unread records, according to its retention settings, before Oracle Database consumes them.

Avoid or correct this kind of error by determining the workload. For example, check the frequency of querying, the typical number of records processed per query per Oracle SQL access to Kafka view, the degree of parallelism being used, and the time spent by an application performing analysis. When you have determined the workload, then ensure that the application stack can meet it. Size your resources so that the application and Oracle Database can process peak Kafka records without stressing either the application or Oracle Database resources.
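One way to start measuring the workload is to time a single load-and-process cycle for one application instance and compute the number of records consumed per second. This is a minimal sketch that reuses the ExampleCluster and ExampleApp names; DBMS_UTILITY.GET_TIME returns elapsed time in hundredths of a second, and the analysis step is left as a placeholder. You can then compare the resulting rate with the rate at which producers publish to the topic at peak.

```sql
DECLARE
  v_start     NUMBER;
  v_elapsed   NUMBER;
  v_row_count PLS_INTEGER;
BEGIN
  v_start := DBMS_UTILITY.GET_TIME;  -- hundredths of a second

  SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0');

  SELECT COUNT(*)
  INTO   v_row_count
  FROM   ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0;

  -- application analysis of the batch goes here

  SYS.DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0');
  COMMIT;

  -- Convert to seconds; GREATEST avoids dividing by zero on very fast cycles
  v_elapsed := GREATEST(DBMS_UTILITY.GET_TIME - v_start, 1) / 100;

  DBMS_OUTPUT.PUT_LINE('Records: ' || v_row_count);
  DBMS_OUTPUT.PUT_LINE('Seconds: ' || v_elapsed);
  DBMS_OUTPUT.PUT_LINE('Records per second: ' || ROUND(v_row_count / v_elapsed));
END;
/
```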

If you find that the rate at which records arrive in the topic starts increasing, then several measures can help: increase the degree of parallelism available to the application user, start more application instances, or add partitions to the Kafka topic.

Example 22-27 Resolving an Oracle SQL Access to Kafka (OSAK) Application Error

Suppose your OSAK application EXAMPLEAPP is loading data from the Kafka cluster EXAMPLECLUSTER, and you receive an error such as the following:

ORA-62721: The specified parallel hint [%0!s] exceeds the granule count {%1!s}.

The cause of this error is that the specified value was greater than the maximum possible parallelism, which is determined by the granule count. How do you resolve such an error?

The parallel_hint parameter of LOAD_TEMP_TABLE and EXECUTE_LOAD_APP specifies the requested degree of parallelism (DOP), which determines how many parallel processes can run for a given SELECT statement to fetch the data. To take full advantage of parallel query, set parallel_hint to a value between 2 and the maximum allowed DOP. The maximum DOP is either the maximum allowed for the user making the call, or the number of partitions associated with the OSAK view, whichever is smaller. The error is raised when the requested parallel_hint exceeds this maximum.

To resolve this issue, specify a value less than or equal to the granule count. The granule count can be determined by calling the DBMS_KAFKA.GET_GRANULE_COUNT function:

DECLARE
  v_dop        PLS_INTEGER;
  v_success    BOOLEAN := TRUE;  -- set this based on your processing outcome
BEGIN
  LOOP
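    -- The granule count determines the maximum possible parallelism
    v_dop := SYS.DBMS_KAFKA.GET_GRANULE_COUNT('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0');

    -- Example exit condition (adjust to your intended logic)
    IF v_dop = 0 THEN
      EXIT;
    END IF;

    -- Pass a parallel hint no greater than the granule count to avoid ORA-62721
    SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0',
                                   parallel_hint => v_dop);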

    FOR kafka_record IN (
      SELECT kafka_offset
      FROM   ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0
    ) LOOP
      DBMS_OUTPUT.PUT_LINE('Processing record: ' || kafka_record.kafka_offset);

      -- application logic to process the Kafka records
      -- set v_success := TRUE/FALSE accordingly
    END LOOP;

    IF v_success THEN
      -- Update internal metadata to confirm Kafka records were successfully processed
      SYS.DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0');
      COMMIT;
    ELSE
      -- add your application logic to correct for any failures
      NULL;
    END IF;
  END LOOP;
END;
/

22.18.2 Identifying and Resolving Oracle SQL Access to Kafka Issues

To assist with identifying and resolving issues, Oracle SQL access to Kafka provides trace files, message tables, operation results tables, and a state column in the cluster table.

Determine the nature of the issue that you see, and then use the corresponding facility to identify and address it:

  • Connection issue, logic issue, or Kafka access layer issue (Oracle executables called by a Kafka data select): Check the trace file. You can also check the state column in the sys.user_kafka_clusters table.
  • Exceptions from DBMS_KAFKA and DBMS_KAFKA_ADM APIs: Review error messages in the sys.user_kafka_messages table.
  • Operations runtime issue: Review messages in the sys.user_kafka_ops_results table when the performance of Oracle SQL access to Kafka data retrieval is not as expected.

Example 22-28 Connection Issue, Logic Issue, or Kafka Access Layer Issue

Use the trace file to identify the issue.

  • For connection-related issues, the details are available from view object tracing. To enable it, either add the following event to the initialization parameter file (init.ora), or use the ALTER SYSTEM command to set it while the database is running:

    Add the following entry to the initialization file (init.ora):

    event='trace[KGRK] disk highest'

    Alter the system:

    alter system set events 'trace[KGRK] disk highest';

    Note:

    Updates to the init.ora file require a restart of the database to take effect.
  • For logic-related errors, all error paths contain tracing. All messages are prefixed with the string kubsCRK. These logic errors also raise SQL exceptions.

  • To enable tracing output for the Kafka access layer of an Oracle SQL access to Kafka application, call DBMS_KAFKA.SET_TRACING with the enable argument set to TRUE. To disable it, call the same procedure with the enable argument set to FALSE.

    For example:

    To enable tracing for an application named ExampleApp on a cluster named ExampleCluster, enter the following:

    EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', TRUE);

    To disable tracing for that application, enter the following:

    EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', FALSE);

    Note:

    To enable tracing, the following event must already be enabled for the database:

    event="39431 trace name context forever, level 1" # Enable external table debug tracing  
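Put together, a tracing session for the Kafka access layer might look like the following sketch. It sets event 39431 at runtime with ALTER SYSTEM instead of in init.ora (this requires the ALTER SYSTEM privilege), enables SET_TRACING for ExampleApp, and then queries V$DIAG_INFO for the name of the current session's trace file. Because a system-level event setting may not apply to sessions that were already connected, the sketch assumes that you reproduce the problem in a session started after the event is set.

```sql
-- Enable external table debug tracing for the running database
ALTER SYSTEM SET EVENTS '39431 trace name context forever, level 1';

-- Enable Kafka access layer tracing for the application
EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', TRUE);

-- ... reproduce the problem in this session ...

-- Locate the trace file written by this session
SELECT value
FROM   v$diag_info
WHERE  name = 'Default Trace File';

-- Turn tracing off again
EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', FALSE);
ALTER SYSTEM SET EVENTS '39431 trace name context off';
```

Search the trace file for messages prefixed with kubsCRK.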

If you determine that the issue is a connection issue, then check the STATE column in the sys.user_kafka_clusters table. The connection states are designated by the following numeric values:

  • CONNECTED (0): This state indicates that the connection to the Kafka cluster has been established. Errors that occur while the cluster is in this state indicate an issue with requesting the Kafka data. To identify the issue, enable tracing by using the DBMS_KAFKA.SET_TRACING API, reproduce the problem, and then check the associated trace file for the session for messages containing kubsCRK. Also check for messages in the user_kafka_messages table.
  • MAINTENANCE (1): This state indicates that the connection to the Kafka cluster has been established, but errors that occur while the connection is established indicate an issue requesting the Kafka data. To resolve this issue, enable tracing using the DBMS_KAFKA.SET_TRACING API, reproduce the problem, and then check the associated trace file for the session for messages containing kubsCRK. Also check for messages in the user_kafka_messages table.
  • BROKEN (2): This state indicates that a connection cannot be reestablished to the Kafka cluster. Look for errors in the trace file for the facility KUBD, and in the message table.
  • DEREGISTERED (3): This state indicates that the OSAK administrator has forced the cluster to be deregistered, and the associated Oracle SQL access to Kafka views should no longer be accessed. This is expected behavior, and not an error.
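The STATE column can be decoded with a CASE expression based on the values listed above. This sketch assumes that sys.user_kafka_clusters also has a CLUSTER_NAME column; adjust the column list to match the view in your release.

```sql
SELECT cluster_name,  -- assumed column name
       state,
       CASE state
         WHEN 0 THEN 'CONNECTED'
         WHEN 1 THEN 'MAINTENANCE'
         WHEN 2 THEN 'BROKEN'
         WHEN 3 THEN 'DEREGISTERED'
         ELSE 'UNKNOWN'
       END AS state_name
FROM   sys.user_kafka_clusters;
```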

Example 22-29 PL/SQL Package Issues

Check the sys.user_kafka_messages table. This table contains the messages logged within the last three days. Older messages are purged automatically once a day. Messages are also removed when the OSAK views associated with them are dropped.

Example 22-30 Operations Runtime Issue

If the number of rows retrieved using a SELECT statement appears to be less than expected, then use the data in the sys.user_kafka_ops_results table to review the number of records read from Kafka for the last selection.

The SELECT statement returns only rows that parsed correctly, so a difference between the number of rows retrieved and the number of Kafka records read indicates that not all data in the Kafka topic is in the format specified in the DBMS_KAFKA.CREATE_LOAD_APP, DBMS_KAFKA.CREATE_STREAMING_APP, or DBMS_KAFKA.CREATE_SEEKABLE_APP call.

If the Kafka topic data is not in the specified format, then use one of the following approaches:
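To compare the two numbers, you can count the rows loaded into the temporary table by the last selection and then review the operation results. This sketch reuses the ExampleCluster and ExampleApp names. Because the columns of sys.user_kafka_ops_results are not listed in this section, the sketch selects all of them rather than naming a specific records-read column.

```sql
-- Rows that parsed successfully and were loaded by the last selection
SELECT COUNT(*) AS rows_loaded
FROM   ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0;

-- Operation results, including the number of records read from Kafka
SELECT *
FROM   sys.user_kafka_ops_results;
```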

  1. Fix the producers publishing to the Kafka cluster.
  2. Drop and recreate the application so that it provides the proper format (reference table for DSV, Avro schema for AVRO).
  3. For JSON data, before you drop and recreate the application, check whether the data exceeds the maximum column length of the VARCHAR2 VALUE column. If it does, then drop and recreate the application, this time adding the option "jsond" : "clob" to the options parameter. This option causes OSAK to create the column as a character large object (CLOB) column instead of the default maximum-sized VARCHAR2 column.
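For a Streaming application, recreating the application with a CLOB-backed JSON value column could look like the following sketch. It assumes that the JSON format is selected with "fmt":"JSON" in the same options document that carries "jsond":"clob", and it reuses the ExampleCluster, ExampleApp, and ExampleTopic names with a single view. For a Load or Seekable application, use the corresponding drop and create procedures instead.

```sql
-- Remove the existing views, temporary tables, and metadata
EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP('ExampleCluster', 'ExampleApp');

DECLARE
  v_options VARCHAR2(4000);
BEGIN
  -- Assumed options: JSON records, with the VALUE column created as a CLOB
  v_options := '{"fmt":"JSON","jsond":"clob"}';

  SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
    'ExampleCluster',
    'ExampleApp',
    'ExampleTopic',
    v_options,
    1
  );
END;
/
```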