7 Transactional Event Queue with Spring Framework

This chapter describes how the Transactional Event Queue (TxEventQ) messaging platform can be used with the Spring Framework. There are two primary ways to publish and consume messages from TxEventQ with the Spring Framework.

Topics:

  • Spring JMS
  • Spring Cloud Stream Binder for Transactional Event Queues

Spring JMS

Spring JMS is a JMS integration framework that simplifies the use of the JMS API within the Spring Framework. This integration framework is available in the Maven repository.

To use the Spring JMS framework with TxEventQ, an application developer must perform the following steps.

  1. Create a TxEventQ using PL/SQL or any other interface.

    -- User permissions
    grant connect, resource, unlimited tablespace to user;
    grant execute on dbms_aq to user;
    grant execute on dbms_aqadm to user;
    grant execute on dbms_aqin to user;
    
    -- Create and start the queue
    begin
      dbms_aqadm.create_transactional_event_queue('SPRINGQUEUE');
      dbms_aqadm.start_queue('SPRINGQUEUE');
    end;
    /
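
    To confirm that the queue was created and started, you can query the standard AQ data dictionary view user_queues. This query is a sketch; it assumes the SPRINGQUEUE queue created above.

    ```sql
    -- Confirm the queue exists and is enabled for enqueue and dequeue
    SELECT name, queue_type, enqueue_enabled, dequeue_enabled
    FROM   user_queues
    WHERE  name = 'SPRINGQUEUE';
    ```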
    
  2. Add dependent AQ-JMS jar file in the classpath.

    Transactional Event Queue (TxEventQ) is JMS 1.1 compliant. The implementation of the JMS 1.1 Specification is released in aqapi-jakarta.jar, which is available in Maven. It implements all features of JMS 1.1 and most of the functionality of Jakarta JMS 3.0.1.

    If the application uses Spring Boot version 3 or above, it must include the aqapi-jakarta.jar file. Earlier Spring Boot versions use aqapi.jar.
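
    For Maven-based builds, the dependency can be declared as follows. This is a sketch: the coordinates shown are for the Jakarta variant of the AQ-JMS client on Maven Central, and the version is an example only; check Maven Central for the latest release.

    ```xml
    <dependency>
        <groupId>com.oracle.database.messaging</groupId>
        <artifactId>aqapi-jakarta</artifactId>
        <!-- Example version; use the latest available on Maven Central -->
        <version>23.3.0.0</version>
    </dependency>
    ```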

  3. Configure an AQJMS Connection factory.

    It is important to use a connection pool when creating the AQ-JMS connection factory, which the Spring JMS framework then uses to create further JMS support objects.

    Note:

    If the AQ-JMS connection factory is not created using a connection pool, performance may be degraded.

    The application must use a PoolDataSource to create the AQ-JMS connection factory. A PoolDataSource bean can be created either through an annotated Java method, as shown in the myPoolDataSource method in the following example, or by configuring the Spring-Data-JDBC module through the application.properties file shown later in this step.

    @Bean
    public PoolDataSource myPoolDataSource() {
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        try {
            ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
            Properties props = new Properties();
            props.put("oracle.net.wallet_location", "<your-wallet-location>");
            props.put("oracle.net.tns_admin", "<tnsnames.ora file location>");
            ds.setConnectionProperties(props);
            ds.setURL("jdbc:oracle:thin:@<tnsname-entry>");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return ds;
    }

    @Bean
    public ConnectionFactory myConnectionFactory(PoolDataSource ds) {
        ConnectionFactory connectionFactory = null;
        try {
            connectionFactory = AQjmsFactory.getConnectionFactory(ds);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return connectionFactory;
    }
    

    If the application is configured to use the Spring-Data-JDBC module, the PoolDataSource bean can be created through Spring's application.properties file.

    spring.datasource.url=jdbc:oracle:thin:@<tnsname-alias>
    spring.datasource.oracleucp.connection-properties.oracle.net.wallet_location=<wallet-location>
    spring.datasource.oracleucp.connection-properties.oracle.net.tns_admin=<tns-admin-location>
    spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
    spring.datasource.type=oracle.ucp.jdbc.PoolDataSource
    spring.datasource.oracleucp.connection-factory-class-name=oracle.jdbc.pool.OracleDataSource
    

    See Also:

    Spring Data JDBC

  4. The JmsTemplate bean created by Spring JMS can be used to produce a message into a TxEventQ.

    // context is the Spring application's context
    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
    jmsTemplate.send("SPRINGQUEUE", s -> {
        TextMessage msg = s.createTextMessage("Hello TxEventQ");
        return msg;
    });
    
  5. Create a MessageListenerContainer to consume messages from TxEventQ.

    By default, a JmsListener creates a DefaultMessageListenerContainer, which consumes messages from the specified destination.

     @JmsListener(destination = "SPRINGQUEUE")
     public void processMessage(Message msg) throws JMSException {
         System.out.println("Received message: " + msg.getBody(String.class));
     }
    

The following example is a complete Spring application, which produces a message into TxEventQ using JmsTemplate and consumes the message through JmsListener. This example assumes the use of Spring Boot 3+.

Example: TestApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;
import org.springframework.jms.annotation.JmsListener;
import jakarta.jms.*;
import java.util.Properties;

@SpringBootApplication
public class TestApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(TestApplication.class, args);
        JmsTemplate jmsTemplate = ctx.getBean(JmsTemplate.class);
        jmsTemplate.send("SPRINGQUEUE", s -> s.createTextMessage("Hello TxEventQ"));
    }

    @Bean
    public PoolDataSource myPoolDataSource() {
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        try {
            ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
            Properties props = new Properties();
            props.put("oracle.net.wallet_location", "<your-wallet-location>");
            props.put("oracle.net.tns_admin", "<tnsnames.ora file location>");
            ds.setConnectionProperties(props);
            ds.setURL("jdbc:oracle:thin:@<tnsname-entry>");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return ds;
    }

    @Bean
    public ConnectionFactory myConnectionFactory(PoolDataSource ds) {
        ConnectionFactory connectionFactory = null;
        try {
            connectionFactory = AQjmsFactory.getConnectionFactory(ds);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return connectionFactory;
    }

    @JmsListener(destination = "SPRINGQUEUE")
    public void processMessage(Message msg) throws JMSException {
        System.out.println("Received message: " + msg.getBody(String.class));
    }
}

See Also:

Spring JMS

Transactional Operations with Spring JMS and Transactional Event Queues

Using Oracle TxEventQ, Oracle Database DML operations can be performed along with standard JMS queuing operations in a single local transaction, without two-phase commit.

With the Spring JMS framework, applications using TxEventQ can atomically perform an Oracle Database operation along with a JMS produce or consume. The Spring application must use a transacted JMS session for this. A JMS session for Oracle TxEventQ provides access to the JDBC connection that it uses to produce or consume JMS messages. Any DML operation performed using this JDBC connection is committed or rolled back along with the JMS operation performed by that session. When consuming messages through Spring JMS, the application relies on a session-aware consumer function. The following example demonstrates this using Spring JMS's SessionAwareMessageListener to access the JMS session that consumed a message inside a Spring MessageListenerContainer. Here, the AQ-JMS session's getDBConnection() method provides access to the database connection that was used to consume the message. The DML operations performed are committed when the container commits the session after executing the callback.

@JmsListener(destination = "SPRINGQUEUE", containerFactory = "myContainerFactory", concurrency = "3-5")
public void receiveMessage(String msg, Session s) throws Exception {
    try {
        // Obtain the JDBC connection from the JMS session
        java.sql.Connection conn = ((AQjmsSession) s).getDBConnection();
        String sql = "INSERT INTO messages (contents) VALUES (?)";
        java.sql.PreparedStatement preparedStatement = conn.prepareStatement(sql);
        preparedStatement.setString(1, msg);
        int row = preparedStatement.executeUpdate();
        System.out.println("Number of rows: " + row);
        // Perform DML operations using conn and/or JMS operations using s
    } catch (Exception e) {
        e.printStackTrace();
        throw e;
    }
}

@Bean
public DefaultJmsListenerContainerFactory myContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true); // Use transacted sessions
        return factory;
}

The same transactional semantics can be applied while producing a message into an Oracle Transactional Event Queue with JmsTemplate's send API. Here, the JMS session must be set as transacted. An example is as follows.

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(JmsTransactionApplication.class, args);
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.send("myqueue", s -> {
            try {
                java.sql.Connection conn = ((AQjmsSession) s).getDBConnection();
                assert (conn != null);
                String sql = "INSERT INTO messages (contents) VALUES (?)";
                java.sql.PreparedStatement preparedStatement = conn.prepareStatement(sql);
                preparedStatement.setString(1, "Test payload for transactional message produce");
                int row = preparedStatement.executeUpdate();
                System.out.println("Number of rows: " + row);
            } catch (Exception e) {
                System.out.println("Got Exception: " + e);
                e.printStackTrace();
            }
            return s.createTextMessage("Test for Transactional produce and DML operation");
        });
        System.out.println("Sent message");
    }

Spring Cloud Stream Binder for Transactional Event Queues

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

TxEventQ provides an implementation of this Spring Cloud Stream Binder, which is available in the Maven repository. This allows microservices implemented using the Spring framework to seamlessly use TxEventQ.

The following sections describe how to configure and use the Spring Cloud Stream Binder.

Configure and Use Spring Cloud Stream Binder for Transactional Event Queues

To configure and use Spring Cloud Stream Binder for Transactional Event Queues, perform the following steps.

  1. Download the Spring Cloud Stream Binder for Transactional Event Queues from Maven and add it to your Spring Cloud Stream application.

  2. Set up connectivity with Oracle Database.
    1. Create an Oracle wallet.
    2. Set up the application properties file for connecting to Oracle Database.
      spring.datasource.url=jdbc:oracle:thin:@<tnsname-alias>
      spring.datasource.oracleucp.connection-properties.oracle.net.wallet_location=<wallet-location>
      spring.datasource.oracleucp.connection-properties.oracle.net.tns_admin=<tns-admin-location>
      spring.datasource.driver-class-name=oracle.jdbc.OracleDriver
      spring.datasource.type=oracle.ucp.jdbc.PoolDataSource
      spring.datasource.oracleucp.connection-factory-class-name=oracle.jdbc.pool.OracleDataSource
      
  3. Configure consumer binding properties to consume messages from Transactional Event Queues. Spring Cloud Stream uses bindings to interface with external messaging systems. A consumer binding can be created using a functional java.util.function.Consumer bean.
    @Bean
    public Consumer<String> txeventqconsumer() {
        return m -> System.out.println("Received: " + m);
    }
    
    A sample configuration of the corresponding input binding txeventqconsumer-in-0 created by Spring Cloud Stream:
    spring.cloud.stream.bindings.txeventqconsumer-in-0.destination=STREAMDESTINATION
    spring.cloud.stream.bindings.txeventqconsumer-in-0.group=STREAMGROUP
    spring.cloud.stream.bindings.txeventqconsumer-in-0.consumer.concurrency=2
    

    Note:

    • The destination names for TxEventQ binder for Spring Cloud Stream are case-sensitive.
    • The destination is created inside TxEventQ if it does not exist already.
  4. Configure producer binding properties to produce messages to Transactional Event Queues. A producer binding can be created using a functional java.util.function.Supplier bean.

    @Bean
    public Supplier<String> txeventqsupplier() {
        return () -> "Hello TxEventQ";
    }
    
    A sample configuration of the corresponding binding txeventqsupplier-out-0 created by Spring Cloud Stream:
    spring.cloud.stream.bindings.txeventqsupplier-out-0.destination=STREAMDESTINATION
    spring.cloud.stream.bindings.txeventqsupplier-out-0.producer.requiredGroups=STREAMGROUP
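
    When an application declares several functional beans, Spring Cloud Stream (through Spring Cloud Function) must be told which ones to bind. This can be done with the standard spring.cloud.function.definition property; the following sketch assumes the txeventqconsumer and txeventqsupplier bean names used in the preceding steps:

    ```properties
    # Bind both the consumer and the supplier beans defined above
    spring.cloud.function.definition=txeventqconsumer;txeventqsupplier
    ```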
    

Transactional Operations with Spring Cloud Stream Binder for Transactional Event Queues

This section explains how a database operation can be performed atomically along with sending or consuming a message. This involves using TxEventQ binder-specific features in the application. It is supported only by binder versions 0.12.0 and later.

To perform DML operations atomically along with message send or receive operations, the functional callbacks must work directly on Spring's Message type. For a Consumer bean, the TxEventQUtils class can be used to obtain a JDBC connection tied to the message consumption operation. Any DML operation performed using this JDBC connection is executed within the same single local transaction in which the message is consumed.

@Bean
public Consumer<Message<String>> consume() {
    return to -> {
        System.out.println("Received: " + to);
        // JDBC connection tied to the consumption of this message
        Connection c = TxEventQUtils.getDBConnection(to);
        String sql = "INSERT INTO messages (contents) VALUES (?)";
        try {
            PreparedStatement preparedStatement = c.prepareStatement(sql);
            preparedStatement.setString(1, to.getPayload());
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    };
}

For a Supplier bean, the DML operations can be specified as a JDBC connection consumer tied to the Spring Message being produced, using the setConnectionCallback API of the TxEventQMessageBuilder utility class. The binder executes the JDBC connection consumer atomically along with the message send operation to the transactional event queue.

@Bean
public Supplier<Message<String>> produce1() {
    return () -> {
        String to = "Hello TxEventQ";
        Consumer<Connection> callback = c -> {
            String sql = "INSERT INTO messages (contents) VALUES (?)";
            try {
                PreparedStatement preparedStatement = c.prepareStatement(sql);
                preparedStatement.setString(1, to);
                preparedStatement.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        };
        return TxEventQMessageBuilder
            .withPayload(to)
            .setConnectionCallback(callback)
            .build();
    };
}

For Function beans, both the consumed and the produced messages should be of Spring's Message type. In this case, the specified DML operations are executed in the same transaction as the message consume and produce operations; all of these run in a single local transaction. The DML operations are specified as a consumer callback for a JDBC connection. Attach the connection callback, together with the consumed input message, to the output message using the setConnectionCallbackContext method of the TxEventQMessageBuilder utility class. This ensures that both the DML operation(s) and the message produce are in the same transaction as the message consume operation.

@Bean
public Function<Message<String>, Message<String>> func() {
    return inputMessage -> {
        String to = inputMessage.getPayload();
        Consumer<Connection> callback = conn -> {
            String sql = "INSERT INTO messages (contents) VALUES (?)";
            try {
                PreparedStatement ps = conn.prepareStatement(sql);
                ps.setString(1, to);
                ps.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        };
        return TxEventQMessageBuilder
            .withPayload(to)
            .setConnectionCallbackContext(callback, inputMessage)
            .build();
    };
}

Binder Properties Supported by Transactional Event Queues

The Spring Cloud Stream Binder for TxEventQ supports most of the configuration options provided by Spring Cloud Stream. Refer to the official Spring Cloud Stream documentation for more details.

Additional helpful properties are defined by the binder to support batched consumers.

spring.cloud.stream.txeventq.bindings.<bindingName>.consumer.batchSize
spring.cloud.stream.txeventq.bindings.<bindingName>.consumer.timeout

The timeout property specifies the maximum time, in milliseconds, that the consumer thread(s) wait for each consume operation (whether batched or otherwise). The default timeout is 1000 milliseconds.

For batched consumers, which consume a list of messages in a single callback, the batchSize property specifies the maximum number of messages that can be consumed in a single execution of the callback. This property is ignored if batch mode is set to false.
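
For example, a batched consumer bound as consume-in-0 (a hypothetical binding name) could be configured as follows. The batch-mode setting is the standard Spring Cloud Stream consumer property, while batchSize and timeout are the binder-specific properties described above:

```properties
# Enable batch delivery for the binding (standard Spring Cloud Stream property)
spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true
# Deliver at most 10 messages per callback invocation (binder-specific)
spring.cloud.stream.txeventq.bindings.consume-in-0.consumer.batchSize=10
# Wait up to 2000 ms for messages on each consume operation (binder-specific)
spring.cloud.stream.txeventq.bindings.consume-in-0.consumer.timeout=2000
```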

Limitations or Unsupported Features

This section explains the limitations of the TxEventQ binder in Oracle Database 26ai and 19c.

  • The Spring Cloud Stream binder for Oracle TxEventQs supports Spring Boot 3+ (Spring 6+) applications. Applications using earlier versions of Spring are not supported.

  • In the 19c release, queue names containing special characters such as hyphen (-) and dot (.) are not allowed.

  • The general behavior of the binder is to consume messages from the exact partition specified in the instanceIndex property for a consumer binding. This does not hold for Oracle Database 19c: messages may be consumed from any partition, and this property is ignored.

  • The binder does not support consuming messages from a specific partition in case of an Oracle RAC cluster with multiple instances.

  • The multiplex property for consumer bindings, as specified in the Spring Cloud Stream documentation, is not supported. Instead, use a comma-separated list of destination names in the destination property for the binding. An error is thrown if multiplex is set to true for a consumer binding.

  • The instanceIndexList property, which lets a single consumer binding consume messages from multiple partitions, is not supported because TxEventQ is a partitioned broker.