Release 2 (9.2)
Part Number A96571-01
This chapter explains the concepts relating to staging events in a queue and propagating events from one queue to another.
Streams uses queues of type SYS.AnyData to stage events. There are two types of events that can be staged in a Streams queue: logical change records (LCRs) and user messages. LCRs are objects that contain information about a change to a database object, while user messages are custom messages created by users or applications. Both types of events are of type SYS.AnyData and can be used for information sharing within a single database or between databases.
Staged events can be consumed or propagated, or both. These events can be consumed by an apply process or by a user application that explicitly dequeues them. Even after an event is consumed, it may remain in the queue if you have also configured Streams to propagate the event to one or more other queues or if message retention is specified. These other queues may reside in the same database or in different databases. In either case, the queue from which the events are propagated is called the source queue, and the queue that receives the events is called the destination queue. There can be a one-to-many, many-to-one, or many-to-many relationship between source and destination queues. Figure 3-1 shows propagation from a source queue to a destination queue.
You can create, alter, and drop a propagation job, and you can define propagation rules that control which events are propagated. The user who owns the source queue is the user who propagates events. This user must have the necessary privileges to propagate events. These privileges include the following:
If the propagation job propagates events to a destination queue in a remote database, then the owner of the source queue must be able to use the propagation job's database link and the user to which the database link connects at the remote database must have enqueue privilege on the destination queue.
See Oracle9i Application Developer's Guide - Advanced Queuing for more information about message retention.
Events can be enqueued in two ways:
SYS.AnyData. These user messages can contain LCRs or any other type of message. Any user message that was explicitly enqueued by a user or an application is called a user-enqueued event. Events that were enqueued by a user procedure called from an apply process are also user-enqueued events.
So, each captured event contains an LCR, but a user-enqueued event may or may not contain an LCR.
Propagation preserves the type of an event: propagating a captured event enqueues it into the destination queue as a captured event, and propagating a user-enqueued event enqueues it into the destination queue as a user-enqueued event.
Events can be dequeued in two ways:
The dequeued events may have originated at the same database where they are dequeued, or they may have originated at a different database.
You can use Streams to configure event propagation between two queues, which may reside in different databases. Streams uses job queues to propagate events.
A propagation is always between a source queue and a destination queue. Although propagation is always between two queues, a single queue may participate in many propagations. That is, a single source queue may propagate events to multiple destination queues, and a single destination queue may receive events from multiple source queues. However, only one propagation is allowed between a particular source queue and a particular destination queue. Also, a single queue may be a destination queue for some propagations and a source queue for other propagations.
A propagation may propagate all of the events in a source queue to the destination queue, or a propagation may propagate only a subset of the events. Also, a single propagation job can propagate both captured and user-enqueued events. You can use rules to control which events in the source queue are propagated to the destination queue.
All destination queues at a database receive events from a single source queue through a single propagation job. By using a single propagation job for multiple destination queues, Streams ensures that an event is sent to a destination database only once, even if the same message is received by multiple destination queues in the same database. Communication resources are conserved because messages are not sent more than once to the same database.
Depending on how you set up your Streams environment, changes could be sent back to the site where they originated. You need to ensure that your environment is configured to avoid cycling the change in an endless loop. You can use Streams tags to avoid such a change cycling loop.
A propagation job propagates events based on rules that you define. For LCR events, each rule specifies the database objects for which the propagation job propagates changes and the types of changes to propagate. You can specify propagation rules for LCR events at the following levels:
For non-LCR events, you can create your own rules to control propagation.
A queue subscriber that specifies a condition causes the system to generate a rule. The rule sets for all subscribers to a queue are combined into a single system-generated rule set to make subscription more efficient.
A propagation schedule specifies how often a propagation job propagates events from a source queue to a destination queue. A default propagation schedule is established for the new propagation job when you create the propagation job using one of the following procedures:
ADD_GLOBAL_PROPAGATION_RULE procedure in the
ADD_SCHEMA_PROPAGATION_RULE procedure in the
ADD_TABLE_PROPAGATION_RULE procedure in the
CREATE_PROPAGATION procedure in the
The default schedule has the following properties:
NULL, which means infinite.
NULL, which means that propagation restarts as soon as it finishes the current duration.
If you want to alter the default schedule for a propagation job, then use the ALTER_PROPAGATION_SCHEDULE procedure in the
A user-enqueued event is propagated successfully to a destination queue when the enqueue into the destination queue is committed. A captured event is propagated successfully to a destination queue when both of the following actions are completed:
When an event is successfully propagated between two Streams queues, the destination queue acknowledges successful propagation of the event. If the source queue is configured to propagate an event to multiple destination queues, then the event remains in the source queue until each destination queue has sent confirmation of event propagation to the source queue. When each destination queue acknowledges successful propagation of the event, and all local consumers in the source queue database have consumed the event, the source queue can drop the event.
This confirmation system ensures that events are always propagated from the source queue to the destination queue, but, in some configurations, the source queue can grow larger than an optimal size. When a source queue grows, it uses more SGA memory and may use more disk space.
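The retention rule described above can be sketched as a small toy model. This is only an illustration, not Oracle code: the class StagedEvent, its methods, and the queue and consumer names are all hypothetical. It shows that an event becomes eligible to be dropped from the source queue only after every destination queue has acknowledged it and every local consumer has consumed it.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy model of one event staged in a source queue (hypothetical, not Oracle code). */
class StagedEvent {
    // Destination queues that have not yet acknowledged propagation.
    private final Set<String> unacknowledged;
    // Local consumers (for example, apply processes) that have not yet consumed the event.
    private final Set<String> unconsumed;

    StagedEvent(Set<String> destinationQueues, Set<String> localConsumers) {
        this.unacknowledged = new HashSet<>(destinationQueues);
        this.unconsumed = new HashSet<>(localConsumers);
    }

    /** Records the acknowledgment sent back by one destination queue. */
    void acknowledge(String destinationQueue) {
        unacknowledged.remove(destinationQueue);
    }

    /** Records that one local consumer has consumed the event. */
    void consume(String localConsumer) {
        unconsumed.remove(localConsumer);
    }

    /** The source queue can drop the event only when nothing is still pending. */
    boolean canBeDropped() {
        return unacknowledged.isEmpty() && unconsumed.isEmpty();
    }

    public static void main(String[] args) {
        StagedEvent event = new StagedEvent(
            new HashSet<>(Arrays.asList("dbs2.streams_queue", "dbs3.streams_queue")),
            new HashSet<>(Arrays.asList("local_apply")));
        event.acknowledge("dbs2.streams_queue");
        event.consume("local_apply");
        // dbs3.streams_queue has not acknowledged yet, so the event must stay.
        System.out.println("Can drop: " + event.canBeDropped());
        event.acknowledge("dbs3.streams_queue");
        System.out.println("Can drop: " + event.canBeDropped());
    }
}
```

In this model, a destination that never acknowledges keeps the event in the source queue indefinitely, which is one way the source queue can grow beyond an optimal size.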
There are two common reasons for source-queue growth:
A directed network is one in which propagated events may pass through one or more intermediate databases before arriving at a destination database. An event may or may not be processed by an apply process at an intermediate database. Using Streams, you can choose which events are propagated to each destination database, and you can specify the route that events will traverse on their way to a destination database. Figure 3-2 shows an example directed networks environment.
The advantage of using a directed network is that a source database need not have a physical network connection with the destination database. So, if you want events to propagate from one database to another, but there is no direct network connection between the computers running these databases, then you can still propagate the events without reconfiguring your network, as long as one or more intermediate databases connect the source database to the destination database.
If you use directed networks, and an intermediate site goes down for an extended period of time or is removed, then you may need to reconfigure the network and the Streams environment.
An intermediate database in a directed network may propagate events using queue forwarding or apply forwarding. Queue forwarding means that the events being forwarded at an intermediate database are the events received by the intermediate database. The source database for an event is the database where the event originated.
Apply forwarding means that the events being forwarded at an intermediate database are first processed by an apply process. These events are then recaptured by a capture process at the intermediate database and forwarded. When you use apply forwarding, the intermediate database becomes the new source database for the event because the event is recaptured there.
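The effect of the two forwarding styles on the source database of an event can be illustrated with a minimal sketch. The class ForwardedLcr and the database names are hypothetical and exist only for this illustration; the sketch tracks nothing but the source database name and a description of the change.

```java
/** Toy model of an LCR that tracks only its source database (hypothetical, not Oracle code). */
class ForwardedLcr {
    final String sourceDatabase;
    final String change;

    ForwardedLcr(String sourceDatabase, String change) {
        this.sourceDatabase = sourceDatabase;
        this.change = change;
    }

    /** Queue forwarding: the intermediate database forwards the event it received, unchanged. */
    static ForwardedLcr queueForward(ForwardedLcr received) {
        return received;
    }

    /**
     * Apply forwarding: the change is applied and then recaptured at the intermediate
     * database, so the recaptured event names the intermediate database as its source.
     */
    static ForwardedLcr applyForward(ForwardedLcr received, String intermediateDatabase) {
        return new ForwardedLcr(intermediateDatabase, received.change);
    }

    public static void main(String[] args) {
        ForwardedLcr original = new ForwardedLcr("dbs1.example.com", "UPDATE hr.employees");
        System.out.println(queueForward(original).sourceDatabase);
        System.out.println(applyForward(original, "dbs2.example.com").sourceDatabase);
    }
}
```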
Consider the following differences between queue forwarding and apply forwarding when you plan your Streams environment:
A single Streams environment may use a combination of queue forwarding and apply forwarding.
Queue forwarding has the following advantages compared to apply forwarding:
GET_SOURCE_DATABASE_NAME member procedure on the LCR contained in the event. If you use apply forwarding, then determining the origin of an event requires the use of Streams tags and apply handlers.
If you use apply forwarding, then substantially more work may be required to reconfigure end-to-end capture, propagation, and apply of events, because the destination database(s) downstream from the unavailable intermediate database were using the SCN information of this intermediate database. Without this SCN information, the destination databases cannot apply the changes properly.
Apply forwarding has the following advantages compared to queue forwarding:
Streams enables messaging with queues of type SYS.AnyData. These queues are called Streams queues. Streams queues can stage user messages whose payloads are of SYS.AnyData type. A SYS.AnyData payload can be a wrapper for payloads of different datatypes. A queue that can stage messages of only a particular type is called a typed queue.
Using SYS.AnyData wrappers for message payloads, publishing applications can enqueue messages of different types into a single queue, and subscribing applications can dequeue these messages, either explicitly using a dequeue API or implicitly using an apply process. If the subscribing application is remote, then the messages can be propagated to the remote site, and the subscribing application can dequeue the messages from a local queue in the remote database. Alternatively, a remote subscribing application can dequeue messages directly from the source queue using a variety of standard protocols, such as PL/SQL and OCI.
Streams interoperates with Advanced Queuing (AQ), which supports all the standard features of message queuing systems, including multi-consumer queues, publish and subscribe, content-based routing, internet propagation, transformations, and gateways to other messaging subsystems.
You can wrap almost any type of payload in a SYS.AnyData payload. To do this, you use the data_type static functions of the SYS.AnyData type, where data_type is the type of object to wrap. These functions take the object as input and return a
The following types of payloads cannot be wrapped in a
Your applications can use the following programmatic environments to enqueue user messages into a Streams queue and dequeue user messages from a Streams queue:
The following sections provide information about using these interfaces to enqueue user messages into and dequeue user messages from a Streams queue.
See Oracle9i Application Developer's Guide - Advanced Queuing for more information about these programmatic interfaces.
To enqueue a user message containing an LCR into a Streams queue using PL/SQL, first create the LCR to be enqueued. You use the constructor for the SYS.LCR$_ROW_RECORD type to create a row LCR, and you use the constructor for the SYS.LCR$_DDL_RECORD type to create a DDL LCR. Then you use the SYS.AnyData.ConvertObject function to convert the LCR into a SYS.AnyData payload and enqueue it using the
To enqueue a user message containing a non-LCR object into a Streams queue using PL/SQL, you use one of the SYS.AnyData.Convert* functions to convert the object into a SYS.AnyData payload and enqueue it using the
To enqueue a user message containing an LCR into a Streams queue using JMS or OCI, you must represent the LCR in XML format. To construct an LCR, use the oracle.xdb.XMLType class. LCRs are defined in the SYS schema. The LCR schema must be loaded into the SYS schema using the catxlcr.sql script in Oracle home in the
To enqueue a message using OCI, perform the same actions that you would to enqueue a message into a typed queue. A typed queue is a queue that can stage messages of a particular type only. To enqueue a message using JMS, a user must have EXECUTE privilege on
A non-LCR user message can be a message of any user-defined type or a JMS type. The JMS types include the following:
When using user-defined types, you must use JPublisher to generate a Java class for the message that implements the ORAData interface. To enqueue a message into a Streams queue, you can use methods
To dequeue a user message from a Streams queue using PL/SQL, you use the DBMS_AQ.DEQUEUE procedure and specify SYS.AnyData as the payload. The user message may contain an LCR or another type of object.
In a Streams queue, user messages containing LCRs in XML format are represented as oracle.xdb.XMLType. Non-LCR messages can be one of the following formats:
To dequeue a message from a Streams queue using JMS, you can use methods
TopicReceiver. Because the queue may contain different types of objects wrapped in a SYS.AnyData wrapper, you must register a list of SQL types and their corresponding Java classes in the typemap of the JMS session. JMS types are already preregistered in the typemap.
For example, suppose a queue contains LCR messages represented as oracle.xdb.XMLType and messages of type person and address. The classes JPerson.java and JAddress.java are the ORAData mappings for person and address, respectively. Before dequeuing the message, the type map must be populated as follows:
java.util.Dictionary map = ((AQjmsSession)q_sess).getTypeMap();
map.put("SCOTT.PERSON", Class.forName("JPerson"));
map.put("SCOTT.ADDRESS", Class.forName("JAddress"));
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLType")); // For LCRs
When using message selectors with
TopicPublisher, the selector can contain any SQL92 expression that has a combination of one or more of the following:
JMSXGroupSeq. The following is an example of a JMS message field:
To dequeue a message using OCI, perform the same actions that you would to dequeue a message from a typed queue.
SYS.AnyData queues can interoperate with typed queues in a Streams environment. A typed queue can stage messages of a particular type only. Table 3-1 shows the types of propagation possible between queues.
|Source Queue||Destination Queue||Transformation|
Note: Propagation is possible only if the messages in the typed queue meet the restrictions outlined in "User-Defined Type Messages".
Requires a rule to filter messages and a user-defined transformation
Follows Advanced Queuing (AQ) rules (see Oracle9i Application Developer's Guide - Advanced Queuing for information)
To propagate messages containing a payload of a certain type from a SYS.AnyData source queue to a typed destination queue, you must perform a transformation. Only messages containing a payload of the same type as the typed queue can be propagated to the typed queue.
Certain Streams capabilities, such as capturing changes using a capture process and applying changes with an apply process, can be configured only with
If you plan to enqueue, propagate, or dequeue user-defined type messages in a Streams environment, then each type used in these messages must exist at every database where the message may be staged in a queue. Some environments use directed networks to route messages through intermediate databases before they reach their destination. In such environments, the type must exist at each intermediate database, even if the messages of this type are never enqueued or dequeued at a particular intermediate database.
In addition, the following requirements must be met for such types:
The object identifier (OID) need not match at each database.
You can configure a Streams queue to stage and propagate captured and user-enqueued events in a Real Application Clusters environment. In a Real Application Clusters environment, only the owner instance may have a buffer for a queue. Different instances may have buffers for different queues. Queue buffers are discussed later in this chapter. A queue buffer is System Global Area (SGA) memory associated with a Streams queue that contains only captured events.
A Streams queue that contains only user-enqueued events behaves the same as a typed queue in a Real Application Clusters environment. However, if a Streams queue contains or will contain captured events in a Real Application Clusters environment, then the environment must meet the following requirements:
SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package. Creating or altering a queue table with the DBMS_AQADM package is not supported if any queue in the queue table contains captured events.
The AQ_TM_PROCESSES initialization parameter must be set to at least 1 on each instance.
If the owner instance for a queue table containing a destination queue becomes unavailable, then queue ownership is transferred automatically to another instance in the cluster. If this happens, then database links from remote source queues must be reconfigured manually to connect to the instance that owns the destination queue. The DBA_QUEUE_TABLES data dictionary view contains information about the owner instance for a queue table. A queue table may contain multiple queues. In this case, each queue in a queue table has the same owner instance as the queue table.
In general, Streams queues and propagation jobs use the infrastructure of AQ, and Streams propagation jobs run using job queues. However, unlike an AQ queue, which stages all events in a queue table, a Streams queue has a queue buffer to stage captured events in shared memory. This section describes queue buffers and discusses how queue buffers are used in a Real Application Clusters environment. This section also discusses secure queues and how they are used in Streams. In addition, this section discusses how transactional queues handle captured and user-enqueued events, as well as the need for a Streams data dictionary at databases that propagate captured events.
A queue buffer is System Global Area (SGA) memory associated with a Streams queue that contains only captured events. A queue buffer enables Oracle to optimize captured events by buffering captured events in the SGA instead of always storing them in a queue table. This buffering of captured events happens in any database where captured events are staged in a Streams queue. Such a database may be a source database, an intermediate database, or a destination database. User-enqueued LCR events and user-enqueued non-LCR events are always staged in queue tables, not in queue buffers.
Queue buffers improve performance, but the contents of a queue buffer are lost if the instance containing the buffer shuts down normally or abnormally. Streams automatically recovers from these cases, assuming full database recovery is performed on the instance.
A queue buffer may overflow if there is not enough shared memory available to hold captured events. Captured events that overflow a queue buffer are stored in the appropriate queue table. If the events in a queue buffer are lost, the events spilled from the queue buffer are subsequently deleted in order to keep the queue buffer and its queue table in sync. Also, when a transaction is moved to the error queue, all events in the transaction are staged in a queue table, not in a queue buffer.
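The staging behavior described above can be sketched as a toy model. This is not Oracle code: the class ToyStreamsQueue and its methods are hypothetical, and the buffer capacity is counted in events purely for simplicity, whereas the real limit depends on available shared memory. The sketch shows captured events filling the buffer and then spilling to the queue table, user-enqueued events going straight to the queue table, and spilled captured events being deleted when the buffer contents are lost.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a Streams queue with a queue buffer (hypothetical, not Oracle code). */
class ToyStreamsQueue {
    // Assumption for illustration: capacity is a number of events, not bytes of SGA memory.
    private final int bufferCapacity;
    // Captured events held in memory.
    private final List<String> queueBuffer = new ArrayList<>();
    // Captured events that overflowed the buffer and were stored in the queue table.
    private final List<String> spilledCaptured = new ArrayList<>();
    // User-enqueued events, which are always staged in the queue table.
    private final List<String> userEnqueued = new ArrayList<>();

    ToyStreamsQueue(int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
    }

    void enqueueCaptured(String event) {
        if (queueBuffer.size() < bufferCapacity) {
            queueBuffer.add(event);
        } else {
            spilledCaptured.add(event);
        }
    }

    void enqueueUser(String event) {
        userEnqueued.add(event);
    }

    /** Models losing the buffer contents: spilled captured events are deleted as well. */
    void loseQueueBuffer() {
        queueBuffer.clear();
        spilledCaptured.clear();
    }

    int eventsInBuffer() {
        return queueBuffer.size();
    }

    int eventsInQueueTable() {
        return spilledCaptured.size() + userEnqueued.size();
    }

    public static void main(String[] args) {
        ToyStreamsQueue queue = new ToyStreamsQueue(2);
        queue.enqueueCaptured("lcr1");
        queue.enqueueCaptured("lcr2");
        queue.enqueueCaptured("lcr3"); // buffer is full, so this one spills
        queue.enqueueUser("msg1");
        System.out.println(queue.eventsInBuffer() + " buffered, "
            + queue.eventsInQueueTable() + " in queue table");
        queue.loseQueueBuffer();
        System.out.println(queue.eventsInBuffer() + " buffered, "
            + queue.eventsInQueueTable() + " in queue table");
    }
}
```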
Secure queues are queues for which AQ agents must be explicitly associated with one or more database users who can perform queue operations, such as enqueue and dequeue. The owner of a secure queue can perform all queue operations on the queue, but other users cannot perform queue operations on a secure queue unless they are configured as secure queue users. In Streams, secure queues can be used to ensure that only the appropriate users and Streams processes enqueue events into a queue and dequeue events from a queue.
All Streams queues created using the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package are secure queues. When you use the SET_UP_QUEUE procedure to create a queue, any user specified by the queue_user parameter is configured as a secure queue user of the queue automatically, if possible. The queue user is also granted DEQUEUE privileges on the queue. To enqueue events into and dequeue events from a queue, a queue user must also have EXECUTE privilege on the DBMS_AQ package. The SET_UP_QUEUE procedure does not grant this privilege.
To configure the queue user as a secure queue user, the SET_UP_QUEUE procedure creates an AQ agent with the same name as the user name, if one does not already exist. The user must use this agent to perform queue operations on the queue. If an agent with this name already exists and is associated with the queue user only, then it is used. SET_UP_QUEUE then runs the ENABLE_DB_ACCESS procedure in the DBMS_AQADM package, specifying the agent and the user. If the agent that SET_UP_QUEUE tries to create already exists and is associated with a user other than the user specified by queue_user, then an error is raised. In this case, rename or remove the existing agent using the DROP_AQ_AGENT procedure, respectively, in the DBMS_AQADM package. Then, retry SET_UP_QUEUE.
When you create a capture process or an apply process, an AQ agent of the secure queue associated with the Streams process is configured automatically, and the user who runs the Streams process is specified as a secure queue user for this queue automatically. Therefore, a capture process is configured to enqueue into its secure queue automatically, and an apply process is configured to dequeue from its secure queue automatically.
For a capture process, the user who invokes the procedure that creates the capture process is the user who runs the capture process. For an apply process, the user specified as the apply_user is the user who runs the apply process. If no apply_user is specified, then the user who invokes the procedure that creates the apply process is the user who runs the apply process.
Also, if you change the apply_user for an apply process using the ALTER_APPLY procedure in the DBMS_APPLY_ADM package, then the specified apply_user is configured as a secure queue user of the queue used by the apply process. However, the old apply user remains configured as a secure queue user of the queue. To remove the old apply user, run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package, specifying the old apply user and the relevant AQ agent. You may also want to drop the agent if it is no longer needed. You can view the AQ agents and their associated users by querying the DBA_AQ_AGENT_PRIVS data dictionary view.
If you create a SYS.AnyData queue using the DBMS_AQADM package, then you use the secure parameter when you run the CREATE_QUEUE_TABLE procedure to specify whether the queue is secure or not. The queue is secure if you specify true for the secure parameter when you run this procedure. When you use the DBMS_AQADM package to create a secure queue, and you want to allow users to perform queue operations on the secure queue, then you must configure these secure queue users manually.
If you use the SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package to create a secure queue, and you want a user who is not the queue owner and who was not specified by the queue_user parameter to perform operations on the queue, then you can configure the user as a secure queue user of the queue manually. Alternatively, you can run the SET_UP_QUEUE procedure again and specify a different queue_user for the queue. In this case, SET_UP_QUEUE will skip queue creation, but it will configure the user specified by queue_user as a secure queue user of the queue.
If you drop a capture process or an apply process, then the users who were configured as secure queue users for these processes remain secure queue users of the queue. To remove these users as secure queue users, run the DISABLE_DB_ACCESS procedure in the DBMS_AQADM package for each user. You may also want to drop the agent if it is no longer needed.
A transactional queue is one in which user-enqueued events can be grouped into a set that is applied as one transaction. That is, an apply process performs a COMMIT after it applies all the user-enqueued events in a group. The SET_UP_QUEUE procedure in the DBMS_STREAMS_ADM package always creates a transactional queue.
A nontransactional queue is one in which each user-enqueued event is its own transaction. That is, an apply process performs a COMMIT after each user-enqueued event it applies. In either case, the user-enqueued events may or may not contain user-created LCRs.
The difference between transactional and nontransactional queues is important only for user-enqueued events. An apply process always applies captured events in transactions that preserve the transactions executed at the source database. Table 3-2 shows apply process behavior for each type of event and each type of queue.
|Event Type|Transactional Queue|Nontransactional Queue|
|Captured events|Apply process preserves the original transaction|Apply process preserves the original transaction|
|User-enqueued events|Apply a user-specified group of user-enqueued events as one transaction|Apply each user-enqueued event in its own transaction|
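The difference in commit behavior for user-enqueued events can be sketched as follows. The class ApplyGrouping and its method are hypothetical and only model which events end up in the same transaction; they do not represent an actual apply process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of how user-enqueued events are grouped into transactions (hypothetical). */
class ApplyGrouping {
    /**
     * Returns the list of transactions that would be committed.
     * Transactional queue: one transaction per user-specified group.
     * Nontransactional queue: one transaction per event.
     */
    static List<List<String>> transactionsFor(List<List<String>> groups,
                                              boolean transactionalQueue) {
        List<List<String>> transactions = new ArrayList<>();
        for (List<String> group : groups) {
            if (transactionalQueue) {
                transactions.add(new ArrayList<>(group));
            } else {
                for (String event : group) {
                    transactions.add(Collections.singletonList(event));
                }
            }
        }
        return transactions;
    }

    public static void main(String[] args) {
        List<List<String>> groups = Arrays.asList(
            Arrays.asList("msg1", "msg2"),
            Arrays.asList("msg3"));
        System.out.println("Transactional:    " + transactionsFor(groups, true));
        System.out.println("Nontransactional: " + transactionsFor(groups, false));
    }
}
```

With two groups holding three events in total, the transactional case yields two transactions and the nontransactional case yields three.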
When a capture process is created, a duplicate data dictionary called the Streams data dictionary is populated automatically. The Streams data dictionary is a multiversioned copy of some of the information in the primary data dictionary at a source database. The Streams data dictionary maps object numbers, object version information, and internal column numbers from the source database into table names, column names, and column datatypes when a capture process evaluates rules and creates LCRs. This mapping keeps each captured event as small as possible because the event can store numbers rather than names.
The mapping information in the Streams data dictionary at the source database may be needed to evaluate rules at any database that propagates the captured events from the source database. To make this mapping information available to a propagation job, Oracle automatically populates a multiversioned Streams data dictionary at each site that has a Streams propagation job. Oracle automatically sends internal messages that contain relevant information from the Streams data dictionary at the source database to all other databases that receive captured events from the source database.
The Streams data dictionary information contained in these internal messages in a queue may or may not be propagated by a propagation job. Which Streams data dictionary information to propagate depends on the rule set for the propagation job. When a propagation job encounters Streams data dictionary information for a table, the propagation job rule set is evaluated with partial information that includes the source database name, table name, and table owner.
If at least one rule in the rule set either evaluates to TRUE (true_rules) or could evaluate to TRUE given more information (maybe_rules), then the Streams data dictionary information is propagated. This rule can be either a DML rule or a DDL rule.
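The partial evaluation described above can be illustrated with a three-valued sketch. Everything here is hypothetical: the class DictionaryInfoFilter, its Rule type, and the needsMoreInfo flag are stand-ins invented for this illustration and do not reflect how the rules engine is implemented. The sketch evaluates each rule using only the source database name, table owner, and table name, and propagates the dictionary information if any rule is TRUE or MAYBE.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of partial rule evaluation for Streams data dictionary information. */
class DictionaryInfoFilter {
    enum Outcome { TRUE, FALSE, MAYBE }

    /** Hypothetical rule: a null field matches any value. */
    static final class Rule {
        final String sourceDb;
        final String owner;
        final String table;
        // True if the rule also has a condition that needs data not available
        // during partial evaluation (for example, the type of change).
        final boolean needsMoreInfo;

        Rule(String sourceDb, String owner, String table, boolean needsMoreInfo) {
            this.sourceDb = sourceDb;
            this.owner = owner;
            this.table = table;
            this.needsMoreInfo = needsMoreInfo;
        }

        Outcome evaluate(String db, String own, String tab) {
            if (!matches(sourceDb, db) || !matches(owner, own) || !matches(table, tab)) {
                return Outcome.FALSE; // a known value already rules this out
            }
            return needsMoreInfo ? Outcome.MAYBE : Outcome.TRUE;
        }

        private static boolean matches(String expected, String actual) {
            return expected == null || expected.equals(actual);
        }
    }

    /** Propagate if at least one rule is TRUE or could be TRUE given more information. */
    static boolean shouldPropagate(List<Rule> ruleSet, String db, String owner, String table) {
        for (Rule rule : ruleSet) {
            if (rule.evaluate(db, owner, table) != Outcome.FALSE) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Rule> ruleSet = Arrays.asList(
            new Rule("dbs1", "HR", "EMPLOYEES", true),
            new Rule("dbs1", "OE", null, false));
        System.out.println(shouldPropagate(ruleSet, "dbs1", "HR", "EMPLOYEES"));
        System.out.println(shouldPropagate(ruleSet, "dbs1", "SH", "SALES"));
    }
}
```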
When Streams data dictionary information is propagated to a destination queue, it is incorporated into the Streams data dictionary at the database that contains the destination queue, in addition to being enqueued into the destination queue. Therefore, a propagation job reading the destination queue in a directed networks configuration can forward LCRs immediately without waiting for the Streams data dictionary to be populated.