22 Oracle SQL Access to Kafka
Starting with Oracle AI Database 26ai, you can use Oracle SQL APIs to query Kafka topics dynamically using Oracle SQL.
Oracle SQL Access to Kafka integrates Kafka and OCI Streaming Service streams
with Oracle AI Database in several important ways. First, it enables you to
connect Oracle AI Database to one or more Kafka topics. After the database
is connected, you can then query that topic dynamically using Oracle SQL,
without persisting the Kafka data in Oracle AI Database. This feature
enables you to analyze real-time data in combination with data captured in
your database. In addition, Oracle SQL Access to Kafka enables fast,
scalable and lossless loading of Kafka topics into Oracle AI Database. The
DBMS_KAFKA APIs simplify the management of this
entire process.
- About Oracle SQL Access to Kafka Version 2
Oracle SQL Access to Kafka (OSaK) is a native Oracle AI Database feature that enables you to query Kafka topics directly using Oracle SQL.
- Global Tables and Views for Oracle SQL Access to Kafka
Learn how Oracle SQL Access to Kafka (OSAK) accesses Kafka STREAMING, SEEKING, and LOAD applications, and how it uses unique ORA$ prefixes for global temporary tables.
- Understanding how Oracle SQL Access to Kafka Queries are Performed
Oracle SQL Access to Kafka accesses Kafka streaming data, but queries are performed on Oracle Database global temporary tables, which provides several advantages.
- Streaming Kafka Data Into Oracle Databases
Oracle SQL Access to Kafka enables Kafka streaming data to be processed with Oracle AI Database tables using standard SQL semantics.
- Querying Kafka Data Records by Timestamp
Oracle SQL Access to Kafka in Seekable mode helps you query older data stored in Kafka, based on timestamps associated with the Kafka data.
- About the Kafka Database Administrator Role
To administer Oracle SQL access to Kafka, grant the Oracle AI Database role OSAK_ADMIN_ROLE and grant required administration privileges to the administrator role and the Kafka administration API package.
- Enable Kafka Database Access to Users
The application user accounts are granted the DBMS_KAFKA database privileges required to access OSAK.
- Data Formats Supported with Oracle SQL Access to Kafka
Oracle SQL access to Kafka supports Kafka records represented in three formats: delimited text data (for example, csv), JSON, and Avro.
- Configuring Access to a Kafka Cluster
You can configure access to secured Kafka clusters, or non-secured Kafka clusters.
- Creating Oracle SQL Access to Kafka Applications
Oracle SQL can access Kafka data using loading, streaming, or seekable applications created with DBMS_KAFKA.
- Security for Kafka Cluster Connections
Oracle SQL Access to Kafka supports access to Kafka and Oracle Streaming Service (OSS), using various security mechanisms, such as SSL, SASL, and Kerberos.
- Configuring Access to Unsecured Kafka Clusters
To configure access to non-secure Kafka clusters, the OSAK administrator (an Oracle Database user with osak_admin_role) must complete this procedure.
- Configuring Access to Secure Kafka Clusters Using Oracle Wallet
Configure secure Kafka access by replacing PEM-based SSL with Oracle Wallet and registering the cluster using wallet credentials.
- Administering Oracle SQL Access to Kafka Clusters
See how to update, temporarily disable, and delete Kafka cluster definitions with Oracle SQL access to Kafka.
- Guidelines for Using Kafka Data with Oracle SQL Access to Kafka
Review guidelines, restrictions, and recommendations as part of your application development plan.
- Choosing a Kafka Cluster Access Mode for Applications
To use Oracle SQL access to Kafka, decide what mode of data access you require for your applications.
- Creating Oracle SQL Access to Kafka Applications
To query Kafka data in a LOAD application, load Kafka data into an Oracle Database table using these procedures.
- Using Kafka Cluster Access for Applications
Learn how to use Kafka cluster data access with your applications.
Parent topic: Other Utilities
22.1 About Oracle SQL Access to Kafka Version 2
Oracle SQL Access to Kafka (OSaK) is a native Oracle AI Database feature that enables you to query Kafka topics directly using Oracle SQL.
Starting with Oracle AI Database 26ai, version 2 of Oracle SQL
access to Kafka is installed with the database. It provides a
native Oracle AI Database connector to Kafka clusters, with a
rich set of features accessed through the DBMS_KAFKA and
DBMS_KAFKA_ADM PL/SQL packages.
Oracle SQL Access to Kafka integrates Kafka and OCI Streaming Service streams with Oracle AI Database in several important ways. First, it enables you to connect Oracle AI Database to one or more Kafka topics. After the database is connected, you can then query that topic dynamically using Oracle SQL, without persisting the Kafka data in Oracle AI Database. This feature enables you to analyze real-time data in combination with data captured in your database. In addition, Oracle SQL Access to Kafka enables fast, scalable, and lossless loading of Kafka topics into Oracle AI Database. The DBMS_KAFKA APIs simplify the management of this entire process.
Starting with Oracle AI Database 26ai (23.26.2), Oracle SQL Access to
Kafka supports Oracle Wallet-based SSL configuration for secured Kafka
connections. For Oracle AI Database 26ai and later, SSL certificates and
keys are managed through Oracle wallets created with the
orapki tool. The legacy PEM-based SSL properties
are desupported. Replace PEM-based SSL properties with wallet-based
configuration.
What You Can Do with OSaK
With OSaK, you can query, process, and integrate Kafka streaming data directly within Oracle AI Database using familiar Oracle tools and SQL semantics. This capability eliminates the need for external client connectors and enables you to relate streaming data to operational, transactional, or analytical tables within Oracle AI Database. OSaK can scale consumption of data streams for the database in the same fashion as Kafka applications.
Oracle SQL access to Kafka enables you to do the following:
- Create and use a streaming application to process unread Kafka records exactly once, and then discard these records after processing.
- Create and use a loading application to capture unread Kafka records permanently in an Oracle AI Database table, for access by various Oracle applications. Kafka records are captured and persisted in user tables in Oracle AI Database, both for analytics and data warehousing.
- Re-read or seek records in Kafka topics based on offset or a user-supplied timestamp interval for auditing or reconciliation.
- Join data from multiple Kafka topics, or between Kafka and Oracle tables, all within a single SQL statement.
How OSaK Works
OSaK exposes system-generated views and external tables over
Kafka topics using the DBMS_KAFKA package. All
operations are part of a database transaction, and meet full ACID
(Atomicity, Consistency, Isolation, Durability) standards. Both data
changes and Kafka partition offset management are performed together
in database transactions, ensuring consistent state. If a
transaction fails, you can use the transaction ID and timestamp to
identify and easily roll back unwanted changes or errors, all
without data loss or duplication.
Without Oracle SQL Access to Kafka, the Kafka partition offsets need to be managed either by the application, or by Kafka, neither of which support transaction semantics. This means that after a system failure, Kafka records can be lost or reprocessed by an application.
Managing Kafka partition offsets in the Oracle AI Database transaction provides better isolation and durability than application- or Kafka-managed offsets. ACID requirements for the database ensure that either all parts of the transaction are committed, or all rolled back, with a unique identifier (a transaction ID) for each transaction. This transaction ID includes timestamps that you can use to identify and roll back errors. The ACID feature of Oracle AI Database transactions provides support for data recovery in case of a failure, without losing or repeating records.
Because Oracle SQL Access to Kafka is available with Oracle AI Database, and is used with PL/SQL and SQL queries, no external client application is required to provide a connector to the database.
The DBMS_KAFKA PL/SQL package has functions
and procedures to register a Kafka cluster in a database schema,
query Kafka topics, query data from specified offsets or specified
timestamps, and more. You can choose either to use global temporary
tables without storing the data, or to store the data in user tables
in the target Oracle AI Database.
Leveraging Global Temporary Tables (GTT) Best Practice
You can use OSaK to access global temporary tables (GTT) or user tables created in Oracle AI Database so that your application can obtain data. That data can be streams of data, or snapshots of the data from other databases. This data can be accessed directly, or loaded into database tables and be used within your application.
The corresponding global temporary table receives a snapshot from an Oracle SQL access to Kafka view. Applications use this temporary table for one or more queries within a transaction: a GTT is loaded once, and used. The application can use standard Oracle SQL with the GTT. The Kafka offsets are advanced, and then the application commits, indicating that it is finished with the Kafka records loaded in the GTT.
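The following PL/SQL sketch illustrates this per-transaction pattern. It assumes the streaming application ALPHA1/MYAPP used in examples later in this chapter, plus a hypothetical results table app_results; the exact LOAD_TEMP_TABLE signature should be confirmed in the DBMS_KAFKA package reference.
DECLARE
  v_rows INTEGER;
BEGIN
  -- Load one batch from the OSAK view into the GTT (one Kafka fetch).
  -- Signature assumed here: takes the GTT name, returns rows loaded.
  v_rows := SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_ALPHA1_MYAPP_0');

  -- Query the snapshot with standard SQL, as many times as needed.
  INSERT INTO app_results
    SELECT kafka_partition, kafka_offset, value
      FROM ORA$DKVGTT_ALPHA1_MYAPP_0;

  -- COMMIT advances the stored Kafka offsets atomically with the
  -- data changes above; the GTT contents are then discarded.
  COMMIT;
END;
/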
Key Benefits of OSaK Global Temporary Tables
The key benefits of using OSaK GTTs are as follows:
- Batch-Optimized Loading: Each Kafka batch is fetched only once into the GTT, minimizing network round-trips and Kafka cluster load.
- Transaction-Based Caching: The GTT holds a consistent snapshot of the Kafka data for the duration of the transaction. With GTTs, you can issue multiple queries and joins with standard Oracle tables. Repeatable reads are guaranteed without additional Kafka queries needed for the duration of the application instance. You can leverage the mature optimization and processing strategies in Oracle AI Database to minimize code paths needed to join tables efficiently.
- Atomic Offset Management: Upon commit, both database changes and the associated Kafka offsets are atomically persisted, ensuring reliable, exactly-once processing.
- Incremental Data Movement: On subsequent executions, only new records (since the last commit) are fetched, making incremental, efficient data processing seamless.
Benefits for Database Administrators
- Centralized Management: Kafka integration, offset tracking, and data operations are managed within Oracle’s security and compliance framework. You do not need to administer separate middleware.
- Resilient Operations: ACID transactions extend to streaming data: after a failure, no data is lost or reprocessed unnecessarily.
- Efficient Resource Utilization: Each batch requires just one Kafka fetch per transaction, reducing overhead, network load, and latency.
- Compliant and Secure: All access, auditing, and governance are handled natively in Oracle Database.
Benefits for Application Developers
- Simple, Powerful SQL Interface: Access Kafka topics with standard SQL and PL/SQL. You can apply mature SQL procedures, simplifying development and enabling rapid prototyping. You can take advantage of the Oracle optimizer to gather statistics on GTT data for efficient execution plans and processing strategies for maximum performance and maintainability.
- Consistent Snapshots: The GTT holds a static data set for each transaction. This enables repeatable reads, making multi-step queries and complex joins predictable and reliable.
- High Performance: There is no need to handle offset logic, idempotency, or error recovery in application code; the database manages these tasks for you.
- Integration Flexibility: You can join Kafka data with operational or historical data within Oracle for holistic analytics and insights.
Parent topic: Oracle SQL Access to Kafka
22.2 Global Tables and Views for Oracle SQL Access to Kafka
Learn how Oracle SQL Access to Kafka (OSAK) accesses Kafka STREAMING,
SEEKING, and LOAD applications, and how it uses unique ORA$ prefixes for
global temporary tables.
For STREAMING and SEEKING applications, OSAK provides views that represent Kafka topic data. Applications use PL/SQL to call an OSAK procedure, which queries the appropriate OSAK view and loads the results into global temporary tables.
LOAD applications do not require global temporary tables. Instead,
LOAD applications perform incremental loads into an existing Oracle
Database table using the EXECUTE_LOAD_APP procedure.
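A minimal sketch of such an incremental load follows, with assumed parameter names and a hypothetical target table KAFKA_EVENTS; confirm the EXECUTE_LOAD_APP signature in the DBMS_KAFKA reference.
DECLARE
  v_records_loaded INTEGER;
BEGIN
  -- Each call loads only the records added since the last committed load.
  SYS.DBMS_KAFKA.EXECUTE_LOAD_APP(
    'ALPHA1',           -- cluster name
    'MYLOADAPP',        -- LOAD application name
    'KAFKA_EVENTS',     -- existing target table
    v_records_loaded);  -- OUT: number of records loaded
  COMMIT;
END;
/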
For all three application types (STREAMING, SEEKING and LOAD), OSAK automatically creates the views and external tables.
Both the views and temporary tables created by OSAK have unique
ORA$ prefixes that identify them as generated by OSAK.
OSAK-generated views and tables use the following unique prefixes:
- ORA$DKV: Views created by OSAK for Kafka access.
- ORA$DKX: External tables created by OSAK.
- ORA$DKVGTT: Global temporary tables loaded from a streaming or seeking application using OSAK procedures.
These OSAK-generated views and external tables serve calls to DBMS_KAFKA to
load data from Kafka into a user-owned table or into a global temporary table.
Typically, these views and external tables are treated as internal objects, and are
not directly manipulated by an Oracle application.
Global temporary tables are loaded transparently when calling
DBMS_KAFKA.LOAD_TEMP_TABLE.
Parent topic: Oracle SQL Access to Kafka
22.3 Understanding how Oracle SQL Access to Kafka Queries are Performed
Oracle SQL Access to Kafka accesses Kafka streaming data, but queries are performed on Oracle Database global temporary tables, which provides several advantages.
A typical application does not query Oracle SQL Access to Kafka views directly. Instead:
- Each query from an Oracle SQL Access to Kafka view fetches data directly from Kafka, from the current offset to the current high water mark. Because rows are continually being added, each query from a view is likely to retrieve more rows. Therefore, Oracle SQL Access to Kafka views do not support repeatable reads, either explicitly from multiple queries or implicitly within a join.
- There are no reliable statistics gathered from Oracle SQL Access to Kafka views for the query optimizer.
- Each query from an Oracle SQL Access to Kafka view results in a trip to the Kafka cluster, re-retrieving the same rows and perhaps additional rows. These repeated retrievals can affect performance.
The corresponding temporary table receives a snapshot from an Oracle SQL Access to Kafka view. Applications use this temporary table for one or more queries within a transaction. Reading from the temporary table is beneficial for the following reasons:
- Repeatable reads are supported, either explicitly from multiple queries or implicitly within a join
- Reliable statistics are gathered for the query optimizer
- Only one read is made to Kafka when loading the temporary table. Subsequent queries do not require returning to the Kafka cluster to access the data.
The global temporary tables can be joined with standard Oracle tables (for example, Oracle customer relationship management (CRM) tables), as shown in the sketch after the following list.
By joining Oracle SQL access to Kafka temporary tables with Oracle Database tables, you obtain the following advantages:
- Leveraging the mature optimization and execution strategies in Oracle Database to minimize code path required to join tables efficiently
- Obtaining Oracle Database transaction semantics, with the security of Oracle Database ACID transaction processing (atomicity, consistency, isolation, and durability), ensuring that all changes to data are performed as if they are a single operation, controlled by the application
- Managing the Kafka partition offsets and committing them to database metadata tables in Oracle Database, so that after a system failure, these Oracle Database transactions with Kafka records are not subject to being lost or reprocessed by an application.
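As a sketch, such a join might look like the following; the CUSTOMERS table and the cust_id JSON field are hypothetical:
SELECT c.cust_name,
       k.kafka_offset,
       k.value
  FROM ORA$DKVGTT_ALPHA1_MYAPP_0 k
  JOIN customers c
    ON c.cust_id = JSON_VALUE(k.value, '$.cust_id');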
Parent topic: Oracle SQL Access to Kafka
22.4 Streaming Kafka Data Into Oracle Databases
Oracle SQL Access to Kafka enables Kafka streaming data to be processed with Oracle AI Database tables using standard SQL semantics.
Apache Kafka is commonly used to capture and consolidate data from many streaming sources, so that analytics can be performed on this data. Typically, this requires loading of all the Kafka records into the database, and then combining the data with database tables for analytics, either for short-term study or for longer analysis.
With Oracle SQL access to Kafka, you can use standard SQL, PL/SQL, and other database development tools to accomplish the load from Kafka to an Oracle Database, and process that data using standard Oracle application logic, such as JDBC applications. Oracle SQL access to Kafka can create a view that maps to all partitions of the Kafka topic that you want to load. Each Oracle SQL access to Kafka call to load more data queries this view, which in turn queries all partitions of the Kafka topic from the point last read to the current data high watermark offset (the offset of the last message that was fully inserted to all Kafka partitions). Data retrieved from the Kafka partitions is loaded into a temporary Oracle Database table.
These Oracle SQL Access to Kafka views behave much like a Kafka application instance. They read records from Kafka starting at a given offset until they reach the high watermark offset.
When Oracle SQL Access to Kafka creates a view, it also creates a corresponding global temporary table. The application calls an Oracle SQL Access to Kafka PL/SQL procedure to load this global temporary table with the results of a query from the corresponding Oracle SQL Access to Kafka view.
The global temporary tables can be joined with standard Oracle tables (for example, Oracle customer relationship management (CRM) tables).
By joining Oracle SQL access to Kafka temporary tables with Oracle Database tables, you obtain the following advantages:
- Leveraging the mature optimization and execution strategies in Oracle AI Database to minimize code path required to join tables efficiently
- Obtaining Oracle AI Database transaction semantics, with the security of Oracle AI Database ACID transaction processing (atomicity, consistency, isolation, and durability), ensuring that all changes to data are performed as if they are a single operation, controlled by the application
- Managing the Kafka partition offsets and committing them to database metadata tables in Oracle AI Database, so that after a system failure, these database transactions with Kafka records are not subject to being lost or reprocessed by an application.
Parent topic: Oracle SQL Access to Kafka
22.5 Querying Kafka Data Records by Timestamp
Oracle SQL Access to Kafka in Seekable mode assists you to query older data stored in Kafka, based on timestamps associated with the Kafka data.
In the event of anomalies, you can use Oracle SQL access to Kafka to assist with identifying Kafka data associated with the anomaly in a specified window of time.
For example, suppose a computer company has multiple sites. Each site has labs, and all access to the buildings and labs is protected by key card access. The company has a vast array of employees: some who just need office space, some who maintain the machines in the labs, and some who monitor the buildings for issues such as ventilation problems, unpermitted access, and general usage of the sites. In this scenario, Kafka topics can consist of the following:
- Key card usage (KCdata)
- Facility monitoring (Fdata)
- System monitoring, such as uptime, access, and intrusion detection (Sdata)
If an unusual event is detected while reading through Kafka data and combining it with Oracle data, the application can log the anomaly along with the timestamp of the record containing the unusual event. A second application can then read through these logged anomalies and process them. For each unusual event, the application might seek to a window of timestamps 10 seconds before and after the event. This is similar to analyzing exceptions in log files. It is common to look at log entries before and after the event to see if the exception was caused by an earlier issue, or if the exception led to downstream problems.
To evaluate a site issue, you can load the key card reader data
(KCdata) into a permanent table. For example, if multiple
applications use this data, then it makes sense to load that data into an Oracle
Database table that multiple applications can share: the real estate team uses it to
track building and office usage, and the IT department uses it to determine who is
on site to handle issues.
Using a Streaming query, you can scan the facility data
(Fdata) to determine if there are any atypical or unusual events in
the data. This could be a spike in lab temperature, a door that did not close and is
raising an alarm, the fire detection system sounding an alarm, or other data points
associated with the time frame, such as a door that was left ajar.
The security team is given an alert of a door in a building that did not
close. They use the streaming data to determine the door was left ajar at 3:17 AM. They
can then use a Seeking query to seek multiple other data points
(KCdata, Fdata, Sdata) in a 30-minute
window (3:02 to 3:32) to determine who accessed the building, what doors or labs were
accessed, what machines went offline or were directly accessed, and other data records,
so that they can take the proper response to the developing situation.
In this scenario, you can use Oracle SQL Access to Kafka to create a single view that maps to all partitions of the Kafka topic. When Oracle SQL access to Kafka creates a view, it also creates a corresponding global temporary table. The application first specifies a starting and ending timestamp and then calls Oracle SQL access to Kafka to load the global temporary table with the rows in the specified window of time. You can leverage standard Oracle Database SQL transaction processing to parse large volumes of data to identify relevant device data around the anomalous event.
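A sketch of the seek-and-load flow for the 30-minute window follows. The SEEK_OFFSET_TS procedure name and parameter order are assumptions modeled on the DBMS_KAFKA documentation pattern, and the date is illustrative, so verify the interface before use.
DECLARE
  v_rows INTEGER;
BEGIN
  -- Position the seekable application on the window around the event
  -- (procedure name and parameter order assumed for illustration).
  SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
    'ORA$DKV_ALPHA1_SEEKAPP_0',
    TO_TIMESTAMP('2025-01-15 03:02:00', 'YYYY-MM-DD HH24:MI:SS'),
    TO_TIMESTAMP('2025-01-15 03:32:00', 'YYYY-MM-DD HH24:MI:SS'));

  -- Load the records in that window into the GTT and analyze them.
  v_rows := SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_ALPHA1_SEEKAPP_0');
END;
/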
Parent topic: Oracle SQL Access to Kafka
22.6 About the Kafka Database Administrator Role
To administer Oracle SQL access to Kafka, grant the Oracle AI Database
role OSAK_ADMIN_ROLE and grant required administration privileges to the
administrator role and the Kafka administration API package.
To provide role-based authentication to grant the Oracle SQL access for
Kafka administration privileges to an administrative user, Oracle provides the
OSAK_ADMIN_ROLE starting with Oracle AI Database 26ai. You can
grant this role to an administrator user for Oracle SQL Access to Kafka. This role
grants the system privileges required for users that you designate as Oracle SQL
access for Kafka administrators to configure, register, and manage Kafka clusters.
The system privileges granted by this role are as follows:
- CREATE CREDENTIAL, to create a Kafka SASL-SSL (Simple Authentication and Security Layer) password or OSS (Oracle Streaming Service) authToken
- CREATE ANY DIRECTORY, to create cluster access and cluster configuration directories
- DROP ANY DIRECTORY, to drop cluster access and cluster configuration directories
- READ privileges on sys.dbms_kafka_clusters
- READ privileges on sys.dbms_kafka_applications
- READ privileges on sys.dbms_kafka_messages
Parent topic: Oracle SQL Access to Kafka
22.7 Enable Kafka Database Access to Users
The application user accounts are granted the DBMS_KAFKA
database privileges required to access OSAK.
As a DBA, you create users and grant them the privileges needed to administer and use Oracle SQL access to Kafka. There are two categories of users:
- Oracle SQL Access to Kafka administrators are privileged users. To simplify management of Oracle SQL access to Kafka, Oracle recommends that the Oracle DBA grant the OSAK_ADMIN_ROLE to designated Kafka administrators. This role is precreated in the database starting with Oracle AI Database 26ai. Administrators run the DBMS_KAFKA_ADM package methods to configure and manage the Kafka cluster information. Either users granted OSAK_ADMIN_ROLE or the Oracle DBA can create the operating system level Kafka Configuration Directory, and populate that directory with configuration files. Oracle SQL Access to Kafka administrators create the Oracle directory objects for Kafka cluster access directories.
- Application users of Kafka topic data are granted the READ privileges required to access the DBMS_KAFKA packages, so that they can access and use data from Kafka cluster topics.
Example 22-1 Grant OSAK_ADMIN_ROLE to Kafka Administrator Users
In this example, the OSAK_ADMIN_ROLE is granted to user
kafka_admin:
GRANT OSAK_ADMIN_ROLE TO kafka_admin;
Example 22-2 Grant User Access to Kafka Users
To enable applications to use Oracle SQL access to Kafka, you grant
DBMS_KAFKA access, as sketched after the following list. These application users must already have
the following privileges on the source Kafka cluster and target Oracle AI
Database:
- CREATE SESSION
- CREATE TABLE
- CREATE VIEW
- Available quota on the tablespace where they access Kafka data
- Read access on the cluster access directory of a registered Kafka cluster
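The following sketch shows these grants for a hypothetical application user app_user, assuming the access directory osak_kafkaclus1_access created in the configuration examples later in this chapter; the EXECUTE grant on DBMS_KAFKA is an assumption (package access may be granted differently in your release):
GRANT CREATE SESSION, CREATE TABLE, CREATE VIEW TO app_user;
ALTER USER app_user QUOTA UNLIMITED ON users;
GRANT READ ON DIRECTORY osak_kafkaclus1_access TO app_user;
GRANT EXECUTE ON SYS.DBMS_KAFKA TO app_user;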
Parent topic: Oracle SQL Access to Kafka
22.8 Data Formats Supported with Oracle SQL Access to Kafka
Oracle SQL access to Kafka supports Kafka records represented in three
formats: delimited text data (for example, csv), JSON, and
Avro.
Kafka itself is schemaless and format-neutral: application data is stored as
opaque (uninterpreted) byte arrays in the key and value fields of a Kafka record.
Because the Kafka key is used mainly for hashing data into Kafka partitions, only
the value field of a Kafka record is retrieved and rendered as an Oracle row. The
application is responsible for serialization and deserialization of the data, and
for supplying a schema that defines the structure of the data format. In Oracle
SQL Access for Kafka, the data format and schema are specified in the options argument
to the DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP()
procedures.
Note:
Regardless of the format type, the tables and views created contain three additional columns: KAFKA_PARTITION,
KAFKA_OFFSET, and KAFKA_EPOCH_TIMESTAMP.
- JSON Format and Oracle SQL Access to Kafka
For JSON, Oracle SQL access to Kafka determines the columns for the table or view.
- Delimited Text Format and Oracle SQL Access to Kafka
For delimited text formats, Oracle SQL access to Kafka creates views and temporary tables in the user schema with Kafka data.
- Avro Formats and Oracle SQL Access to Kafka
For Avro formats, Oracle SQL access to Kafka uses the Avro schema to determine the data columns and the three metadata columns.
Parent topic: Oracle SQL Access to Kafka
22.8.1 JSON Format and Oracle SQL Access to Kafka
For JSON, Oracle SQL access to Kafka determines the columns for the table or view.
The following is an example of using options to display data for a JSON streaming application:
DECLARE
v_options VARCHAR2(100);
BEGIN
v_options := '{"fmt" : "JSON"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
'ALPHA1',
'MYAPP',
'ExampleTopic',
v_options);
END;
/
With JavaScript Object Notation (JSON) data, Oracle SQL Access to Kafka
creates views and global temporary tables in the user schema over Kafka data. The
views are prefixed by ORA$DKV_, and the temporary tables are prefixed by
ORA$DKVGTT_. The DBMS_KAFKA.CREATE_xxx_APP procedures use a fixed schema to return JSON
data from a Kafka record.
For example:
SQL> describe ORA$DKVGTT_ALPHA1_MYAPP_0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
VALUE VARCHAR2(4000)
With the VARCHAR2 type, the length of the VALUE
column is restricted by the maximum VARCHAR2 length of your
database. The VALUE column can optionally be created as type
CLOB.
The KAFKA_ columns identify the partition id, the
offset, and the timestamp of the Kafka record. (The underlying timestamp
representation is an integer representing the number of milliseconds since Unix
epoch.)
The data in the value portion of the Kafka record is returned as text to the
VALUE column. The character encoding of the external text is
fixed as AL32UTF8. Oracle SQL access to Kafka logic does not check
for valid JSON syntax in the VALUE columns. However, faulty JSON is
discovered when JSON operators in a SQL query attempt to parse the
VALUE data.
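For example, a query such as the following sketch (with a hypothetical sensor_id field) parses VALUE at query time and exposes any malformed JSON:
SELECT kafka_offset,
       JSON_VALUE(value, '$.sensor_id') AS sensor_id
  FROM ORA$DKVGTT_ALPHA1_MYAPP_0
 WHERE JSON_EXISTS(value, '$.sensor_id');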
Parent topic: Data Formats Supported with Oracle SQL Access to Kafka
22.8.2 Delimited Text Format and Oracle SQL Access to Kafka
For delimited text formats, Oracle SQL access to Kafka creates views and temporary tables in the user schema with Kafka data.
With delimited data, such as CSV or comma-delimited data, Oracle SQL Access
to Kafka creates views and global temporary tables in the user schema over Kafka data.
These views are prefixed by ORA$DKV_. The temporary tables are prefixed
by ORA$DKVGTT_. With DSV format, the data columns are based on the
reference table passed in the options plus the three metadata columns.
The temporary tables and views created with Oracle SQL access to Kafka
delimited text format data have columns that reflect the shape of the delimited text
data in the value field of a Kafka record. Oracle SQL access to Kafka converts text data
into the native Oracle data types expressed in the table and view definition. The
character encoding of the external text is fixed as AL32UTF8.
When a Kafka record is retrieved, a canonical layout is created, starting with the Kafka
partition identifier (INTEGER), Kafka record offset
(INTEGER), and Kafka record timestamp (INTEGER),
followed by delimited text data in the Kafka value. In other words, the Kafka data is
flattened out and streamed as rows of pure delimited text fields, using the order of the
view schema definition.
The following Oracle data types are supported:
- INTEGER, INT, NUMBER
- CHAR, VARCHAR2
- NCHAR, NVARCHAR2
- CLOB, NCLOB, BLOB
- FLOAT, BINARY_FLOAT, BINARY_DOUBLE
- TIMESTAMP, DATE
- TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE
- INTERVAL
- RAW
- BOOLEAN
To simplify the specification of delimited text at application creation time, you provide the name of a table that describes the columns of the user data in the order that they are physically ordered in the Kafka record value field. Oracle SQL Access to Kafka uses that name in views and temporary tables.
The following example shows the shape of the delimited text data table (a reference table, or reftable) provided when you create an Oracle SQL Access to Kafka application. The reftable columns reflect the physical order of the fields in the Kafka record value, and the desired data type conversion from the delimited text.
Preserve reftables after they are used in a
CREATE_xxx_APP call to
create Oracle SQL Access to Kafka views and temporary tables reflecting the shape.
You will need the reftable to re-create the views.
SQL> describe FIVDTI_SHAPE;
Name Null? Type
----------------------------------------- -------- ----------------------------
F1 NUMBER
I2 NUMBER
V3 VARCHAR2(50)
D4 DATE
T5 TIMESTAMP(6)
V6 VARCHAR2(200)
I7 NUMBER
The reference table describes the fields in the Kafka record value only.
For example, the reftable FIVDTI_SHAPE could support Kafka records
where F1, I2, V3, D4, T5, V6, I7 are fields in the Kafka record value. The fields in
the Kafka record value must be separated by delimiters (for example, comma
delimiters).
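A sketch of creating a streaming application over this shape follows; the "reftable" option key is an assumption modeled on the JSON example earlier, so confirm it in the DBMS_KAFKA reference.
DECLARE
  v_options VARCHAR2(200);
BEGIN
  -- "reftable" names the table describing the delimited fields (assumed key)
  v_options := '{"fmt" : "DSV", "reftable" : "FIVDTI_SHAPE"}';
  SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
    'ALPHA1',
    'MYAPP',
    'ExampleTopic',
    v_options);
END;
/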
Note:
The reference table cannot include invisible (hidden) columns. The ordering of the columns must match the order of the data values in the Kafka record. An invisible column has a COLUMN_ID of NULL, so its position in
the column list cannot be determined.
Oracle SQL Access to Kafka temporary tables created for data described by the
FIVDTI_SHAPE table will have the following schema:
SQL> describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F1 NUMBER
I2 NUMBER
V3 VARCHAR2(50)
D4 DATE
T5 TIMESTAMP(6)
V6 VARCHAR2(200)
I7 NUMBER
Parent topic: Data Formats Supported with Oracle SQL Access to Kafka
22.8.3 Avro Formats and Oracle SQL Access to Kafka
For Avro formats, Oracle SQL access to Kafka uses the Avro schema to determine the data columns and the three metadata columns.
- About Using Avro Format with Oracle SQL Access to Kafka
Learn how Oracle SQL access to Kafka makes Kafka data in the Avro format available for use in Oracle database tables and views.
- Primitive Avro Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema primitive type names in the database, Oracle converts these types to SQL data types.
- Complex Avro Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema complex type names in the database, Oracle converts these types to supported SQL data types.
- Avro Logical Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema logical type names in the database, Oracle converts these types to supported SQL data types.
Parent topic: Data Formats Supported with Oracle SQL Access to Kafka
22.8.3.1 About Using Avro Format with Oracle SQL Access to Kafka
Learn how Oracle SQL access to Kafka makes Kafka data in the Avro format available for use in Oracle database tables and views.
To enable the use of Apache Avro formatted data by applications in Oracle database
tables and views, Oracle SQL Access for Kafka converts the data format based on the Avro
schema specified in the options argument to the
DBMS_KAFKA.CREATE_[LOAD|STREAMING|SEEKABLE]_APP() procedures. This
means that an Oracle SQL access to Kafka application can support only a single Avro
schema for a Kafka topic. Messages with different Avro schemas in the same topic
stream are not supported. If the schema evolves, then you must create a new Oracle
SQL access to Kafka application.
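A sketch of supplying the Avro schema at creation time follows; the option keys "fmt" and "avroschema" are assumptions for illustration only, so verify them against the DBMS_KAFKA reference.
DECLARE
  v_options VARCHAR2(4000);
BEGIN
  -- The Avro schema is embedded in the options JSON (keys assumed).
  v_options := '{"fmt" : "AVRO", '
            || '"avroschema" : {"type" : "record", "name" : "primitives", '
            || '"fields" : [ {"name" : "f_int", "type" : "int"} ]}}';
  SYS.DBMS_KAFKA.CREATE_STREAMING_APP (
    'ALPHA1',
    'MYAVROAPP',
    'ExampleTopic',
    v_options);
END;
/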
Parent topic: Avro Formats and Oracle SQL Access to Kafka
22.8.3.2 Primitive Avro Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema primitive type names in the database, Oracle converts these types to SQL data types.
Table 22-1 Avro Primitive types and Oracle Type Conversions for Oracle SQL Access to Kafka
| Type Description | Avro Primitive Type | Oracle Type |
|---|---|---|
| null/no value | null | CHAR(1) |
| Binary (boolean) value | boolean | NUMBER(1) |
| 32-bit signed integer | int | NUMBER(38) |
| 64-bit signed integer | long | NUMBER(38) |
| IEEE 32-bit floating point | float | BINARY_FLOAT |
| IEEE 64-bit floating point | double | BINARY_DOUBLE |
| byte array/binary | bytes | BLOB |
| UTF-8 encoded character string | string | VARCHAR2 |
The following example Avro schema defines a record that uses all Avro primitive types:
{
"type": "record",
"name": "primitives",
"fields": [
{ "name": "f_null", "type": "null" },
{ "name": "f_boolean", "type": "boolean" },
{ "name": "f_int", "type": "int" },
{ "name": "f_long", "type": "long" },
{ "name": "f_float", "type": "float" },
{ "name": "f_double", "type": "double" },
{ "name": "f_bytes", "type": "bytes" },
{ "name": "f_string", "type": "string" }
]
}
If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_NULL CHAR(1)
F_BOOLEAN NUMBER(1)
F_INT NUMBER(38)
F_LONG NUMBER(38)
F_FLOAT BINARY_FLOAT
F_DOUBLE BINARY_DOUBLE
F_BYTES BLOB
F_STRING VARCHAR2(4000)
The VARCHAR2 type length (in this example, for the
F_STRING column) is determined by the maximum
varchar2 length of your database.
Parent topic: Avro Formats and Oracle SQL Access to Kafka
22.8.3.3 Complex Avro Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema complex type names in the database, Oracle converts these types to supported SQL data types.
Description
The Apache Avro complex data types take specified attributes. To use the Avro complex types, Oracle SQL access to Kafka converts them to Oracle types, as specified in the following table.
Table 22-2 Avro Complex types and Oracle Type Conversions for Oracle SQL Access to Kafka
| Avro Complex Type | Oracle Type | Type Description |
|---|---|---|
| fixed | BLOB | A fixed number of 8-bit unsigned bytes, of the size declared in the schema |
| enum | VARCHAR2 | An Avro enumerated list of symbols |
| record | VARCHAR2 | A structure of named fields; converted to a JSON format string |
| map | VARCHAR2 | A map of keys to values. Values can be of any type; converted to a JSON format string |
| array | VARCHAR2 | An array of any type; converted to a JSON format string |
Note:
The Avro complex types record, map, and
array are converted to a JSON format string before conversion
to a VARCHAR2 type.
The following example Avro schema defines a record that uses all Avro complex types:
{
"type" : "record",
"name" : "complex",
"fields" : [
{ "name" : "f_fixed",
"type" : { "type" : "fixed", "name" : "ten", "size" : 10}
},
{ "name" : "f_enum",
"type" : { "type" : "enum", "name" : "colors",
"symbols" : [ "red", "green", "blue" ] }
},
{ "name" : "f_record",
"type" : {"type" : "record", "name" : "person",
"fields" : [ { "name" : "first_name", "type" : "string" },
{ "name" : "last_name", "type" : "string"} ] }
},
{ "name" : "f_map",
"type" : { "type" : "map", "values" : "int" }
},
{ "name" : "f_array",
"type" : {"type" : "array", "items" : "string" }
}]
}
If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_FIXED BLOB
F_ENUM VARCHAR2(4000)
F_RECORD VARCHAR2(4000)
F_MAP VARCHAR2(4000)
F_ARRAY VARCHAR2(4000)
The VARCHAR2 type length (in this example, for the
F_ENUM, F_RECORD, F_MAP and
F_ARRAY columns) is determined by the maximum
varchar2 length of your database.
Parent topic: Avro Formats and Oracle SQL Access to Kafka
22.8.3.4 Avro Logical Types Supported with Oracle SQL Access to Kafka
To use Apache Avro Schema logical type names in the database, Oracle converts these types to supported SQL data types.
Description
An Avro logical type is an Avro primitive or complex type with extra attributes to represent a derived type. Logical types are converted to Oracle types as specified in the following table.
Table 22-3 Avro Logical types and Oracle Type Conversions for Oracle SQL Access to Kafka
| Type Description | Avro Logical Type | Oracle Type |
|---|---|---|
| decimal: arbitrary-precision signed decimal number of the form unscaled × 10^-scale | decimal (bytes, fixed) | NUMBER |
| UUIDs (Universally Unique Identifiers), also known as GUIDs (Globally Unique Identifiers): These IDs are randomly generated, in conformity with RFC 4122 | uuid (string) | Not supported |
| date: A date within the calendar, with no reference to a particular time zone or time of day, as the number of days from the Unix epoch, 1 January 1970 | date (int) | DATE |
| time (millis): A time of day, with no reference to a particular calendar, time zone, or date, represented as the number of milliseconds after midnight, 00:00:00.000 | time-millis (int) | TIMESTAMP(3) |
| time (micros): A time of day, with no reference to a particular calendar, time zone, or date, represented as the number of microseconds after midnight, 00:00:00.000000 | time-micros (long) | TIMESTAMP(6) |
| timestamp (millis) UTC: An instant on the global timeline, independent of a particular time zone or calendar, as the number of milliseconds from the Unix epoch, 1 January 1970 00:00:00.000 UTC | timestamp-millis (long) | TIMESTAMP(3) |
| timestamp (micros) UTC: An instant on the global timeline, independent of a particular time zone or calendar, as the number of microseconds from the Unix epoch, 1 January 1970 00:00:00.000000 UTC | timestamp-micros (long) | TIMESTAMP(6) |
| duration: An amount of time defined by a number of months, days, and milliseconds | duration (fixed, size 12) | Not supported |
Note:
Decimal logical types are internally stored as byte arrays (fixed or not). Depending on the Avro writer, some of these arrays store the string representation of the decimal, while others store the unscaled value. To avoid presenting ambiguous data, Oracle recommends that you use the option avrodecimaltype to declare explicitly which
representation is used. If this option is not explicitly specified, then by default
Oracle SQL access to Kafka assumes that the unscaled representation of the
data is stored in the decimal columns.
The following example Avro schema defines a record that uses all Avro logical types:
{
"type" : "record",
"name" : "logical",
"fields" : [ {
"name" : "f_decimal",
"type" : {
"type" : "bytes",
"logicalType" : "decimal",
"precision" : 4,
"scale" : 2
}
}, {
"name" : "f_date",
"type" : {
"type" : "int",
"logicalType" : "date"
}
}, {
"name" : "f_time_millis",
"type" : {
"type" : "int",
"logicalType" : "time-millis"
}
}, {
"name" : "f_time_micros",
"type" : {
"type" : "long",
"logicalType" : "time-micros"
}
}, {
"name" : "f_timestamp_millis",
"type" : {
"type" : "long",
"logicalType" : "timestamp-millis"
}
}, {
"name" : "f_timestamp_micros",
"type" : {
"type" : "long",
"logicalType" : "timestamp-micros"
}
} ]
}
If you created Oracle SQL access to Kafka temporary tables for Avro data by using this example Avro schema, then the temporary tables have the following schema:
describe ORA$DKVGTT_ALPHA1_MYAPP__0;
Name Null? Type
----------------------------------------- -------- ----------------------------
KAFKA_PARTITION NUMBER(38)
KAFKA_OFFSET NUMBER(38)
KAFKA_EPOCH_TIMESTAMP NUMBER(38)
F_DECIMAL NUMBER
F_DATE DATE
F_TIME_MILLIS TIMESTAMP(3)
F_TIME_MICROS TIMESTAMP(6)
F_TIMESTAMP_MILLIS TIMESTAMP(3)
F_TIMESTAMP_MICROS TIMESTAMP(6)
Parent topic: Avro Formats and Oracle SQL Access to Kafka
22.9 Configuring Access to a Kafka Cluster
You can configure access to secured Kafka clusters, or non-secured Kafka clusters.
- Create a Kafka Access Directory
The Oracle SQL access to Kafka administrator must create a Kafka Access Directory for each Kafka cluster to control database user access to the cluster.
- The Kafka Configuration File (osakafka.properties)
To access Kafka clusters, you must create a configuration file (osakafka.properties) in the Kafka Configuration Directory that contains the information required to access the Kafka cluster.
- Security Configuration Access for the Kafka Configuration Directory
Configure OSAK to access security-enabled Apache Kafka clusters.
Parent topic: Oracle SQL Access to Kafka
22.9.1 Create a Kafka Access Directory
The Oracle SQL access to Kafka administrator must create a Kafka Access Directory for each Kafka cluster to control database user access to the cluster.
The Kafka Access Directory is an Oracle
directory object that is used to decide which Oracle users are allowed to access
that Oracle SQL Access to Kafka cluster. Each Kafka cluster requires its own
dedicated Kafka Access Directory. As the administrator for Oracle SQL access to
Kafka, you manage access by first creating the Kafka Access Directory object for
each cluster. You then grant READ privileges on this directory to database users who
need to access the Kafka cluster. The Kafka Access Directory must be created before
you call the DBMS_KAFKA_ADM.REGISTER_CLUSTER() procedure.
Example 22-3 Creating a Kafka Access Directory and Granting READ Access
First create a Kafka Access Directory object. In this example,
the object is osak_kafkaclus1_access:
CREATE DIRECTORY osak_kafkaclus1_access AS '';
After the Kafka Cluster is successfully registered, the Oracle SQL access to Kafka administrator grants READ access on this directory to users.
In this example, the user example_user is granted access to
osak_kafkaclus1_access:
GRANT READ ON DIRECTORY osak_kafkaclus1_access TO example_user;
Parent topic: Configuring Access to a Kafka Cluster
22.9.2 The Kafka Configuration File (osakafka.properties)
To access Kafka clusters, you must create a configuration file
(osakafka.properties) in a Kafka Configuration Directory; the file contains
the information required to access the Kafka cluster.
- About the Kafka Configuration File
The osakafka.properties file contains configuration information required to access secured Kafka clusters, as well as additional information about Oracle SQL access to Kafka.
- Oracle SQL Access for Kafka Configuration File Properties
To create an osakafka.properties file for the Kafka Configuration Directory, review and specify the properties as described here.
- Creating the Kafka Configuration Directory
To access secure Kafka clusters, you must create a Kafka Configuration Directory for each Kafka cluster.
Parent topic: Configuring Access to a Kafka Cluster
22.9.2.1 About the Kafka Configuration File
The osakafka.properties file contains configuration
information required to access secured Kafka Clusters, as well as additional information
about Oracle SQL access to Kafka.
The Kafka Configuration File,
osakafka.properties, is created in the Kafka Configuration
Directory. The Oracle SQL access to Kafka administrator (granted
OSAK_ADMIN_ROLE) creates the osakafka.properties
file. This file is used by the DBMS_KAFKA_ADM package to make
connections to an Apache Kafka cluster.
The Oracle SQL access to Kafka administrator creates a Kafka Configuration Directory in which to store the configuration files for each Kafka Cluster. Each Kafka Configuration Directory has its own Kafka Configuration File. To manage access to Apache Kafka clusters, only an Oracle SQL access to Kafka administrator has read and write access to Kafka Configuration Directories for Kafka clusters. No other users are granted any privileges on Kafka Configuration Directories or Kafka Configuration Files.
Functions of the Kafka Configuration File
The osakafka.properties file is similar to the consumer
properties file used by a Kafka consumer based on librdkafka. Secure
Apache Kafka clusters require credential files, which are also like the ones
required by a librdkafka-based Kafka consumer. The
osakafka.properties file has the following properties:
- It is created and managed by the Oracle SQL access to Kafka administrator as part of the setup and configuration needed to access a Kafka cluster.
- It consists of a text file of key-value pairs. Each line has the format key=value, and is terminated with a new line. The new line character cannot be part of the key or value.
- It contains Oracle SQL access for Kafka tuning parameters, which are identified with the osak prefix.
- It contains debugging properties for Oracle SQL access to Kafka.
- It is used by the DBMS_KAFKA_ADM package to make connections to a Kafka cluster using librdkafka APIs.
- It is required for secure Kafka clusters, to store the security configuration properties required to connect to Kafka clusters using librdkafka interfaces, the Oracle SQL access to Kafka tuning properties (identified with the osak prefix), and debugging properties. For secure cluster access, the key-value pairs reference cluster configuration files, certificates, and client public and private keys.
- For SSL connections, the Kafka configuration file no longer stores separate PEM file references for ssl.ca.location, ssl.key.location, and ssl.certificate.location. Instead, the file stores the Oracle Wallet location through ssl.wallet.location. The wallet password is stored in a database credential, and the credential name is provided as a parameter so the password can be retrieved and passed to the connection layer at runtime.
- It is optional for non-secure Kafka clusters, and can be used to specify the tuning and debugging properties for cluster connections.
The osakafka.properties file is stored in the Kafka
Configuration Directory, in the path Oracle_base/osak/cluster_name/config, where Oracle_base is the Oracle base directory of the target
Oracle Database, and cluster_name
is the name of the Kafka cluster whose access information is stored in the
configuration file.
Guidelines for Creating Kafka Configuration Files
As part of the setup and configuration required to access an Apache
Kafka cluster, you are required to create an Oracle SQL access for Kafka
administrator. This administrator owns the Kafka Configuration File. The information
in this file is used to set session context in C interfaces, which make connections
to a Kafka cluster using librdkafka APIs.
The SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES system table
contains a pre-populated list of supported consumer configuration properties,
including security properties. For extensibility, SYS can add more
properties to this table with certain restrictions.
The DBMS_KAFKA_ADM.REGISTER_CLUSTER() procedure reads only those
properties from the osakafka.properties file that are also listed
in the SYS.DBMS_KAFKA_SEC_ALLOWED_PROPERTIES system table. Any
extra properties are ignored.
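A registration sketch that ties the access and configuration directories together follows; the parameter names, and whether REGISTER_CLUSTER is a function or a procedure, are assumptions to verify against the DBMS_KAFKA_ADM reference.
BEGIN
  -- Parameter names assumed for illustration.
  SYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER(
    cluster_name        => 'KAFKACLUS1',
    bootstrap_servers   => 'broker1.example.com:9092',
    cluster_access_dir  => 'OSAK_KAFKACLUS1_ACCESS',
    cluster_config_dir  => 'OSAK_KAFKACLUS1_CONFIG',
    cluster_description => 'Example Kafka cluster');
END;
/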
Parent topic: The Kafka Configuration File (osakafka.properties)
22.9.2.2 Oracle SQL Access for Kafka Configuration File Properties
To create an osakafka.properties file for the Kafka
Configuration Directory, review and specify the properties as described here.
osakafka.properties File Processing
The properties specified in the osakafka.properties file must be those
listed in the table that follows. If you provide any other key-value pairs, then
those values are ignored.
Note the following:
- Property names with the osak prefix are internal tuning properties or debugging properties.
- Property names without the osak prefix are Kafka consumer properties, which are used by librdkafka. For a complete list of properties, refer to the documentation for the Apache Kafka C/C++ client library (librdkafka).
| Property | Allowed Values | Description |
|---|---|---|
| debug | all | Used to debug connectivity issues. |
| max.partition.fetch.bytes | For example, 1024 * 1024 | Limits the amount of data fetched per partition in one request; for example, 1024 * 1024 limits fetches to 1 MB per partition. |
| security.protocol | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | Security protocol used to communicate with Kafka brokers. |
| sasl.mechanisms | GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | SASL mechanism to use for authentication. NOTE: Despite the plural name, only one mechanism must be configured. This property is allowed to provide backward compatibility for older Kafka clusters. Where possible, Oracle recommends that you use the property sasl.mechanism. |
| sasl.mechanism | GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | Simple Authentication and Security Layer (SASL) mechanism to use for authentication. |
| sasl.username | | The user name required for authenticating to the Kafka cluster. The corresponding password value for this user name must be stored as a database credential, using the DBMS_CREDENTIAL.CREATE_CREDENTIAL procedure. |
| sasl.kerberos.principal | | The client Kerberos principal name. |
| sasl.kerberos.ccname | | The Kerberos ticket cache file. Example: krb5ccname_osak. This file must exist in the cluster configuration directory. |
| sasl.kerberos.config | | The Kerberos configuration of the Kafka cluster. Example: krb5.conf. This file must exist in the cluster configuration directory. |
| sasl.kerberos.service.name | | The primary name of the Kerberos principal, which is the name that appears before the slash (/). |
| ssl.endpoint.identification.algorithm | https, none | Endpoint identification algorithm used to validate the Kafka broker hostname against the Kafka broker certificate. https performs hostname validation; none disables it. Default value: https. |
| ssl.wallet.location | | Path to the Oracle Wallet directory or wallet file used for SSL authentication. The wallet must contain the trusted certificate and, if required, the client certificate and private key. The wallet path can be an absolute path or a path relative to the cluster configuration directory. |
| | | The database credential name that stores the Oracle Wallet password. The wallet password is stored in a database credential, and the credential name is provided as a parameter so the password can be retrieved and passed to the connection layer at runtime. |
Example
The following is an example osakafka.properties file
that specifies security protocol SSL, and provides authentication
by using a Certification Authority (CA) certificate on the client:
security.protocol=SSL
ssl.wallet.location=/u01/app/oracle/osak/kafkaclus1/config/wallet/ewallet.p12
ssl.endpoint.identification.algorithm=https
When Oracle Wallet is used, the CA certificate, client certificate, and private key are imported into the wallet before the cluster is registered.
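As a sketch, the wallet might be created and populated with orapki as follows (the paths, certificate file name, and auto-login choice are illustrative):
orapki wallet create -wallet /u01/app/oracle/osak/kafkaclus1/config/wallet -auto_login
orapki wallet add -wallet /u01/app/oracle/osak/kafkaclus1/config/wallet \
  -trusted_cert -cert /tmp/kafka_ca.pem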
Parent topic: The Kafka Configuration File (osakafka.properties)
22.9.2.3 Creating the Kafka Configuration Directory
To access secure Kafka clusters, you must create a Kafka Configuration Directory for each Kafka cluster.
The Oracle SQL access to Kafka administrator creates the Kafka
Configuration Directory, which is an operating system directory under the Oracle base
directory. The path is Oracle_base/osak/cluster_name/config,
where Oracle_base is the Oracle
base directory, and cluster_name is the value of
the cluster name parameter passed to the
SYS.DBMS_KAFKA_ADM.REGISTER_CLUSTER call. Each Kafka cluster
requires its own dedicated Kafka Configuration Directory.
This directory must contain all the configuration files and certificates needed to access the Kafka Cluster:
- The osakafka.properties file
- Security files listed in the osakafka.properties file
In the following example, the Oracle base directory is
/u01/app/oracle, and the cluster name is
kafkaclus1:
mkdir -p /u01/app/oracle/osak/kafkaclus1/config

CREATE DIRECTORY osak_kafkaclus1_config AS
'/u01/app/oracle/osak/kafkaclus1/config';
Parent topic: The Kafka Configuration File (osakafka.properties)
22.9.3 Security Configuration Access for the Kafka Configuration Directory
Configure OSAK to access security-enabled Apache clusters.
To configure access to a secure Kafka Cluster, the administrator must
add several configuration files to the Kafka Configuration Directory, and
prepare a DBMS_CREDENTIAL object. The specific
configuration depends on which security protocol is used to configure
security on the Kafka cluster.
For secured Kafka clusters, Oracle SQL Access to Kafka uses Oracle
Wallet-based configuration instead of separate PEM files for CA
certificates, client certificates, and private keys. The cluster
configuration directory contains osakafka.properties and
the wallet files needed for the connection. Legacy PEM-based configurations
are no longer supported.
Caution:
Passwords must not be embedded in configuration files. Wallet passwords are stored in database credentials and resolved during cluster registration.
Note:
The Kerberos keytab file is not required, because Kerberos ticket management is handled outside of Oracle SQL access to Kafka.
- SASL_SSL/GSSAPI
OSAK configuration settings for SASL_SSL using the GSSAPI authentication protocol.
- SASL_PLAINTEXT/GSSAPI
OSAK configuration settings for SASL_PLAINTEXT using the GSSAPI authentication protocol.
- SASL_PLAINTEXT/SCRAM-SHA-256
OSAK configuration settings for SASL_PLAINTEXT using the SCRAM-SHA-256 authentication protocol.
- SASL_SSL/PLAIN
OSAK configuration settings for SASL_SSL using the PLAIN authentication protocol.
- SSL with Client Authentication
OSAK configuration settings for the SSL authentication protocol.
- SSL without Client Authentication
OSAK configuration settings for SSL without client authentication.
Parent topic: Configuring Access to a Kafka Cluster
22.9.3.1 SASL_SSL/GSSAPI
OSAK configuration settings for SASL_SSL using
GSSAPI authentication protocol.
Description
The SASL_SSL/GSSAPI protocol specifies Kerberos
authentication with SSL encryption. The Kerberos tickets must be managed externally
(outside Oracle SQL access to Kafka).
DBMS_CREDENTIAL
Not required, because Kerberos tickets are managed externally.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
- The Oracle Wallet files containing the certificate authority (CA) certificate
- The Kerberos configuration files referenced in osakafka.properties (for example, krb5.conf and the ticket cache krb5ccname_osak)
In the following example, the property
security.protocol specifies SASL_SSL. The
property sasl.mechanism specifies GSSAPI. SSL
certificates are managed using an Oracle Wallet. The wallet location is specified
using ssl.wallet.location.
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.config=krb5.conf
sasl.kerberos.ccname=krb5ccname_osak
sasl.kerberos.principal=kafkaclient/<FQDN-hostname>@<Realm>
ssl.wallet.location=/path/to/wallet/ewallet.p12
ssl.endpoint.identification.algorithm=https
22.9.3.2 SASL_PLAINTEXT/GSSAPI
OSAK configuration settings for SASL_PLAINTEXT using
GSSAPI authentication protocol.
Description
The SASL_PLAINTEXT/GSSAPI protocol specifies Kerberos
authentication without SSL encryption. Kerberos tickets must be managed externally
(outside Oracle SQL Access to Kafka).
Because this protocol does not use SSL, Oracle Wallet is not required for this configuration.
Caution:
With SASL_PLAINTEXT, data is sent in clear text over the network. This configuration is not secure, and is not recommended for production systems.
DBMS_CREDENTIAL
Not required, because Kerberos tickets are managed externally.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
- Kerberos configuration files, including krb5.conf and the Kerberos ticket cache (for example, krb5ccname_osak)
In the following example, the property security.protocol specifies
SASL_PLAINTEXT, and the property
sasl.mechanism specifies GSSAPI.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.kerberos.principal=kafkaclient/<FQDN-hostname>@<Realm>
sasl.kerberos.config=krb5.conf
sasl.kerberos.ccname=krb5ccname_osak
22.9.3.3 SASL_PLAINTEXT/SCRAM-SHA-256
OSAK configuration settings for SASL_PLAINTEXT using
SCRAM-SHA-256 authentication protocol.
Description
The SASL_PLAINTEXT/SCRAM-SHA-256 protocol specifies
SASL SCRAM authentication without SSL encryption.
Because this protocol does not use SSL, Oracle Wallet is not required
for this configuration. Oracle Wallet is only applicable for SSL-enabled protocols
such as SSL and SASL_SSL.
Caution:
With SASL_PLAINTEXT, data is sent in clear text over the network. This configuration is not secure, and is not recommended for production systems.
DBMS_CREDENTIAL
Required, to store the password for the SASL user name.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
Oracle Wallet is not used for SASL_PLAINTEXT configurations because
SSL/TLS is not enabled.
In the following example, the property security.protocol
specifies SASL_PLAINTEXT, and the property
sasl.mechanism specifies SCRAM-SHA-256.
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.username=testuser
Example 22-4 Credential Setup
Create a database credential to store the SASL password:
BEGIN
DBMS_CREDENTIAL.CREATE_CREDENTIAL(
credential_name => 'CRED_SASL_USER',
username => 'testuser',
password => 'user_password'
);
END;
/
22.9.3.4 SASL_SSL/PLAIN
OSAK configuration settings for SASL_SSL using
PLAIN authentication protocol.
Description
The SASL_SSL/PLAIN protocol specifies SASL authentication using the
PLAIN mechanism over SSL encryption. This configuration is commonly used for Oracle
Streaming Service (OSS) Kafka clusters and other secured Kafka environments.
Starting with Oracle AI Database 26ai (23.26.2), SSL configuration requires the use
of Oracle Wallet. All trusted certificates must be stored in an Oracle Wallet
created using the orapki tool. Legacy PEM-based SSL configuration
using ssl.ca.location is no longer supported.
DBMS_CREDENTIAL
Required to store the sasl.password and the Oracle
Wallet password.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
- The Oracle Wallet files (for example, ewallet.p12) containing trusted CA certificates
Example 22-5 OSS Cluster osakafka.properties File
In the following example, the property security.protocol
specifies SASL_SSL, and the property
sasl.mechanism specifies PLAIN. SSL
certificates are managed using an Oracle Wallet:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<tenancyName>/<username>/<streamPoolID>
# limit request size to 1 MB per partition
max.partition.fetch.bytes=1048576
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https
Example 22-6 Non-OSS Cluster osakafka.properties File
In the following example, the property security.protocol
specifies SASL_SSL, and the property
sasl.mechanism specifies PLAIN. The Oracle
Wallet replaces the legacy CA file configuration:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=kafkauser
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https
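Because DBMS_CREDENTIAL must store both the SASL password and the wallet password, the credential setup might look like the following sketch; the credential names, user names, and passwords here are illustrative only:
BEGIN
  -- Credential for the SASL user password (corresponds to sasl.username)
  DBMS_CREDENTIAL.CREATE_CREDENTIAL(
    credential_name => 'CRED_SASL_USER',
    username        => 'kafkauser',
    password        => 'sasl_user_password');
  -- Credential for the Oracle Wallet password
  DBMS_CREDENTIAL.CREATE_CREDENTIAL(
    credential_name => 'CRED_SSL_WALLET',
    username        => 'wallet_user',
    password        => 'wallet_password');
END;
/
The wallet credential is then referenced at registration through the options parameter, for example '{"walletcredname":"CRED_SSL_WALLET"}', as shown in the SSL examples that follow.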
22.9.3.5 SSL with Client Authentication
OSAK configuration settings for SSL authentication
protocol.
Description
The Secure Socket Layer (SSL) protocol specifies SSL with client authentication.
Note:
Starting with Oracle AI Database 26ai (23.26.2), SSL configuration for Oracle SQL Access to Kafka requires the use of Oracle Wallet. All certificates and keys must be stored in an Oracle Wallet created using the orapki tool. Legacy PEM-based configurations are no longer supported.
DBMS_CREDENTIAL
Required, to store the password for the Oracle Wallet.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
- The Oracle Wallet files (for example, ewallet.p12). These include the trusted certificate and client credentials. For example:
  - Trusted CA certificates
  - Client certificate (if required)
  - Client private key (if required)
- A database credential that stores the wallet password
Example 22-7 SSL osakafka.properties File
In the following example, the property security.protocol
specifies SSL, and the property
ssl.wallet.location specifies the Oracle Wallet location.
security.protocol=SSL
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https
Example 22-8 Credential Setup
Create a database credential to store the Oracle Wallet password:
BEGIN
DBMS_CREDENTIAL.CREATE_CREDENTIAL(
credential_name => 'CRED_SSL_WALLET',
username => 'wallet_user',
password => 'wallet_password'
);
END;
/
Example 22-9 Cluster Registration
Specify the wallet credential name during cluster registration:
SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
cluster_name => 'KAFKACLUS1',
bootstrap_servers => 'mykafkabootstrap-host:9092',
kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
cluster_access_dir => 'OSAK_KAFKACLUS1_ACCESS',
credential_name => NULL,
cluster_config_dir => 'OSAK_KAFKACLUS1_CONFIG',
cluster_description => 'My test cluster kafkaclus1',
options => '{"walletcredname":"CRED_SSL_WALLET"}'
)
FROM dual;
22.9.3.6 SSL without Client Authentication
OSAK configuration settings for SSL without client authentication.
Description
The Secure Socket Layer (SSL) protocol specifies SSL without client authentication.
Starting with Oracle AI Database 26ai (23.26.2), SSL configuration uses Oracle Wallet to store trusted certificates. Even when client authentication is not required, the trusted CA certificate must be stored in the wallet. The legacy PEM-based SSL properties are desupported. Replace PEM-based SSL properties with wallet-based configuration.
DBMS_CREDENTIAL
Required, to store the password for the Oracle Wallet.
Required Files in the Kafka Configuration Directory
- The osakafka.properties file
- The Oracle Wallet files (for example, ewallet.p12) containing trusted certificates
Example 22-10 SSL osakafka.properties File
In the following example, the property security.protocol
specifies SSL, and the property
ssl.wallet.location specifies the Oracle Wallet location.
security.protocol=SSL
ssl.wallet.location=ewallet.p12
ssl.endpoint.identification.algorithm=https
Credential Setup
BEGIN
DBMS_CREDENTIAL.CREATE_CREDENTIAL(
credential_name => 'CRED_SSL_WALLET',
username => 'wallet_user',
password => 'wallet_password'
);
END;
/
Cluster Registration
Specify the wallet credential name during cluster registration:
SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
cluster_name => 'KAFKACLUS2',
bootstrap_servers => 'mykafkabootstrap-host:9092',
kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
cluster_access_dir => 'OSAK_KAFKACLUS2_ACCESS',
credential_name => NULL,
cluster_config_dir => 'OSAK_KAFKACLUS2_CONFIG',
cluster_description => 'My test cluster kafkaclus2',
options => '{"walletcredname":"CRED_SSL_WALLET"}'
)
FROM dual;
22.10 Creating Oracle SQL Access to Kafka Applications
Oracle SQL can access Kafka data using loading, streaming, or seekable applications created with DBMS_KAFKA.
Oracle SQL access to Kafka provides the following application modes that you can use to attach to the Apache Kafka cluster:
- Loading: Use to load data from a Kafka topic into an Oracle Database table.
- Streaming: Use to read sequentially through a Kafka topic.
- Seekable: Use to gain random access to a Kafka topic between starting and ending timestamps that you designate.
Choose the type of application that you want to create, depending on the kind of access to Kafka topics that you require:
- DBMS_KAFKA.CREATE_LOAD_APP creates an application that can be used in Loading mode.
- DBMS_KAFKA.CREATE_STREAMING_APP creates an application that can be used in Streaming mode.
- DBMS_KAFKA.CREATE_SEEKABLE_APP creates an application that can be used in Seekable mode.
Example 22-11 Creating a Streaming Application with Four Views for a Kafka Topic
In the following example, a streaming application is created to use a set of four views with temporary tables for a Kafka topic that has four (4) partitions. A temporary table is created for each view, and each view (and its temporary table) is associated with one partition of the Kafka topic:
DECLARE
v_options VARCHAR2(4000);
BEGIN
v_options := '{"fmt":"DSV","reftable":"user_shape_table_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
4
);
END;
/
Example 22-12 Creating a Streaming Application with One View for a Kafka Topic
In the following example, a streaming application is created to use one (1) view with a temporary table for a Kafka topic that has four partitions. The view (and its temporary table) is associated with the entire Kafka topic:
DECLARE
v_options VARCHAR2(4000);
BEGIN
v_options := '{"fmt":"DSV","reftable":"user_shape_table_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
1
);
END;
/
Parent topic: Oracle SQL Access to Kafka
22.11 Security for Kafka Cluster Connections
Oracle SQL Access to Kafka supports access to Kafka and Oracle Streaming Service (OSS), using various security mechanisms, such as SSL, SASL, and Kerberos.
Note:
The credentials used to access the Kafka cluster must have access to both the Kafka broker metadata and any topics that will be part of any Oracle SQL access to Kafka application. If there are access control lists (ACLs) enabled for the credentials, then ensure that access is granted to both the brokers and to the Kafka topics. In a shared Oracle Real Application Clusters (Oracle RAC) environment, security credentials should be in a shared location, not local to a cluster member node.
Secure Kafka Clusters
To maintain securely encrypted data transmission between Oracle Database
and clusters, Oracle SQL access to Kafka employs several security protocols. For
access to secure Kafka clusters and Oracle Streaming Services (OSS) clusters,
security configuration files are used. These operating system files must exist in
the cluster configuration directory. The cluster configuration Oracle directory
object is created to access the cluster configuration files. Only the
osak_admin_role is granted READ access to this
directory. The cluster configuration files are readable only by the
osak_admin_role. The cluster configuration files include the
osakafka.properties file. Keys and certificates for SSL are stored in an Oracle wallet.
Note:
The secure access method is now Oracle Wallet-based SSL
configuration. Secure Kafka clusters should use a wallet created with
orapki, and the wallet should contain the CA certificate
and client credentials required by the connection.
The wallet password must be stored as a database credential and referenced during cluster registration.
The cluster access Oracle directory object is used to control access to the Kafka cluster. This directory object does not contain any configuration files. Kafka sessions are exclusive to individual PDBs in the multitenant environment. Each PDB where you want to create an application to connect to a Kafka broker must create its own application.
Passwords must not be embedded in files. Any embedded password properties
in the osakafka.properties file will be ignored. All passwords must
be stored as database credentials using the DBMS_CREDENTIAL
package.
Kafka Clusters Using Kerberos Authentication
For Kafka clusters using Kerberos Authentication, the Kerberos ticket
for the Kafka principal specified in the osakafka.properties file
must be acquired on the database system, and renewed periodically outside of Oracle
SQL access to Kafka.
The Kafka configuration directory object, the Kafka cluster access directory object, and the database credential name must be supplied as input parameters to the DBMS_KAFKA_ADM.REGISTER_CLUSTER() call.
The Oracle SQL Access to Kafka administrator (a user with the
osak_admin_role, the OSAK_ADMIN) performs the cluster
registration and administration tasks.
Parent topic: Oracle SQL Access to Kafka
22.12 Configuring Access to Unsecured Kafka Clusters
To configure access to non-secure Kafka clusters, the OSAK administrator (Oracle Database user with osak_admin_role) must complete this procedure.
Access to non-secure Kafka clusters requires that you create a cluster
access database directory object to control access to the Kafka cluster. The grants
on this database directory are used to control which Oracle Database users can
access the Kafka cluster. This database directory has an empty path: it does not
need a corresponding operating system directory, and it also does not contain any
files. Oracle recommends that the Oracle Directory Object Name for a cluster access
database directory object takes the form
OSAK_CLUSTER_NAME_ACCESS, where
CLUSTER_NAME is the name of the Kafka
cluster.
Procedure:
-
Create the cluster access database directory object with an empty path. This directory is used to control which Oracle users can access the Kafka cluster.
For example, create a cluster access database directory object called OSAK_KAFKACLUS2_ACCESS with an empty path:
SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_ACCESS AS '';
-
On the target Oracle Database server, create the cluster configuration operating system directory in the Oracle base path directory, using the path Oracle_base/osak/cluster_name/config, where Oracle_base is the Oracle base directory, and cluster_name is the Kafka cluster name. For example:
mkdir /u01/app/oracle/osak/kafkaclus2/config
Log in to the database as SYSDBA, start SQL, and create the corresponding Oracle directory object. In this example, the Kafka cluster name is KAFKACLUS2:
SQL> CREATE DIRECTORY OSAK_KAFKACLUS2_CONFIG AS '/u01/app/oracle/osak/kafkaclus2/config';
-
Create an empty osakafka.properties file, or an osakafka.properties file with OSAK tuning or debugging properties.
-
In SQL, register the Kafka cluster using DBMS_KAFKA_ADM.REGISTER_CLUSTER(). For example, using the server host name mykafkabootstrap-host, port 9092, for Kafka cluster KAFKACLUS2:
SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
  cluster_name => 'KAFKACLUS2',
  bootstrap_servers => 'mykafkabootstrap-host:9092',
  kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
  cluster_access_dir => 'OSAK_KAFKACLUS2_ACCESS',
  credential_name => NULL,
  cluster_config_dir => 'OSAK_KAFKACLUS2_CONFIG',
  cluster_description => 'My test cluster kafkaclus2',
  options => NULL)
FROM dual;
If configuration is successful, then the registration return is 0 (zero):
SQL> DBMS_KAFKA_ADM_RE….. 0
-
Grant read access to a Kafka user. In the following example, user app2-usr is granted access to the Kafka cluster named KAFKACLUS2:
SQL> GRANT READ ON DIRECTORY osak_kafkaclus2_access TO app2-usr;
Parent topic: Oracle SQL Access to Kafka
22.13 Configuring Access to Secure Kafka Clusters Using Oracle Wallet
Configure secure Kafka access by replacing PEM-based SSL with Oracle Wallet and registering the cluster using wallet credentials.
In previous releases, access to secure Kafka clusters required
configuration files, such as osakafka.properties, and additional
security files such as SSL/TLS PEM files and certificates. These files were stored
in a cluster configuration database directory object. The configuration files and
directory were accordingly protected by the operating system directory and file
access privileges.
Starting with Oracle AI Database 23.26.2, Oracle is replacing the
SSL/TLS PEM files and certificates procedure with Oracle Wallet configuration.
Instead of multiple files, the cluster configuration database directory object
points to a single wallet (ewallet.p12 in this example). Instead of
file-based secrets, access is enabled with DBMS_CREDENTIAL.
With the Oracle Wallet method, Kafka access is controlled in three layers:
- Access control: This is maintained by creating a DIRECTORY object for access and using GRANT READ to control user access.
- Configuration: The configuration files are in an operating system directory, and the location of these files is referenced in a separate DIRECTORY object.
- Authentication: Authentication is provided by using the Oracle Wallet together with DBMS_CREDENTIAL.
Procedure:
-
Create the cluster access directory object for Kafka access control.
CREATE DIRECTORY OSAK_KAFKACLUS1_ACCESS AS '';
Note:
In this example, the directory object OSAK_KAFKACLUS1_ACCESS is a logical object created to attach database privileges (GRANT READ). The directory object is not being used to point to an actual file system location. Instead, it is being used as a security access control mechanism inside the database.
-
Create the Cluster Configuration operating system directory for the configuration file location.
mkdir -p /u01/app/oracle/osak/kafkaclus1/config
Set permissions as follows on this directory:
- Directory: 750 (rwxr-x---)
- osakafka.properties: 640 (rw-r-----)
- Other files: 440 (r--r-----)
-
Create the Oracle directory object for the config directory location:
CREATE DIRECTORY OSAK_KAFKACLUS1_CONFIG AS '/u01/app/oracle/osak/kafkaclus1/config';
Note:
The OSAK_KAFKACLUS1_CONFIG directory object points to where configuration and wallet files are placed for this Kafka access.
-
Create the Oracle Wallet (replacing the PEM files).
WALLET_DIR=/u01/app/oracle/osak/kafkaclus1/config/wallet
WALLET_PWD=MyStrongPassword123
orapki wallet create -wallet $WALLET_DIR -pwd $WALLET_PWD -auto_login
The wallet password is stored later as a database credential, which ensures that the database can open the wallet during Kafka operations.
-
Import certificates into the wallet.
If starting from PEM files:
# Convert PEM to PKCS12
openssl pkcs12 -export -passout pass:Welcome1 \
  -in client.pem -inkey client.key -out client.p12
# Import into wallet
orapki wallet import_pkcs12 \
  -wallet $WALLET_DIR \
  -pwd $WALLET_PWD \
  -pkcs12file client.p12 \
  -pkcs12pwd Welcome1
(Optional) Import the CA certificate if needed:
orapki wallet add \
  -wallet $WALLET_DIR \
  -trusted_cert \
  -cert ca-cert.pem \
  -pwd $WALLET_PWD
-
Verify the wallet.
orapki wallet display -wallet $WALLET_DIR -pwd $WALLET_PWD
-
Create the osakafka.properties file (wallet-based SSL):
security.protocol=SSL
ssl.wallet.location=/u01/app/oracle/osak/kafkaclus1/config/wallet/ewallet.p12
ssl.endpoint.identification.algorithm=https
Note:
Remove the desupported properties:
- ssl.ca.location
- ssl.certificate.location
- ssl.key.location
- ssl.key.password
If these properties are not removed, you will receive an error.
-
Store the wallet password in the database credential.
BEGIN
  DBMS_CREDENTIAL.CREATE_CREDENTIAL(
    credential_name => 'KAFKACLUS1_WALLET_CRED',
    username        => 'wallet',
    password        => 'MyStrongPassword123');
END;
/
-
Register the Kafka cluster with the wallet.
SELECT DBMS_KAFKA_ADM.REGISTER_CLUSTER(
  cluster_name => 'KAFKACLUS1',
  bootstrap_servers => 'mykafkabootstrap-host:9092',
  kafka_provider => DBMS_KAFKA_ADM.KAFKA_PROVIDER_APACHE,
  cluster_access_dir => 'OSAK_KAFKACLUS1_ACCESS',
  credential_name => NULL,
  cluster_config_dir => 'OSAK_KAFKACLUS1_CONFIG',
  cluster_description => 'My test cluster kafkaclus1',
  options => '{"walletcredname":"KAFKACLUS1_WALLET_CRED"}')
FROM dual;
If registration is successful, the output is 0.
-
Grant access to Kafka users.
GRANT READ ON DIRECTORY OSAK_KAFKACLUS1_ACCESS TO app1_usr;
As a result of this process:
- Kafka cluster registration points to a DIRECTORY object called OSAK_KAFKACLUS1_ACCESS.
- Only users with READ privilege on that DIRECTORY can access the specified Kafka cluster.
- The administrator can easily grant or revoke access to specific users.
- The administrator can audit access using database mechanisms.
Parent topic: Oracle SQL Access to Kafka
22.14 Administering Oracle SQL Access to Kafka Clusters
See how to update, temporarily disable, and delete Kafka cluster definitions with Oracle SQL access to Kafka.
- Updating Access to Kafka Clusters
If the Kafka cluster environment changes, you can update the cluster definition and configuration for those changes.
- Disabling or Deleting Access to Kafka Clusters
You can temporarily disable Oracle SQL access to a Kafka cluster, or delete the connection if it is no longer required.
Parent topic: Oracle SQL Access to Kafka
22.14.1 Updating Access to Kafka Clusters
If the Kafka cluster environment changes, you can update the cluster definition and configuration for those changes.
Parent topic: Administering Oracle SQL Access to Kafka Clusters
22.14.2 Disabling or Deleting Access to Kafka Clusters
You can temporarily disable Oracle SQL access to a Kafka cluster, or delete the connection if it is no longer required.
Example 22-13 Disabling a Kafka Cluster
During temporary outages of the Kafka environment, you can temporarily disable access to the Kafka cluster by calling:
- DBMS_KAFKA_ADM.DISABLE_CLUSTER, followed by
- DBMS_KAFKA_ADM.ENABLE_CLUSTER when the Kafka environment is back up
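A minimal sketch of this sequence, assuming that both procedures take the registered cluster name as their argument:
-- Disable access while the Kafka environment is down
EXEC SYS.DBMS_KAFKA_ADM.DISABLE_CLUSTER('KAFKACLUS1');
-- Re-enable access when the Kafka environment is back up
EXEC SYS.DBMS_KAFKA_ADM.ENABLE_CLUSTER('KAFKACLUS1');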
Example 22-14 Deleting a Kafka Cluster
When a cluster definition is no longer needed, the OSAK administrator can remove the cluster definition by calling:
- DBMS_KAFKA_ADM.DEREGISTER_CLUSTER
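A minimal sketch, again assuming the procedure takes the registered cluster name:
-- Remove the cluster definition and its OSAK metadata
EXEC SYS.DBMS_KAFKA_ADM.DEREGISTER_CLUSTER('KAFKACLUS1');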
Parent topic: Administering Oracle SQL Access to Kafka Clusters
22.15 Guidelines for Using Kafka Data with Oracle SQL Access to Kafka
Review guidelines, restrictions, and recommendations as part of your application development plan.
- Kafka Temporary Tables and Applications
Oracle SQL access to Kafka views and their corresponding temporary tables are bound to a unique Kafka application (a group ID), and must exclusively access one or more partitions in a topic on behalf of that application.
- Sharing Kafka Data with Multiple Applications Using Streaming
To enable multiple applications to use Kafka data, use Oracle SQL access to Kafka to stream Kafka tables to a user table.
- Dropping and Recreating Kafka Tables
Because the Kafka offsets are managed by the DBMS_KAFKA metadata tables, changes to a Kafka topic configuration can require manual updates to Oracle SQL access to Kafka applications.
Parent topic: Oracle SQL Access to Kafka
22.15.1 Kafka Temporary Tables and Applications
Oracle SQL access to Kafka views and their corresponding temporary tables are bound to a unique Kafka application (a group ID), and must exclusively access one or more partitions in a topic on behalf of that application.
Use these guidelines to assist you with constructing your applications.
Kafka Group IDs and Oracle SQL Access to Kafka Temporary Tables
Unlike standard Oracle tables and views, in accordance with the rules
for consuming Apache Kafka data, Kafka temporary tables cannot be shared across
multiple applications. With Kafka data, each temporary table is a snapshot of data
fetched directly from Kafka at a particular point of time, and has a canonical name
format that identifies the Kafka cluster, the application name, and a view ID, an integer identifying a particular view accessing
one or more partitions in the cluster or topic on behalf of an application
associated with a consumer group ID (groupID) in Kafka. The
temporary views and tables created in Oracle AI Database are bound to a unique Kafka
application (identified by groupID), and must exclusively access
one or more partitions in a topic on behalf of that application. The application cannot share access to these partitions simultaneously with other applications. This restriction extends to an Oracle application instance: an Oracle SQL Access to Kafka view and its associated temporary table must be exclusive to that application. If you want to configure multiple applications to query the same Kafka topic or partition data, then these applications must identify themselves as different applications (that is, with different, unique Kafka group IDs), and create their own Oracle SQL access to Kafka applications, reflecting their own group ID and application identity, and their own set of offsets to track.
Guidelines for Using Views and Tables with Oracle SQL Access to Kafka
Create views and tables for your applications in accordance with the kinds of analytics you want to perform with that data.
If you want your application to use Oracle SQL for analytics, then Oracle recommends that you create an Oracle SQL access to Kafka view for that application that captures all partitions of the data that you want to query. Each visit by a single application instance captures all new Kafka data in a topic, and generates aggregate information that the application can then store or display.
If you do not want to perform analytics using Oracle SQL, but instead use complex logic in the application itself, then Oracle recommends that you scale out the application instances, and have each Oracle SQL access to Kafka view access a single partition on behalf of a single application instance. For this case, typically the Kafka data is joined with standard Oracle tables to enrich the data returned to the application.
In cases where some SQL analytics and joins are performed before more analysis is done by the application, views mapping to some subset of the partitions in a topic can be a good option to choose.
22.15.2 Sharing Kafka Data with Multiple Applications Using Streaming
To enable multiple applications to use Kafka data, use Oracle SQL access to Kafka to stream Kafka tables to a user table.
To share Kafka data with multiple Oracle users, so that the table is not tied to
a specific group ID, Oracle recommends that you have an application user run Oracle
SQL access to Kafka in Loading mode, with the PL/SQL procedure
DBMS_KAFKA.EXECUTE_LOAD_APP, to create a table owned by that user.
With this option, a single application instance runs the Loading PL/SQL procedure on a
regular basis to load all new data incrementally from a Kafka topic into an Oracle AI
Database table. After the data is loaded into the table, it can then be made accessible
to standard Oracle database applications granted access to that table, without the
restrictions that apply to temporary tables.
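For example, after the load completes, access to the loaded table can be shared with a standard grant; the table and user names here are hypothetical:
GRANT SELECT ON ExampleLoadTable TO analytics_usr;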
22.15.3 Dropping and Recreating Kafka Tables
Because the Kafka offsets are managed by the DBMS_KAFKA
metadata tables, changes to a Kafka topic configuration can require manual updates to Oracle
SQL access to Kafka applications.
To ensure that Oracle application instances can identify what Kafka table content has been read, and where it has been read, partition offsets of a Kafka topic must be tracked on a per application instance basis.
Kafka supports three models for committing offsets:
- Auto-commit, where Kafka commits the last offset fetched on a short time schedule
- Manual commit, where applications send a request for Kafka to commit an offset
- Application-managed commits, where Kafka commits are entirely managed by the applications.
Oracle uses application-managed commits: Kafka sees the application as declaring manual commits without ever explicitly committing offsets to Kafka. Offsets are recorded and maintained exclusively in DBMS_KAFKA metadata tables. These tables are protected by the ACID transaction properties of Oracle Database. To ensure the integrity of transactions, Oracle does not support Kafka auto-commit or Kafka manual commit in Oracle SQL Access to Kafka.
If a Kafka topic is dropped and recreated, then you must update the associated Oracle SQL access to Kafka application manually, depending on the scenario:
Example 22-15 Dropping and Resetting a View with the Same Partitions
If the number of partitions remains the same as the original Kafka topic configuration, then you must reset the Oracle SQL access to Kafka view to begin processing from the beginning of the Kafka partition within the recreated topic. To reset the view, call the procedure DBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML'), where view_name is the name of the view.
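For example, using the view naming convention shown elsewhere in this chapter (the view name is illustrative):
EXEC SYS.DBMS_KAFKA.INIT_OFFSET('ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0', 0, 'WML');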
Example 22-16 Dropping and Resetting a View with Fewer Partitions
This option is not available. If the number of partitions is less than the original Kafka topic configuration, then the Oracle SQL access to Kafka applications associated with this topic must be dropped and recreated.
Example 22-17 Dropping and Resetting a View with More Partitions
If the number of partitions is greater than the original Kafka topic
configuration, then you must reset the Oracle SQL Access to Kafka view by calling
the procedure DBMS_KAFKA.INIT_OFFSET(view_name, 0, 'WML'), where view_name is the name of the view, and then
call the procedure DBMS_KAFKA.ADD_PARTITIONS for each Oracle SQL
Access to Kafka application using this topic.
22.16 Choosing a Kafka Cluster Access Mode for Applications
To use Oracle SQL access to Kafka, decide what mode of data access you require for your applications.
- Configuring Incremental Loads of Kafka Records Into an Oracle Database Table
To enable applications to load data incrementally from a Kafka topic into an Oracle Database table, you use Oracle SQL Access to Kafka in Loading mode.
- Streaming Access to Kafka Records in Oracle SQL Queries
To access Kafka topics in a sequential manner from the beginning of the topic, or from a specific starting point in a Kafka topic, you can use Oracle SQL Access to Kafka in Streaming mode.
- Seekable Access to Kafka Records in Oracle SQL Queries
To access Kafka records randomly between two timestamps, you use Oracle SQL Access to Kafka in Seekable mode.
Parent topic: Oracle SQL Access to Kafka
22.16.1 Configuring Incremental Loads of Kafka Records Into an Oracle Database Table
To enable applications to load data incrementally from a Kafka topic into an Oracle Database table, you use Oracle SQL Access to Kafka in Loading mode.
Configuring Oracle SQL Access to Kafka to perform incremental loads using the
EXECUTE_LOAD_APP procedure enables you to move Kafka data into
standard Oracle tables, which are accessible by multiple applications without the
one reader constraint imposed when using Oracle SQL access to Kafka temporary
tables.
To load Kafka data incrementally into an Oracle Database table, an application
declares that it is a loading application by calling the PL/SQL procedure
DBMS_KAFKA.CREATE_LOAD_APP to initialize a state for subsequent
calls to DBMS_KAFKA.EXECUTE_LOAD_APP. The
DBMS_KAFKA.CREATE_LOAD_APP procedure creates a single view over
all partitions of the topic.
If you do not require data from the entire topic, then you also have the option to
configure the application to call the DBMS_KAFKA.INIT_OFFSET[_TS]
procedure to set the starting point in Kafka topic partitions for loading the Kafka
data.
The DBMS_KAFKA.EXECUTE_LOAD_APP procedure is called in an
application loop to load data from where the previous call left off to the current
high water mark of the Kafka topic. This procedure runs in an autonomous
transaction.
To load data into an Oracle Database table from a Kafka topic (a minimal sketch follows this list):
- DBMS_KAFKA.CREATE_LOAD_APP to create an Oracle SQL Access to Kafka Load application
- Optionally, DBMS_KAFKA.INIT_OFFSET_TS or DBMS_KAFKA.INIT_OFFSET to set the first Kafka record to be read
- LOOP until done
- DBMS_KAFKA.EXECUTE_LOAD_APP to load Kafka data starting from where the previous call left off to the current high water mark
- DBMS_KAFKA.DROP_LOAD_APP to drop the load application
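The following is a minimal sketch of this loop, reusing the load application and target table names from the examples later in this chapter; the exit condition and pacing interval are illustrative only:
DECLARE
  v_records_inserted PLS_INTEGER;
BEGIN
  LOOP
    -- Load all new records up to the current Kafka high water mark
    SYS.DBMS_KAFKA.EXECUTE_LOAD_APP(
      'ExampleCluster',
      'ExampleLoadApp',
      'ExampleLoadTable',
      v_records_inserted);
    -- Illustrative exit condition: stop when no new records arrived
    EXIT WHEN v_records_inserted = 0;
    -- Illustrative pacing between incremental loads
    DBMS_SESSION.SLEEP(60);
  END LOOP;
END;
/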
Parent topic: Choosing a Kafka Cluster Access Mode for Applications
22.16.2 Streaming Access to Kafka Records in Oracle SQL Queries
To access Kafka topics in a sequential manner from the beginning of the topic, or from a specific starting point in a Kafka topic, you can use Oracle SQL Access to Kafka in Streaming mode.
If your application requires access to Kafka topics in a sequential
manner, you can configure Oracle SQL Access to Kafka in Streaming mode. This mode
enables a SQL query using an Oracle SQL access to Kafka temporary table to access
Kafka records sequentially in an application processing loop. With this use case,
the application declares that it is a streaming application by calling the PL/SQL
procedure DBMS_KAFKA.CREATE_STREAMING_APP to initialize the state
for subsequent queries of Oracle SQL access to Kafka views. In addition to creating
views, this procedure also creates a global temporary table for each view. You also
have the option to use the INIT_OFFSET[_TS] procedure to set the starting point in
Kafka topic partitions for your application. When you set a starting point, the initial query reads the Kafka data from that starting point. The application then can perform the following steps, in a processing loop (a minimal sketch of the loop body follows this list):
- Call DBMS_KAFKA.CREATE_STREAMING_APP to create the Oracle SQL access to Kafka streaming application.
- (Optional) Call DBMS_KAFKA.INIT_OFFSET_TS or DBMS_KAFKA.INIT_OFFSET to set the first Kafka record that you want to be read.
- LOOP until done:
  - Call DBMS_KAFKA.LOAD_TEMP_TABLE to load the global temporary table with the next set of rows from Kafka
  - SELECT from the OSAK global temporary table and process the retrieved data
  - If the processing was successful, call DBMS_KAFKA.UPDATE_OFFSET to update the last Kafka offsets read
  - Commit the offset tracking information using COMMIT
- When finished, call DBMS_KAFKA.DROP_STREAMING_APP to drop the application.
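A minimal sketch of one iteration of the loop body, using the ORA$ view and temporary table names shown elsewhere in this chapter (the names are illustrative):
BEGIN
  -- Fetch the next set of unread Kafka records into the temporary table
  SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0');
  -- SELECT from the temporary table and apply application processing here
  -- Record that this set of offsets was processed successfully
  SYS.DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0');
  COMMIT;
END;
/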
The PL/SQL procedure DBMS_KAFKA.UPDATE_OFFSET transparently advances
Kafka partition offsets of the Kafka group ID for all of the partitions that are
identified with the Oracle SQL access to Kafka view, so that for every call to
DBMS_KAFKA.LOAD_TEMP_TABLE, a new set of unread Kafka records
is retrieved and processed.
Note that UPDATE_OFFSET initiates an Oracle transaction if a transaction is not already started, and records the last offsets in metadata tables. Because of this, to ensure that the transaction does not lose its session information, you should configure your application to commit the transaction after every call to UPDATE_OFFSET. Because Oracle SQL access to Kafka manages offsets within an Oracle transaction, no records are lost or reread after you commit the transaction. If the transaction fails to complete, then offsets are not advanced. When the application resumes data reads, it restarts reading the Kafka data from where its previous reads stopped.
Parent topic: Choosing a Kafka Cluster Access Mode for Applications
22.16.3 Seekable Access to Kafka Records in Oracle SQL Queries
To access Kafka records randomly between two timestamps, you use Oracle SQL Access to Kafka in Seekable mode.
The Seekable mode of Oracle SQL access to Kafka enables an application to read Kafka
records between timestamps of interest, typically identified by a peer application
doing streaming access. In this mode, you specify the start and end timestamps that
define a window of time from which the DBMS_KAFKA.LOAD_TEMP_TABLE
procedure will populate the temporary table. An application declares that it is a
Seekable application by calling the PL/SQL procedure
DBMS_KAFKA.CREATE_SEEKABLE_APP to initialize the state for
accessing Kafka in Seekable mode. This procedure creates a view and a corresponding
global temporary table over all partitions of the topic. The
DBMS_KAFKA.SEEK_OFFSET_TS procedure is called to specify the
time window from which to query. The application calls
SEEK_OFFSET_TS before calling the
DBMS_KAFKA.LOAD_TEMP_TABLE procedure to load the temporary
table with the next set of rows.
To query Kafka data in Seekable mode, in order to access Kafka records between two timestamps:
- DBMS_KAFKA.CREATE_SEEKABLE_APP to create the Oracle SQL Access to Kafka seekable application
- LOOP until done
- DBMS_KAFKA.SEEK_OFFSET_TS to seek to a user defined window of time in a Kafka topic
- Call DBMS_KAFKA.LOAD_TEMP_TABLE to load the global temporary table with the set of rows from Kafka
- SELECT from the OSAK global temporary table
- Process the data
- DBMS_KAFKA.DROP_SEEKABLE_APP when done with the application
Parent topic: Choosing a Kafka Cluster Access Mode for Applications
22.17 Creating Oracle SQL Access to Kafka Applications
To query Kafka data in a LOAD application, load Kafka data into an Oracle Database table using these procedures.
Typical uses of load procedures include:
DBMS_KAFKA.CREATE_LOAD_APP: This procedure is used to set up loading into an Oracle table.
DBMS_KAFKA.INIT_OFFSET[_TS] (OPTIONAL): This procedure is used to set offsets in all topic partitions to control the starting point of a sequence of load operations.
DBMS_KAFKA.EXECUTE_LOAD_APP: This procedure is used to load new unread records from a Kafka topic up to the high water mark of all topic partitions. You repeat this procedure until you no longer want to load new rows from the Kafka topic on which you run the procedure.
DBMS_KAFKA.DROP_LOAD_APP: This procedure is used when loading from the Kafka topic is complete.
- Creating Load Applications with Oracle SQL Access to Kafka
If you want to load data into an Oracle Database table, then use the Loading mode of DBMS_KAFKA.
- Creating Streaming Applications with Oracle SQL Access to Kafka
If you want to read sequentially through a Kafka topic, then use the Streaming mode of DBMS_KAFKA.
- Creating Seekable Applications with Oracle SQL Access to Kafka
If you want to investigate issues that occurred in the past, and randomly access a Kafka topic between starting and ending timestamps, then use the Seekable mode of DBMS_KAFKA.
Parent topic: Oracle SQL Access to Kafka
22.17.1 Creating Load Applications with Oracle SQL Access to Kafka
If you want to load data into an Oracle Database table, then use the Loading mode of DBMS_KAFKA. After creating the application, load the data by calling DBMS_KAFKA.EXECUTE_LOAD_APP for the created LOAD application.
Example 22-18 Loading Data Into a Table with DBMS_KAFKA.EXECUTE_LOAD_APP
In this example, you create one view and associated temporary table for a loading application. The Kafka cluster name is ExampleCluster, and the application name is ExampleApp. The Kafka topic is ExampleTopic, which is a topic that has four partitions:
DECLARE
v_options VARCHAR2(4000);
BEGIN
v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';
SYS.DBMS_KAFKA.CREATE_LOAD_APP(
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options
);
END;
/
Example 22-19 Loading Data Periodically Into a Table with DBMS_KAFKA.EXECUTE_LOAD_APP
As an alternative to processing Kafka data from a set of application views, you can
choose simply to load the data from Kafka into an Oracle Database table,
periodically fetching the latest data into the table. The
DBMS_KAFKA.EXECUTE_LOAD_APP procedure in this example obtains
the latest data from the Kafka cluster, and inserts the data into the table,
ExampleLoadTable. An application that uses the data in this
table has the option to call DBMS_KAFKA.INIT_OFFSET[_TS] to set
the starting point for the load.
DECLARE
v_records_inserted PLS_INTEGER;
BEGIN
SYS.DBMS_KAFKA.EXECUTE_LOAD_APP(
'ExampleCluster',
'ExampleLoadApp',
'ExampleLoadTable',
v_records_inserted
);
-- Optional: view the output value
-- DBMS_OUTPUT.PUT_LINE('Records inserted: ' || v_records_inserted);
END;
/
Example 22-20 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_LOAD_APP or DBMS_KAFKA.DROP_ALL_APPS
If the Oracle SQL access to Kafka Load application is no longer needed, then you can
drop the views and metadata by calling DBMS_KAFKA.DROP_LOAD_APP. In
the following example, the Kafka cluster is ExampleCluster, and the
application is ExampleApp.
EXEC SYS.DBMS_KAFKA.DROP_LOAD_APP('ExampleCluster', 'ExampleApp');
If the Kafka cluster for one or more Oracle SQL access to Kafka applications no longer
exists, then you can drop all of the applications for a given cluster by calling
DBMS_KAFKA.DROP_ALL_APPS:
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');
Parent topic: Creating Oracle SQL Access to Kafka Applications
22.17.2 Creating Streaming Applications with Oracle SQL Access to Kafka
If you want to read sequentially through a Kafka topic, then use the Streaming mode of DBMS_KAFKA.
Streaming mode enables you to process data at scale. You can use Oracle SQL access to Kafka in Streaming mode to create multiple application instances. Multiple instances enable applications to scale out and divide the workload of analyzing Kafka data across the application instances running concurrently on one or more threads, processes, or systems.
An Oracle SQL access to Kafka streaming application includes a set of dedicated Oracle SQL access to Kafka global temporary tables and Oracle SQL access to Kafka views. These temporary tables and views can be used for retrieving new, unread records from partitions in a Kafka topic.
Creating the application also creates, if not already present, a metadata view that is used to inspect the Kafka cluster for active topic and partition information regarding the Kafka topic. This view is created once, and serves all applications that are sharing the same cluster.
Each Oracle SQL access to Kafka global temporary table and its related view is exclusively used by one instance of an Oracle SQL access to Kafka application.
Each application instance calls LOAD_TEMP_TABLE, which populates the
dedicated Oracle SQL access to Kafka global temporary table with Kafka rows
retrieved from the associated view. The application then can run one or more SQL
queries against the content in the Oracle SQL access to Kafka global temporary
table. When the application is done with the current set of Kafka rows, it calls
UPDATE_OFFSET and COMMIT.
A STREAMING mode application is different from a
LOAD or SEEKING application in that you can
configure the application to select how many Oracle SQL access to Kafka views and
temporary tables are required for your application purpose. As with other types of
Oracle SQL access to Kafka applications, each application instance exclusively
queries one unique Oracle SQL access to Kafka temporary table. Each Oracle SQL
access to Kafka view and global temporary table name includes the cluster name, the
application name, and an application instance identifier (ID).
In creating your application, be aware that the number of Oracle SQL access to Kafka view and temporary table pairs that you create must be between 1 and N, where N is the number of partitions in the Kafka topic.
During runtime, each application instance runs in its own user session, and processes
one Oracle SQL access to Kafka global temporary table and its associated view.
Accordingly, to run application instances concurrently, you must allocate at least
as many sessions to the user as there are partitions in the Kafka topic (that is,
the value of N). If the view_count exceeds the
maximum sessions per user, then this call fails with an error indicating that there
are insufficient sessions allocated to the user. The number of Kafka partitions
bound to a specific Oracle SQL access to Kafka view and its associated global
temporary table varies, depending on how many views are created, and on how many
partitions exist. Oracle SQL access to Kafka balances the number of partitions
assigned to each view.
Example 22-21 Streaming Data Into a Table with DBMS_KAFKA.CREATE_STREAMING_APP
In this example, you create a set of four views and associated temporary tables for a
Streaming mode application using data from a topic called
ExampleTopic. The topic has four partitions, and each view and
temporary table is associated with one partition:
DECLARE
v_options VARCHAR2(4000);
BEGIN
v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
4
);
END;
/
Example 22-22 Loading Data Into a Single Table with DBMS_KAFKA.CREATE_STREAMING_APP
In this example, Streaming mode is used to create one view and associated temporary table for an application that is associated with all four partitions of the topic:
DECLARE
v_options VARCHAR2(4000);
BEGIN
v_options := '{"fmt":"DSV","reftable":"user_reftable_name"}';
SYS.DBMS_KAFKA.CREATE_STREAMING_APP(
'ExampleCluster',
'ExampleApp',
'ExampleTopic',
v_options,
1
);
END;
/
Example 22-23 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_STREAMING_APP or DBMS_KAFKA.DROP_ALL_APPS
If the Oracle SQL access to Kafka Streaming application is no longer needed, then you can
drop the views and metadata by calling
DBMS_KAFKA.DROP_STREAMING_APP. In the following example, the
Kafka cluster is ExampleCluster, and the application is
ExampleApp.
EXEC SYS.DBMS_KAFKA.DROP_STREAMING_APP('ExampleCluster', 'ExampleApp');
If the Kafka cluster for one or more Oracle SQL access to Kafka applications no
longer exists, then you can drop all of the applications for a given cluster by
calling DBMS_KAFKA.DROP_ALL_APPS:
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');
Parent topic: Creating Oracle SQL Access to Kafka Applications
22.17.3 Creating Seekable Applications with Oracle SQL Access to Kafka
If you want to investigate issues that occurred in the past, and randomly
access a Kafka topic between starting and ending timestamps, then use the Seekable mode of
DBMS_KAFKA.
Before accessing Kafka topics in Seekable mode, you must create an Oracle SQL Access to Kafka application with DBMS_KAFKA.CREATE_SEEKABLE_APP. This procedure creates an application that you can use in Seekable mode.
Using Oracle SQL access to Kafka in Seekable mode enables you to use Kafka data to investigate issues that have occurred in the past. Provided that the data is still present in the Kafka stream, you can create a Seekable application by calling DBMS_KAFKA.CREATE_SEEKABLE_APP. When you have created a Seekable mode application, you can then call the procedure DBMS_KAFKA.SEEK_OFFSET_TS to request the Oracle SQL access to Kafka view to retrieve a range of data records, as shown in the following examples.
Example 22-24 Searching a Date Range in Kafka Data Using DBMS_KAFKA.CREATE_SEEKABLE_APP
In this example, suppose that an IT consultant was informed that a
production issue occurred around 03:00 in the morning, and needed to investigate the
cause. The consultant could use the following procedure, load the temporary table,
and then select to retrieve an hour's worth of data around that time, where the
Kafka cluster is EXAMPLECLUSTER, and the columns are
EventCol and ExceptionCol:
BEGIN
SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
'ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0',
TO_DATE('2022/07/04 02:30:00', 'YYYY/MM/DD HH24:MI:SS'),
TO_DATE('2022/07/04 03:30:00', 'YYYY/MM/DD HH24:MI:SS')
);
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0');
END;
/
SELECT EventCol, ExceptionCol
FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
Example 22-25 Locating Records Associated with Anomalies Using DBMS_KAFKA.CREATE_SEEKABLE_APP
Suppose that when an application using sequential access to a Kafka stream detected a
potential anomaly, the application inserts a row into an anomaly table. The anomaly
table includes the Kafka timestamp, as well as any other data specified as important
to trace. Another application could use this information to retrieve records around
the suspected record to see if there were any other issues associated with the
anomaly. In this example, the columns associated with an anomaly that an IT
consultant wants to examine are UserCol and
RequestCol. To achieve this, run the following procedure, load
the temporary table, and then select and apply application logic to the results:
BEGIN
SYS.DBMS_KAFKA.SEEK_OFFSET_TS(
'ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0',
TO_DATE('2020/07/04 02:30:00', 'YYYY/MM/DD HH24:MI:SS'),
TO_DATE('2020/07/04 03:30:00', 'YYYY/MM/DD HH24:MI:SS')
);
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_SEEKABLEAPP_0');
END;
/
SELECT UserCol, RequestCol
FROM ORA$DKV_EXAMPLECLUSTER_SEEKABLEAPP_0;
-- application logic
Example 22-26 Dropping the Kafka View and Metadata with DBMS_KAFKA.DROP_SEEKABLE_APP or DBMS_KAFKA.DROP_ALL_APPS
If the Oracle SQL access to Kafka Seekable application is no longer needed,
then you can drop the views and metadata by calling
DBMS_KAFKA.DROP_SEEKABLE_APP. In the following example, the
Kafka cluster is ExampleCluster, and the application is
ExampleApp.
EXEC SYS.DBMS_KAFKA.DROP_SEEKABLE_APP('ExampleCluster', 'ExampleApp');
If the Kafka cluster for one or more Oracle SQL access to Kafka applications no longer exists, then you can drop all of the applications for a given cluster by calling DBMS_KAFKA.DROP_ALL_APPS:
EXEC SYS.DBMS_KAFKA.DROP_ALL_APPS('ExampleCluster');
Parent topic: Creating Oracle SQL Access to Kafka Applications
22.18 Using Kafka Cluster Access for Applications
Learn how to use Kafka cluster data access with your applications.
- How to Diagnose Oracle SQL Access to Kafka Issues
If you encounter issues with Oracle SQL access to Kafka, then use these guidelines to determine the cause, and resolve the issue.
- Identifying and Resolving Oracle SQL Access to Kafka Issues
To assist with identifying and resolving issues, Oracle SQL access to Kafka provides trace files, message tables, operation results tables, and a state column in the cluster table.
Parent topic: Oracle SQL Access to Kafka
22.18.1 How to Diagnose Oracle SQL Access to Kafka Issues
If you encounter issues with Oracle SQL access to Kafka, then use these guidelines to determine the cause, and resolve the issue.
The following are the main diagnostic issues for Oracle SQL access to Kafka:
Failures to establish an initial connection
Errors of this type are as follows:
- Incorrect startup server list
- Incorrect credential information
- Networking configuration issues
Failures on first access
Failures of this type (during DBMS_KAFKA.CREATE_LOAD_APP, CREATE_STREAMING_APP, or CREATE_SEEKABLE_APP) typically have the following causes:
- Missing or incorrect topic
- Connection issues
Failures during record selection
Failures of this type typically have the following causes:
- Connection issues
- Internal metadata or logic issues
- Missing records
- Parsing errors where the Oracle SQL access to Kafka view shape does not match the input.
Failure for an Oracle application and Oracle SQL access to Kafka views to keep up with Kafka data input.
Failures of this type require resource tuning. They occur when the ingestion rate of rows into a topic in a Kafka cluster comes close to or exceeds the Oracle Database ability to consume Kafka records, such that after a period of time, unread records in Kafka become aged out by Kafka before they are consumed by Oracle Database.
Avoid or correct this kind of error by determining the workload. For example, check the frequency of querying, the typical number of records processed per query per Oracle SQL access to Kafka view, the degree of parallelism being used, and the time spent by an application performing analysis. When you have determined the workload, then ensure that the application stack can meet it. Size your resources so that the application and Oracle Database can process peak Kafka records without stressing either the application or Oracle Database resources.
If you find that Kafka ingestion rates start increasing, then several things can help. For example: increase the degree of parallelism for the application user, start more application instances, or add partitions to the Kafka topic.
Example 22-27 Resolving an Oracle SQL Access to Kafka (OSAK) Application Error
Suppose your OSAK application EXAMPLEAPP is loading data from the
Kafka cluster EXAMPLECLUSTER, and you receive an error such as the
following:
ORA-62721: The specified parallel hint [%0!s] exceeds the granule count {%1!s}.
The cause of this error is that the specified value was greater than the maximum possible parallelism, which is determined by the granule count. How do you resolve such an error?
The parallel_hint parameter on LOAD_TEMP_TABLE and EXECUTE_LOAD_APP is related to the degree of parallelism (DOP), which determines how many parallel processes can be run for a given SELECT statement to fetch the data. To leverage parallel queries to their potential, the parallel_hint parameter must be set between 2 and the maximum allowed DOP. The maximum DOP is either the maximum allowed for the user making the call, or the number of partitions associated with the OSAK view, whichever is smaller. The cause of this error is that either the database or the user account running the application has exceeded the maximum allowed DOP.
To resolve this issue, specify a value less than or equal to the granule count. The
granule count can be determined by calling the
DBMS_KAFKA.GET_GRANULE_COUNT function:
DECLARE
v_dop PLS_INTEGER;
v_success BOOLEAN := TRUE; -- set this based on your processing outcome
BEGIN
LOOP
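-- GET_GRANULE_COUNT returns the granule count for the view; per the text
-- above, this value is also the upper bound for a parallel_hint setting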
v_dop := SYS.DBMS_KAFKA.GET_GRANULE_COUNT('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0');
-- Example exit condition (adjust to your intended logic)
IF v_dop = 0 THEN
EXIT;
END IF;
SYS.DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0');
FOR kafka_record IN (
SELECT kafka_offset
FROM ORA$DKVGTT_EXAMPLECLUSTER_EXAMPLEAPP_0
) LOOP
DBMS_OUTPUT.PUT_LINE('Processing record: ' || kafka_record.kafka_offset);
-- application logic to process the Kafka records
-- set v_success := TRUE/FALSE accordingly
END LOOP;
IF v_success THEN
-- Update internal metadata to confirm Kafka records were successfully processed
SYS.DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_EXAMPLECLUSTER_EXAMPLEAPP_0');
COMMIT;
ELSE
-- add your application logic to correct for any failures
NULL;
END IF;
END LOOP;
END;
/
Parent topic: Using Kafka Cluster Access for Applications
22.18.2 Identifying and Resolving Oracle SQL Access to Kafka Issues
To assist with identifying and resolving issues, Oracle SQL access to Kafka provides trace files, message tables, operation results tables, and a state column in the cluster table.
Determine the nature of the issue you see, and then use the utility available to you to identify and address the issue:
- Connection issue, logic issue, or Kafka access layer issue (Oracle executables called by a Kafka data select): Check the trace file. Also, you can check the state column in the sys.user_kafka_clusters table.
- Exceptions from DBMS_KAFKA and DBMS_KAFKA_ADM APIs: Review error messages in the sys.user_kafka_messages table.
- Operations runtime issue: Review messages in the sys.user_kafka_ops_results table when the performance of Oracle SQL access to Kafka data retrieval is not as expected.
Example 22-28 Connection Issue, Logic Issue, or Kafka Access Layer Issue
Use the trace file to identify the issue.
-
For connection-related issues, the details are available from the view object tracing. To enable tracing, either add the event to the init.ora file, or use the ALTER SYSTEM command to update the system during runtime.
Add the following entry to the initialization file (init.ora):
event='trace[KGRK] disk highest'
Alter the system:
alter system set events 'trace[KGRK] disk highest';
Note:
Updates to the init.ora file require a restart of the database to take effect.
-
For logic-related errors, all error paths contain tracing. All messages are prefaced with the string kubsCRK. These logic errors will also result in SQL exceptions being raised.
-
The tracing output for the Kafka access layer of an Oracle SQL access to Kafka application is enabled by calling DBMS_KAFKA.SET_TRACING with the enable argument passed as TRUE. The tracing output is disabled by calling the same function with the enable argument passed as FALSE.
For example, to enable tracing for a cluster named ExampleCluster, where the application is ExampleApp, enter the following:
EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', TRUE);
To disable tracing for that cluster, enter the following:
EXEC SYS.DBMS_KAFKA.SET_TRACING('ExampleCluster', 'ExampleApp', FALSE);
Note:
To enable tracing, the following event must already be enabled for the database:
event="39431 trace name context forever, level 1" # Enable external table debug tracing
If you determine that the issue is a connection issue, then check the
State column in the
sys.user_kafka_clusters table. The connection levels are
designated by numeric values:
- CONNECTED (0): This state indicates that the connection to the Kafka cluster has been established. Errors that occur while the connection is established indicate an issue with requesting the Kafka data. To identify the issue, enable tracing by using the DBMS_KAFKA.SET_TRACING API, reproduce the problem, and then check the associated trace file for the session for messages containing kubsCRK. Also check for messages in the user_kafka_messages table.
- MAINTENANCE (1): This state indicates that the connection to the Kafka cluster has been established, but errors that occur while the connection is established indicate an issue requesting the Kafka data. To resolve this issue, enable tracing using the DBMS_KAFKA.SET_TRACING API, reproduce the problem, and then check the associated trace file for the session for messages containing kubsCRK. Also check for messages in the user_kafka_messages table.
- DEREGISTERED (3): This state indicates that the OSAK administrator has forced the cluster to be deregistered, and the associated Oracle SQL access to Kafka views should no longer be accessed. This is expected behavior, and not an error.
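For example, a quick check of the cluster state might look like the following sketch; the cluster name and state column names are assumed here, so adjust the column list to your release:
SELECT cluster_name, state FROM sys.user_kafka_clusters;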
Example 22-29 PL/SQL Package issues
Check the sys.user_kafka_messages table. This table contains any messages logged within the last three days. Older messages are automatically purged once a day. The messages are also removed if the OSAK views associated with the data are dropped.
Example 22-30 Operations Runtime Issue
If the number of rows retrieved using a SELECT statement
appears to be less than expected, then use the data in the
sys.user_kafka_ops_results table to review the number of
records read from Kafka for the last selection.
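For example, a minimal query of the results table; SELECT * is used here because the exact column list is release-specific:
SELECT * FROM sys.user_kafka_ops_results;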
The SELECT only contains rows that parsed correctly, so the
difference between the rows retrieved and Kafka records read indicates that not all
data in the Kafka topic is in the format specified during the DBMS_KAFKA
CREATE_LOAD_APP, CREATE_STREAMING_APP, or
CREATE_SEEKABLE_APP call.
If the Kafka topic data is not in the specified format, then the possible solutions are as follows:
- Fix the producers publishing to the Kafka cluster.
- Drop and recreate the application so that it provides the proper format (reference table for DSV, Avro schema for AVRO).
- For JSON data, before you drop and recreate the application, check to see if the data exceeds the maximum column length in the VARCHAR2 VALUE column. If the data is larger than the maximum, then you can drop and recreate the application, but this time add the option "jsond" : "clob" to the options parameter. This option enables OSAK to create the column as a character large object (CLOB) column, instead of the default maximum sized VARCHAR2.
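A minimal sketch of such an options string follows; the "jsond" option is taken from the description above, while the "fmt" value shown is an assumption for JSON-formatted topics:
-- Illustrative options string; "fmt":"JSON" is assumed for JSON records
v_options := '{"fmt":"JSON", "jsond":"clob"}';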
Parent topic: Using Kafka Cluster Access for Applications