Exploring transactional messaging in Oracle 23c Free - developer release
One key Eventuate.io design principle is to leverage existing infrastructure technologies as much as possible. Specifically, rather than reinventing storage and messaging technologies, it uses relational databases, such as MySQL and Postgres, and message brokers, such as Apache Kafka and RabbitMQ. Eventuate glues together the database and message broker using the Transactional Outbox pattern. The Eventuate CDC service uses either the Transaction log tailing or the Polling publisher pattern to pull messages from the outbox table and publish them to the message broker.
I must confess that I’ve mostly ignored the Oracle database for longer than I remember. But while implementing some enhancements for an Eventuate customer that uses Oracle, I was reminded of an intriguing alternative to the Transactional Outbox pattern: Oracle’s transactional event queues (formerly known as Advanced Queuing). The new Oracle 23c Free - developer release provides a great opportunity to explore this alternative. It’s freely available - no license to sign! And, it supports the Oracle Kafka API, which claims to allow Kafka applications to use Oracle’s transactional event queues with minimal code changes.
This article describes what I’ve learned so far in my investigation of Oracle 23c Free.
Running the Oracle 23c Free Docker container
Running Oracle 23c Free is remarkably easy:
docker run -it container-registry.oracle.com/database/free:latest
Or at least it is easy if you have an Intel-based Mac. The container image only supports Intel and doesn’t run on an M1-based Mac. As a result, I had to develop using a Gitpod environment. Gitpod worked reasonably well, although downloading the large database image was really slow.
Attempt #1 at using the Kafka Consumer to consume messages sent by PL/SQL
Once the database container was running, I then dove into the Oracle documentation to learn how to do the following:
- Create a queue using PL/SQL - I later discovered that I could use KafkaAdmin to do this
- Send a message using PL/SQL instead of the Kafka Producer so that the messages would be sent as part of the database transaction that updates business entities
- Read messages from the queue using the Kafka Consumer
Let’s look at each step.
Creating a queue using PL/SQL
Creating a queue using PL/SQL is straightforward:
call DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(queue_name, queue_payload_type => 'json')
Since my goal was to send JSON messages, I specified the queue_payload_type to be json.
My code invoked the stored procedure using jdbcTemplate.call().
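As a rough illustration of this step, here is a minimal sketch that issues the same call through plain JDBC rather than jdbcTemplate.call(), purely to keep the example free of Spring dependencies. The class name QueueAdmin and the method name createJsonQueue are hypothetical, and wrapping the call in an anonymous PL/SQL block is an assumption about how it could be invoked, not necessarily how my code did it.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: creates a transactional event queue with a JSON payload.
final class QueueAdmin {

    // Anonymous PL/SQL block; the queue name is supplied as a bind parameter.
    static final String CREATE_QUEUE_SQL =
        "BEGIN DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE("
        + "queue_name => ?, queue_payload_type => 'JSON'); END;";

    static void createJsonQueue(Connection connection, String queueName) throws SQLException {
        try (CallableStatement call = connection.prepareCall(CREATE_QUEUE_SQL)) {
            call.setString(1, queueName);
            call.execute();
        }
    }
}
```

Oracle's AQ examples usually follow queue creation with DBMS_AQADM.START_QUEUE; the sketch leaves that out because it only covers the call shown above.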
Sending a message using PL/SQL
The Kafka Producer can be used to send messages.
However, since the goal is to send a message in the same transaction that updates business entities, the sending must be done using JDBC.
Fortunately, sending a message using JDBC also appeared to be straightforward using the DBMS_AQ.ENQUEUE() stored procedure:
DBMS_AQ.ENQUEUE(queue_name, enqueue_options, message_properties, json(message), msgid);
The json() function converts the string message to a JSON value.
Since the enqueue_options and message_properties arguments are PL/SQL records, I wrote a wrapper stored procedure that was easier to invoke using jdbcTemplate.call().
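A wrapper along these lines might look like the sketch below. The procedure name enqueue_json, its parameter names, and the Java class JsonEnqueue are all hypothetical. The wrapper declares the two PL/SQL records locally with their default values, so a JDBC caller only has to pass two strings. Plain JDBC is used instead of jdbcTemplate.call() to keep the example dependency-free.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper around a hypothetical wrapper procedure named enqueue_json.
final class JsonEnqueue {

    // The wrapper hides the PL/SQL record arguments of DBMS_AQ.ENQUEUE.
    static final String CREATE_WRAPPER_SQL = String.join("\n",
        "CREATE OR REPLACE PROCEDURE enqueue_json(p_queue IN VARCHAR2, p_message IN VARCHAR2) AS",
        "  enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;",
        "  message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;",
        "  msgid              RAW(16);",
        "BEGIN",
        "  DBMS_AQ.ENQUEUE(p_queue, enqueue_options, message_properties, json(p_message), msgid);",
        "END;");

    // Installs (or replaces) the wrapper procedure in the current schema.
    static void installWrapper(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute(CREATE_WRAPPER_SQL);
        }
    }

    // Enqueues one JSON message; it takes part in the connection's current transaction.
    static void enqueue(Connection connection, String queueName, String json) throws SQLException {
        try (CallableStatement call = connection.prepareCall("{call enqueue_json(?, ?)}")) {
            call.setString(1, queueName);
            call.setString(2, json);
            call.execute();
        }
    }
}
```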
So far so good.
Let’s now look at using the Kafka Java client to consume messages from the queue.
Reading messages using the Kafka Consumer
The first thing I discovered is that you don’t use the regular Kafka Client.
Instead, you must use an Oracle-specific replacement called okafka.
The API is very similar to the Kafka client's, but the package name is different: for example, the consumer class is org.oracle.okafka.clients.consumer.KafkaConsumer.
There are also various limitations.
Most notably, you cannot commit specific offsets.
Once I’d added the right dependencies to the project and overrode the Maven BOM to downgrade to the Kafka Client 2.8.1, I had a consumer running.
Sadly, however, it didn’t receive any messages.
Moreover, the ConsumerRebalanceListener was never invoked.
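For orientation, here is a minimal sketch of the kind of configuration an okafka consumer takes. It only builds a java.util.Properties object, so it compiles without the okafka or Kafka jars. The class and method names are hypothetical. The Oracle-specific keys (oracle.service.name, oracle.net.tns_admin) and the PLAINTEXT protocol setting are assumptions based on the okafka README and should be checked against the okafka version in use.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer settings for the okafka client.
final class OkafkaConsumerConfig {

    static Properties consumerProperties(String hostAndPort, String serviceName,
                                         String tnsAdminDir, String groupId) {
        Properties props = new Properties();
        // Standard Kafka consumer settings.
        props.put("bootstrap.servers", hostAndPort); // database listener, not a Kafka broker
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Oracle-specific settings (assumed key names, see the lead-in).
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.service.name", serviceName);
        props.put("oracle.net.tns_admin", tnsAdminDir); // directory holding ojdbc.properties
        return props;
    }
}
```

The resulting Properties object would then be passed to the org.oracle.okafka.clients.consumer.KafkaConsumer constructor in place of the regular Kafka consumer.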
Attempt #2 at using the Kafka Consumer to consume messages sent by PL/SQL
I carefully studied the docs to see what I was doing wrong.
Creating a Kafka ‘topic’ using dbms_teqk.aq$_create_kafka_topic()
While there wasn’t anything obviously wrong, I noticed that the example wasn’t creating queues using DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE().
Instead, it was using dbms_teqk.aq$_create_kafka_topic().
I then changed the queue creation code to use this apparently undocumented stored procedure.
Sending a message to a Kafka ‘topic’ using PL/SQL
After I changed the queue creation code, the DBMS_AQ.ENQUEUE() call started failing.
Apparently, the queue’s payload type was now JMS message instead of JSON.
This makes sense since the Oracle Kafka client actually uses Oracle JMS.
I then changed the message sending code to create a JMS binary message.
I then got this error:
ORA-25600: Invalid shard: Input shard does not match with shard in the queue
ORA-06512: at "SYS.DBMS_AQ", line 240
Fixing this error took a remarkably long time. I could not find any relevant documentation on message/shard keys. ChatGPT was equally confused, especially since it denied that Oracle 23c existed.
Somehow I eventually discovered that I needed to call this stored procedure after creating the queue:
DBMS_AQADM.SET_QUEUE_PARAMETER(?,'KEY_BASED_ENQUEUE', 1)
According to the Oracle documentation:
When set, the shard to which a message gets enqueued is determined by the key value specified in the message. Refer to key-based sharding (link) for more details. This parameter cannot be unset once set.
Sadly, ‘link’ isn’t a link and so I couldn’t find out more information about key-based sharding.
But after making this change, DBMS_AQ.ENQUEUE() started working.
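The SET_QUEUE_PARAMETER call above can be issued from JDBC right after the queue is created. The following is a minimal sketch using plain JDBC; the class name KeyBasedEnqueue and the method name enable are hypothetical, and the anonymous PL/SQL block is just one way to invoke the procedure.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: switches a queue to key-based enqueue.
final class KeyBasedEnqueue {

    // Same call as shown above, wrapped in an anonymous PL/SQL block.
    static final String SET_KEY_BASED_SQL =
        "BEGIN DBMS_AQADM.SET_QUEUE_PARAMETER(?, 'KEY_BASED_ENQUEUE', 1); END;";

    static void enable(Connection connection, String queueName) throws SQLException {
        try (CallableStatement call = connection.prepareCall(SET_KEY_BASED_SQL)) {
            call.setString(1, queueName);
            call.execute();
        }
    }
}
```

Since the documentation says the parameter cannot be unset, this is best treated as a one-time step in queue setup rather than something to toggle.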
I also followed ChatGPT’s suggestion to specify the message/shard key using the correlation property of message_properties:
message_properties.correlation := shard_key;
No idea, however, if this is correct.
Reading messages using the Kafka Consumer
Once I’d made the above changes, the consumer started to work.
The ConsumerRebalanceListener was invoked.
And, the consumer received messages.
Yay!
However, even though the consumer appeared to work, a background thread repeatedly generated the following error:
java.sql.SQLException: ORA-06533: Subscript beyond count
ORA-06512: at "SYS.DBMS_TEQK", line 825
ORA-06512: at "SYS.DBMS_TEQK", line 790
ORA-06512: at line 1
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:630)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:564)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1231)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:772)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:299)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:512)
at oracle.jdbc.driver.T4CCallableStatement.doOall8(T4CCallableStatement.java:159)
at oracle.jdbc.driver.T4CCallableStatement.executeForRows(T4CCallableStatement.java:1237)
at oracle.jdbc.driver.OracleStatement.executeSQLStatement(OracleStatement.java:1820)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1472)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3761)
at oracle.jdbc.driver.OraclePreparedStatement.execute(OraclePreparedStatement.java:4136)
at oracle.jdbc.driver.OracleCallableStatement.execute(OracleCallableStatement.java:4279)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.execute(OraclePreparedStatementWrapper.java:1014)
at org.oracle.okafka.clients.consumer.internals.AQKafkaConsumer.syncGroup(AQKafkaConsumer.java:948)
In summary
My code appears to work, at least partially.
But I don’t know why.
And I don’t know how to fix ORA-06533: Subscript beyond count.
Yet another day of 21st century software development - a maze of twisty little passages, all alike.
Suggestions welcome!
Next steps
The next step is to investigate the ORA-06533: Subscript beyond count error.
I’ll also write a simple application that updates a business entity and sends a message to a queue within the same transaction.
I’m assuming that this will work as expected but …
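To make that plan concrete, here is a minimal sketch of what such an application's core could look like with plain JDBC. Everything specific in it is hypothetical: the customers table and its columns, the enqueue_json procedure (standing in for the wrapper stored procedure described earlier), and the class and method names. It is untested against Oracle 23c Free and only illustrates the intended shape: one connection, one transaction, commit or roll back both changes together.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical sketch: update a business entity and enqueue a message atomically.
final class TransactionalSend {

    static void updateAndSend(Connection connection, long customerId,
                              String queueName, String eventJson) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // both statements share one transaction
        try {
            // 1. Update the business entity (hypothetical table and columns).
            try (PreparedStatement update = connection.prepareStatement(
                    "UPDATE customers SET order_count = order_count + 1 WHERE id = ?")) {
                update.setLong(1, customerId);
                update.executeUpdate();
            }
            // 2. Enqueue the message through the hypothetical wrapper procedure.
            try (CallableStatement enqueue = connection.prepareCall("{call enqueue_json(?, ?)}")) {
                enqueue.setString(1, queueName);
                enqueue.setString(2, eventJson);
                enqueue.execute();
            }
            connection.commit(); // entity change and message become visible together
        } catch (SQLException e) {
            connection.rollback(); // neither the update nor the message survives
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```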