AcknowledgeTxEventQNotification
This instance method sends an acknowledgement for a TxEventQ notification.
Declaration
// C# public void AcknowledgeTxEventQNotification( OracleTxEventQNotificationDirective directive);
Parameters
directive
- Specifies the OracleTxEventQNotificationDirective
enumeration value.
Exceptions
InvalidOperationException
is thrown when:
-
the CIC connection is closed,
-
registration is not found, or
-
acknowledgement is being sent to an AQ.
Remarks
This method should be called from a callback function after receiving a notification. There are two possible values for OracleTxEventQNotificationDirective
enumeration, OracleTxEventQNotificationDirective.Commit
or OracleTxEventQNotificationDirective.Rollback
.
Acknowledgements are meant to be sent to TxEventQ only but not to an AQ queue. The callback method is invoked using a separate thread so its a good practice to catch exceptions otherwise the issues may go unnoticed.
In TxEventQ notifications, we get payload as part of the notification. In TxEventQ, each payload is associated with one of the three states below:
-
Ready
: This means message is available to be dequeued. As soon as a notification is sent for this message, the state gets changed toAcknowledgement_waiting
. -
Acknowledgement_waiting
: Notification for this message is sent and currently the TxEventQ is waiting for an acknowledgement from client application. -
Processed
- This message is dequeued and it is not available for further dequeue.
When we send a Commit
acknowledgement, all messages in the Acknowledgement_waiting
state are marked as Processed
.
When we send a Rollback
acknowledgement, all messages in the Acknowledgement_waiting
state are marked as Ready
.
If message's state is Acknowledgement_waiting
and the connection between application-server is terminated or crashed, then the state of these messages will automatically be reset as Ready
.
Example
/***********Oracle DB Setup*********** declare qprops sys.dbms_aqadm.QUEUE_PROPS_T; BEGIN qprops.sort_list := 'enq_time'; sys.dbms_aqadm.create_transactional_event_queue (queue_name => 'raw_txeventq', queue_payload_type => 'RAW', queue_properties => qprops); END; / BEGIN dbms_aqadm.start_queue( queue_name => 'raw_txeventq'); END; / **************************************/
// C# using Oracle.ManagedDataAccess.Client; using Oracle.ManagedDataAccess.Types; namespace TxEventQ_notification_test { class Program { bool IsNotified = false; static Random rnd = new Random(); public static void Main(String[] args) { string constr = "user id=scott;password=tiger;data source=oracle"; OracleConnection con = new OracleConnection(constr); { con.Open(); using (OracleAQQueue queue = new OracleAQQueue("raw_txeventq", con, OracleAQMessageType.Raw)) { try { queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.Immediate; queue.EnqueueOptions.DeliveryMode = OracleAQMessageDeliveryMode.Persistent; Program obj = new Program(); queue.MessageAvailable += new OracleAQMessageAvailableEventHandler(obj.OnMyNotificationReceived); Console.WriteLine("Registration Done, please enqueue message inside \"raw_txeventq\" either from this application or any other application"); while(!obj.IsNotified) Thread.Sleep(1000); Console.WriteLine("Notification Received"); } catch (Exception ex) { Console.WriteLine(ex.ToString()); } } } } public void OnMyNotificationReceived(object src, OracleAQMessageAvailableEventArgs arg) { Console.WriteLine("********Notification Received********"); int tmp = rnd.Next(2); try { if (tmp == 0) { Console.WriteLine("Sending Commit Ack"); arg.Queue.AcknowledgeTxEventQNotification(OracleTxEventQNotificationDirective.Commit); } else { Console.WriteLine("Sending Rollback Ack"); arg.Queue.AcknowledgeTxEventQNotification(OracleTxEventQNotificationDirective.Rollback); } } catch (Exception ex) { Console.WriteLine("?????????????Exception in Callback Method??????????????????\n" + ex.Message); } finally { IsNotified = true; } } } }