6 Managing Streams

A stream is a source of continuous and dynamic data. The data can come from a wide variety of sources, such as IoT sensors, transaction or activity log files, point-of-sale devices, ATMs, transactional databases, geospatial services, or social networks.

GGSA supports the following stream types: File, GoldenGate, JMS, and Kafka.

6.1 Creating a File Stream

To create a File stream:

  1. On the Catalog page, click Create New Item.
  2. Hover the mouse over Stream and select File from the submenu.
  3. On the Type Properties screen, enter the following details:
    • Name: Enter a unique name for the stream. This is a mandatory field.
    • Display Name: Enter a display name for the stream. If left blank, the Name field value is copied.
    • Description
    • Tags
    • Stream Type: The selected stream type is displayed.
  4. Click Next.
  5. On the Source Details screen, enter the following details:
    • File: Upload the CSV or JSON sample file to be used.

      Note:

      Use a File stream only for POCs and quick prototyping.
    • Read whole content: Select this option to read all the records in the file at once. If you uncheck this option, the engine reads one record at a time.

    • Number of events per batch: Enter the number of records that you want to process per batch. The default value is one, but you can specify the number of records to process in each read. You can use this option only when Read Whole Content is unchecked.

    • Loop: Select this option to process the file in a loop.

    • Data Format: Select CSV or JSON as the data format.

  6. Click Next.
  7. On the Data Format screen, set the attributes for the selected data format.
    • For JSON data format:
      • Allow Missing Column Names: Select this option to allow an input stream that has a column undefined in the shape.
      • Array in Multi-lines: Select this option to allow multi-line data formatting.
    • For CSV data format:
      • CSV Predefined Format: Select one of the predefined data formats from the drop-down list. For more information, see Predefined CSV Data Formats.
      • First record as header: Select this option to use the first record as the header row.
  8. Click Next.
  9. On the Shape screen, select one of the methods to define the shape:
    • Infer Shape: Select this option to detect the shape automatically from the input data stream.

    • Select Existing Shape: Select one of the existing shapes from the drop-down list.

    • Manual Shape: Select this option to manually infer the fields from a stream or file. You can also update the datatype of the fields.

      Note:

      • To retrieve the entire JSON payload, add a new field with path $.
      • To retrieve the content of the array, add a new field with path $[arrayField].

      In both cases, the value returned is of type Text.

    • From File: Select this option to infer the shape from a JSON schema file, or a JSON or CSV data file. You can also save the auto-detected shape and use it later.
  10. Click Save.
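The Manual Shape path note above can be illustrated with a small hypothetical payload (the field names txnId and items are examples only, not GGSA defaults):

```json
{
  "txnId": "A-1021",
  "items": [
    { "sku": "S1", "qty": 2 },
    { "sku": "S7", "qty": 1 }
  ]
}
```

A field with path $ returns the entire document above as Text. A field with path $[items] returns the array content as Text, which a downstream stage can parse further.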

6.2 Creating a GoldenGate Stream

To create a GoldenGate stream:

  1. On the Catalog page, click Create New Item.
  2. Hover the mouse over Stream and select GoldenGate from the submenu.
  3. On the Type Properties screen, enter the following details:
    • Name: Enter a unique name for the stream. This is a mandatory field.
    • Display Name: Enter a display name for the stream. If left blank, the Name field value is copied.
    • Description
    • Tags
    • Stream Type: The selected stream type is displayed.
  4. Click Next.
  5. On the Source Type page, enter the following details:
    • Connection: Select a GG Change Data connection.

    • Table name: Enter a valid table name that includes the period (.) delimiter between the catalog, schema, and table names. For example, test.dbo.table1.

    • Generate Full Records: Select this option to stream the full data record (values of all fields), irrespective of whether the database transaction changed a single column, a subset of columns, or all the columns of a row.
      • Database Connection: Select a GoldenGate sourced database connection.
      • Enable Cache: Select this option to enable caching for GoldenGate Full Records, to enhance its performance.
  6. Click Next.
  7. On the Shape screen, select one of the methods to define the shape:
    • Infer Shape: Select this option to detect the shape automatically from the input data stream.

    • Select Existing Shape: Select one of the existing shapes from the drop-down list.

    • Manual Shape: Select this option to manually infer the fields from a stream or file. You can also update the datatype of the fields.

      Note:

      • To retrieve the entire JSON payload, add a new field with path $.
      • To retrieve the content of the array, add a new field with path $[arrayField].

      In both cases, the value returned is of type Text.

    • From Stream: Select this option to detect the shape based on the table shape selected in the previous screen.
    • From File: Select this option to infer the shape from a JSON file. You can also save the auto-detected shape and use it later.
  8. Click Save.

Note:

The difference between a Kafka stream and a GoldenGate stream is that pipeline constructs, such as the Query Group Table, understand the GoldenGate syntax and associate it with the relevant GoldenGate fields.

6.3 Creating a JMS Stream

Prerequisite: A JMS connection.

To create a JMS stream:

  1. On the Catalog page, click Create New Item.
  2. Hover the mouse over Stream and select JMS from the submenu.
  3. On the Type Properties screen, enter the following details:
    • Name: Enter a unique name for the stream. This is a mandatory field.
    • Display Name: Enter a display name for the stream. If left blank, the Name field value is copied.
    • Description
    • Tags
    • Stream Type: The selected stream type is displayed.
  4. Click Next.
  5. On the Source Type page, enter the following details:
    • Connection: Select an existing JNDI connection for the stream.

    • Connection Factory: Enter a value for the connection factory. A ConnectionFactory encapsulates connection configuration information, and enables JMS applications to create a Connection. The default value is weblogic.jms.ConnectionFactory.

      Note:

      GGSA can read messages from Oracle Advanced Queue. This option is available as a general JMS connector - oracle.jms.AQjmsInitialContextFactory.
    • Jndi name: Enter the name of the Java interface that reads messages from topics, distributed topics, queues, and distributed queues.

    • Client ID: Enter the unique client ID to be used for a durable subscriber. If you do not provide this value, the subscription ID is used as the client ID to create a durable subscriber.

    • Message Selector: Set the message selector to filter messages. Message selectors assign the work of filtering messages to the JMS provider rather than to the application.

      If your messaging application needs to filter the messages it receives, you can use a JMS API message selector. A message selector is a String that contains an expression. The syntax of the expression is based on a subset of the SQL92 conditional expression syntax. The message selector in the following example selects any message that has a NewsType property that is set to the value Sports or Opinion:

      NewsType = 'Sports' OR NewsType = 'Opinion'

      The createConsumer and createDurableSubscriber methods allow you to specify a message selector as an argument when you create a message consumer.

    • Subscription ID: Enter the unique subscription ID for the durable subscriber. This value is required for durable subscribers.

      Note:

      When you specify both a client ID and a subscription ID, only one running pipeline can consume that stream. If you need multiple subscribers or pipelines, remove the client ID or subscription ID from the stream, or create separate streams (with different client and subscription IDs) for the pipelines.
    • Data Format: Select the data format from the drop-down list. The supported formats are: CSV, JSON, AVRO, MapMessage.

      A MapMessage object is used to send a set of name-value pairs. The names are String objects, and the values are primitive data types in the Java programming language. The names must have a value that is not null, and not an empty string. The entries can be accessed sequentially or randomly by name. The order of the entries is undefined.

  6. Click Next.
  7. On the Data Format screen, enter the shape details for the stream, based on the data format you have selected.
    • For JSON:
      • Allow Missing Column Names: Select this option to allow an input stream that has a column undefined in the shape.
    • For CSV:
      • CSV Predefined Format: Select one of the predefined data formats from the drop-down list. For more information, see Predefined CSV Data Formats.
      • First record as header: Select this option to use the first record as the header row.
    • For AVRO:
      • Schema Namespace: Enter the schema name combined with the namespace, to uniquely identify the schema within the store.
      • Schema (optional): Upload a schema file to infer the shape from.
    • If you selected MapMessage as the data format, there are no specific attributes to be set on this screen. The Data Format screen is skipped, and you are redirected to the Shape screen.
  8. Click Next.
  9. On the Shape screen, select one of the methods to define the shape:
    • Infer Shape: Select this option to detect the shape automatically from the input data stream.

    • Select Existing Shape: Select one of the existing shapes from the drop-down list.

    • Manual Shape: Select this option to manually infer the fields from a stream or file. You can also update the datatype of the fields.

      Note:

      • To retrieve the entire JSON payload, add a new field with path $.
      • To retrieve the content of the array, add a new field with path $[arrayField].

      In both cases, the value returned is of type Text.

    • From File: Select this option to infer the shape from a JSON schema file, or a JSON or CSV data file. You can also save the auto-detected shape and use it later.
  10. Click Save.

JMS Server Clean-Up

GGSA creates a durable subscription with the JMS provider when you create a JMS stream and select the durable subscription option. When you unpublish or kill a pipeline that uses this stream, the durable subscription remains on the JMS server. If you do not intend to publish the pipeline again, delete the durable subscription from the JMS server to clean up its resources.
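The JMS fields above (Connection Factory, Client ID, Message Selector, Subscription ID) map onto the standard javax.jms API. The following is a minimal sketch of a durable subscriber with a message selector, not a GGSA internal; the topic JNDI name, client ID, and subscription ID are placeholder values, and a running JMS provider with its client libraries is assumed:

```java
import javax.jms.*;
import javax.naming.InitialContext;

// Sketch only: requires a JMS provider and its client jars on the classpath.
public class DurableSubscriberSketch {
    public static void main(String[] args) throws Exception {
        InitialContext ctx = new InitialContext();  // JNDI properties assumed configured
        ConnectionFactory cf =
                (ConnectionFactory) ctx.lookup("weblogic.jms.ConnectionFactory");
        Connection conn = cf.createConnection();
        conn.setClientID("ggsa-client-1");                       // the Client ID field
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) ctx.lookup("jms/exampleTopic");    // the Jndi name field
        TopicSubscriber sub = session.createDurableSubscriber(
                topic,
                "ggsa-sub-1",                                    // the Subscription ID field
                "NewsType = 'Sports' OR NewsType = 'Opinion'",   // the Message Selector field
                false);
        conn.start();
        Message msg = sub.receive();
        if (msg instanceof MapMessage) {                         // the MapMessage data format
            String newsType = ((MapMessage) msg).getString("NewsType");
            System.out.println(newsType);
        }
        conn.close();
    }
}
```

Because this sketch sets both a client ID and a subscription name, only one consumer can attach to the subscription at a time, which is why the note above recommends distinct IDs per pipeline.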

6.4 Creating a Kafka Stream

Prerequisite: A Kafka connection.

To create a Kafka stream:

  1. On the Catalog page, click Create New Item.
  2. Hover the mouse over Stream and select Kafka from the submenu.
  3. On the Type Properties screen, enter the following details:
    • Name: Enter a unique name for the stream. This is a mandatory field.
    • Display Name: Enter a display name for the stream. If left blank, the Name field value is copied.
    • Description
    • Tags
    • Stream Type: The selected stream type is displayed.
  4. Click Next.
  5. On the Source Details screen, enter the following details:
    • Connection: Select a Kafka connection for the stream.

    • Topic name: Enter a name for the Kafka topic that stores the stream.

    • Data Format: Select CSV, JSON, or AVRO as the data format for the stream.

  6. Click Next.
  7. On the Data Format screen, enter the shape details for the stream, based on the data format you have selected.
    • For JSON:
      • Allow Missing Column Names: Select this option to allow an input stream that has a column undefined in the shape.
    • For CSV:
      • CSV Predefined Format: Select one of the predefined data formats from the drop-down list. For more information, see Predefined CSV Data Formats.
      • First record as header: Select this option to use the first record as the header row.
    • For AVRO:
      • Schema Namespace: Enter the schema name combined with the namespace, to uniquely identify the schema within the store.
      • Schema (optional): Upload a schema file to infer the shape from.
  8. Click Next.
  9. On the Shape screen, select one of the methods to define the shape:
    • Infer Shape: Select this option to detect the shape automatically from the input data stream.

    • Select Existing Shape: Select one of the existing shapes from the drop-down list.

    • Manual Shape: Select this option to manually infer the fields from a stream or file. You can also update the datatype of the fields.

      Note:

      • To retrieve the entire JSON payload, add a new field with path $.
      • To retrieve the content of the array, add a new field with path $[arrayField].

      In both cases, the value returned is of type Text.

    • From Stream: Select this option to detect the shape based on the earliest or the latest offset of the Kafka topic. The default option is earliest. Use latest to infer the shape from the latest records in the Kafka topic.

      This option is currently available only for the JSON data format.

    • From File: Select this option to infer the shape from Kafka, a JSON schema file, or a JSON or CSV data file. You can also save the auto-detected shape and use it later.

      This option is enabled if you have selected CSV as the data format.

    • From Schema: Select this option to infer the shape based on the schema you uploaded on the Data Format screen. This option is enabled if you have selected AVRO as the data format.
  10. Click Save.

6.5 Updating a Stream

To update a stream:
  1. Go to the Catalog page and click the stream that you want to update.
  2. On the Edit Stream screen, click the Edit link corresponding to the following sections, and make the necessary changes.
    • Source Details
    • Source Type Parameters
    • Data Type Parameters
    • Source Shape
  3. Click Save.

6.6 Deleting a Stream

To delete a stream:
  1. On the Catalog page, hover the mouse over the stream that you want to delete.
  2. Click the Delete icon that appears on the right side of the screen.
  3. On the Delete Confirmation screen, click Delete.

6.7 Application Timestamp

When defining a stream, you can mark one of the fields in the payload as an Application Timestamp. To do this, click the clock icon next to the field. Time is then advanced by the application rather than by the system: window ranges and slides are all controlled by the selected field. The application timestamp is available only to query stages that connect directly to the stream source.

6.8 Supported Timestamp Formats in an Input Stream

The following timestamp formats are supported in an input stream:
  • 11/21/2005 11:14:23.111 "MM/dd/yyyy HH:mm:ss.SSS"
  • 11/21/2005 11:14:23.11 "MM/dd/yyyy HH:mm:ss.SS"
  • 11/21/2005 11:14:23.1 "MM/dd/yyyy HH:mm:ss.S"
  • 11/21/2005 11:14:23 "MM/dd/yyyy HH:mm:ss"
  • 11/21/2005 11:14 "MM/dd/yyyy HH:mm"
  • 11/21/2005 11 "MM/dd/yyyy HH"
  • 11/21/2005 "MM/dd/yyyy"
  • 11-21-2005 11:14:23.111 "MM-dd-yyyy HH:mm:ss.SSS"
  • 11-21-2005 11:14:23.11 "MM-dd-yyyy HH:mm:ss.SS"
  • 11-21-2005 11:14:23.1 "MM-dd-yyyy HH:mm:ss.S"
  • 11-21-2005 11:14:23 "MM-dd-yyyy HH:mm:ss"
  • 11-21-2005 11:14 "MM-dd-yyyy HH:mm"
  • 11-21-2005 11 "MM-dd-yyyy HH"
  • 11-21-2005 "MM-dd-yyyy"
  • 15-DEC-01 11.14.14.111 AM "dd-MMM-yy hh.mm.ss.SSS"
  • 15-DEC-01 11.14.14.11 "dd-MMM-yy hh.mm.ss.SS"
  • 15-DEC-01 11.14.14.1 "dd-MMM-yy hh.mm.ss.S"
  • 15-DEC-01 11.14.14 "dd-MMM-yy hh.mm.ss"
  • 15-DEC-01 11.14 "dd-MMM-yy hh.mm"
  • 15-DEC-01 11 "dd-MMM-yy hh"
  • 15-DEC-01 "dd-MMM-yy"
  • 15/DEC/01 "dd/MMM/yy"
  • 2013-10-5 15:16:0.756 "yyyy-MM-dd HH:mm:ss.SSS"
  • 2013-10-5 15.16.0.756 "yyyy-MM-dd HH.mm.ss.SSS"
  • 2013-10-5 15:16:0 "yyyy-MM-dd HH:mm:ss"
  • 2013-10-5 15.16.0 "yyyy-MM-dd HH.mm.ss"
  • 2013-10-5 15:16 "yyyy-MM-dd HH:mm"
  • 2013-10-5 15.16 "yyyy-MM-dd HH.mm"
  • 2013-10-5 15 "yyyy-MM-dd HH"
  • 2012-11-10 "yyyy-MM-dd"
  • 11:14:14 "HH:mm:ss"
  • "yyyy-MM-dd'T'HH:mm:ss'.'SSS"
  • "yyyy-MM-dd'T'HH:mm:ss"
  • 1/1/2011 "m/d/yyyy"
  • 1-1-2011 "m-d-yyyy"
  • 3/23/2019 "m/dd/yyyy"
  • 3-23-2019 "m-dd-yyyy"
  • 12/4/1982 "mm/d/yyyy"
  • 12-4-2019 "mm-d-yyyy"

Note:

The input timestamp is truncated to millisecond precision.
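The list above suggests a try-in-order parsing strategy. The following is a minimal Java sketch using SimpleDateFormat with a few of the listed patterns (an illustration only; the assumption that GGSA tries candidate patterns in a similar first-match order is not confirmed by this document):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampParser {
    // A few of the patterns listed above, tried in order (first successful parse wins).
    private static final String[] PATTERNS = {
        "MM/dd/yyyy HH:mm:ss.SSS",
        "MM-dd-yyyy HH:mm:ss",
        "yyyy-MM-dd HH:mm:ss.SSS",
        "yyyy-MM-dd",
        "HH:mm:ss"
    };

    public static Date parse(String value) {
        for (String pattern : PATTERNS) {
            SimpleDateFormat fmt = new SimpleDateFormat(pattern);
            fmt.setLenient(false);  // reject values that only loosely match the pattern
            try {
                return fmt.parse(value);
            } catch (ParseException ignored) {
                // try the next pattern
            }
        }
        throw new IllegalArgumentException("Unsupported timestamp: " + value);
    }
}
```

Non-lenient parsing matters here: with lenient parsing, a value such as 2013-10-5 would be misread by the MM/dd/yyyy pattern as month 2013 and rolled over instead of being rejected.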

6.9 Predefined CSV Data Formats

A Comma-Separated Values (CSV) file is one of the data formats you can select for your input stream. Variations in the CSV data format exist because of the different data sources. The following are the available predefined CSV data formats:

  • DEFAULT: Standard comma-separated format, as per RFC4180, but allowing empty lines.
  • EXCEL: Excel file format, with comma as the value delimiter.
  • INFORMIX_UNLOAD_CSV: Default Informix CSV UNLOAD format used by the UNLOAD TO file_name operation (escaping is disabled). This is a comma-delimited format with an LF character as the line separator. Values are not quoted, and special characters are escaped with '\'. The default NULL string is "\\N".
  • MYSQL: Default MySQL format used by the SELECT INTO OUTFILE and LOAD DATA INFILE operations. This is a tab-delimited format with an LF character as the line separator. Values are not quoted, and special characters are escaped with '\'. The default NULL string is "\\N".
  • POSTGRESQL_CSV: Default PostgreSQL CSV format used by the COPY operation. This is a comma-delimited format with an LF character as the line separator. The default NULL string is "".
  • POSTGRESQL_TEXT: Default PostgreSQL text format used by the COPY operation. This is a tab-delimited format with an LF character as the line separator. The default NULL string is "\\N".
  • RFC4180: Comma-separated format as defined by RFC4180.
  • TDF: Tab-delimited format.
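The practical differences among these formats are chiefly the delimiter and the NULL string. A small stdlib-only Java sketch that serializes one record under the POSTGRESQL_CSV and MYSQL conventions (delimiter and NULL string only; the quoting and escaping rules described above are not reproduced here):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CsvFormatDemo {
    /** Joins fields with the given delimiter, substituting nullString for null values. */
    public static String write(List<String> fields, char delimiter, String nullString) {
        return fields.stream()
                .map(f -> f == null ? nullString : f)
                .collect(Collectors.joining(String.valueOf(delimiter)));
    }

    public static void main(String[] args) {
        List<String> record = Arrays.asList("1001", null, "Chicago");
        // POSTGRESQL_CSV: comma-delimited, NULL string ""
        System.out.println(write(record, ',', ""));      // prints 1001,,Chicago
        // MYSQL: tab-delimited, NULL string "\N"
        System.out.println(write(record, '\t', "\\N"));
    }
}
```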