36 Configuring BRM to Publish Notifications to Kafka Servers

Learn how to configure Oracle Communications Billing and Revenue Management (BRM) to publish notification events and messages to your Kafka server.

Topics in this document:

  • Overview of BRM Configuration Tasks for Kafka Servers

  • Installing the BRM Kafka DM

  • Configuring Thread Pooling for the Kafka DM

  • Enabling SSL between Kafka DM and Kafka Server

  • Configuring Event Notification for Kafka Servers

  • Defining Business Events for Your Kafka Server

  • Mapping Business Events to Kafka Topics

  • Configuring the Dynamic Key Value

  • Configuring Where to Record Failed Events

  • Customizing Notification Enrichment

Note:

The Kafka DM is supported in BRM 12.0 Patch Set 4 and later releases.

Overview of BRM Configuration Tasks for Kafka Servers

The high-level tasks for configuring BRM to send notifications and messages to your Kafka server include:

  1. Installing and setting up Apache Kafka and Apache ZooKeeper.

    For more information, see "Apache Kafka Quickstart" on the Apache Kafka web site.

  2. Installing the BRM Kafka Data Manager (DM).

    See "Installing the BRM Kafka DM".

  3. Setting up thread pooling for the Kafka DM by editing the Kafka DM's Infranet.properties file.

    See "Configuring Thread Pooling for the Kafka DM".

  4. Enabling SSL between the Kafka DM and your Kafka Server.

    For more information, see "Enabling SSL between Kafka DM and Kafka Server".

  5. Modifying the list of notification events to send to the Payload Generator External Module (EM) by editing the pin_notify_kafka_sync file.

    See "Configuring Event Notification for Kafka Servers".

  6. Defining how the Payload Generator EM builds business events for your Kafka server by editing the payloadconfig_kafka_sync.xml file.

    See "Defining Business Events for Your Kafka Server".

  7. Mapping business events to topics in your Kafka server by editing the dm_kafka_config.xml file.

    See "Mapping Business Events to Kafka Topics".

  8. (Optional) Configuring BRM to replace the dynamic key in message payloads with the Kafka database number.

    See "Configuring the Dynamic Key Value".

  9. (Optional) Setting the name and location of the file for logging failed business events.

    See "Configuring Where to Record Failed Events".

  10. (Optional) Customizing the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode to modify notification events before they are sent to a Data Manager (DM).

    See "Customizing Notification Enrichment".

Installing the BRM Kafka DM

If you are an existing customer upgrading from BRM 12.0, BRM 12.0 Patch Set 1, BRM 12.0 Patch Set 2, or BRM 12.0 Patch Set 3 to BRM 12.0 Patch Set 4 or later, follow the instructions in "BRM Supports Integration with Apache Kafka Servers" in BRM Patch Set Release Notes to install the Kafka DM on your system.

If you are installing BRM for the first time, you can install the Kafka DM by following these instructions:

  1. Install the latest version of Apache Kafka. For instructions on downloading and installing Kafka, see "Apache Kafka Quickstart" on the Apache Kafka website.

    For the latest compatible software version, see "BRM Software Compatibility" in BRM Compatibility Matrix.

  2. Set the Kafka environment variables:

    setenv KAFKA_HOME Kafka_path
    setenv KAFKA_BOOTSTRAP_SERVER_LIST KafkaHost1:port1,KafkaHost2:port2

    where:

    • Kafka_path is the path to the directory in which the Kafka library JARs are installed.

    • KafkaHost1:port1,KafkaHost2:port2 are the hosts and ports that the Kafka client will connect to in a bootstrap Kafka cluster the first time it starts. You can specify any number of hosts and ports in this list.

      You can alternatively set this list in the dm_kafka_config.xml. See "Mapping Business Events to Kafka Topics".

  3. Follow the instructions in "Installing BRM" in BRM Installation Guide to install the BRM 12.0 Patch Set 4 or later full build package (brmserver_12.0.0.x.0_platform_generic_full.jar). When the Installation Type screen appears during the installation process, do one of the following:

    • To install all BRM components, including the Kafka DM, select the Complete installation option.

    • To install the Kafka DM along with other individual BRM components, select the Custom installation option, select Kafka Data Manager 12.0.0.x.0, and then select any other optional managers that you want to install.
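On bash or ksh shells, the csh setenv commands in step 2 correspond to export statements such as the following; the path and host:port values are illustrative placeholders:

```shell
# Bash/ksh equivalents of the csh setenv commands in step 2.
# /opt/kafka/libs and the host:port pairs are placeholders for your environment.
export KAFKA_HOME=/opt/kafka/libs
export KAFKA_BOOTSTRAP_SERVER_LIST=kafka1.example.com:9092,kafka2.example.com:9092
```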

Configuring Thread Pooling for the Kafka DM

You configure how many parallel requests the Kafka DM can accept from the CM by editing the BRM_home/sys/dm_kafka/Infranet.properties file. Under the file's DM EAI Configuration section, configure the entries listed in Table 36-1.

Table 36-1 Kafka DM Infranet.properties Entries

Entry Description

infranet.dm.name

The name of the Kafka DM. If there are multiple Kafka DMs, each one must have a unique name.

The default is DM-KAFKA-1.

infranet.server.enabletimeinfo

Whether to log the timing information for each opcode that is run within the Kafka DM. The valid values are yes and no.

Note: Disable this entry for production systems.

infranet.connection.pool.enable

Whether to enable thread pooling for the Kafka DM:

  • true: Thread pooling is enabled. This is the default.

  • false: Thread pooling is disabled. In this case, a thread is spawned for each CM request.

infranet.connection.pool.size

The number of threads that can run in the JS server to accept requests from the CM.

Enter a number from 1 through 2000. The default is 64.
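Taken together, a minimal DM EAI Configuration section might look like the following sketch; the values shown are the defaults from Table 36-1, with timing information disabled as recommended for production:

```
infranet.dm.name=DM-KAFKA-1
infranet.server.enabletimeinfo=no
infranet.connection.pool.enable=true
infranet.connection.pool.size=64
```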

Enabling SSL between Kafka DM and Kafka Server

Apache Kafka allows Kafka clients, such as the Kafka DM, to use SSL for encrypting traffic and for authentication. By default, SSL is disabled between Kafka clients and the Kafka Server. To secure communications between them, you must enable SSL in both the Kafka Server and Kafka DM.

Enabling SSL in Kafka Server

To enable SSL in the Kafka Server:

  1. Create a Kafka Server certificate, KeyStore, and TrustStore by following the instructions in "Encryption and Authentication using SSL" in the Apache Kafka documentation.

  2. Update your Kafka server.properties file with the SSL parameters.

  3. Verify that SSL is set up properly in the Kafka Server. To do so:

    1. Create a test Kafka client certificate, KeyStore, and TrustStore by following the instructions in "Encryption and Authentication using SSL" in the Apache Kafka documentation.

    2. Create a client producer.properties file for kafka-console-producer and then add the client SSL properties to it.

    3. Create a client consumer.properties file for kafka-console-consumer and then add the client SSL properties to it.

    4. In Terminal-1, run the Kafka Producer Console:

      kafka-console-producer.sh --broker-list domainName:portNumber --topic topicName --producer.config config/producer.properties

      where:

      • domainName and portNumber are the domain name and port number for the system running the Kafka Server.

      • topicName is the name of the topic to which to send messages.
    5. In Terminal-2, run the Kafka Consumer Console:

      kafka-console-consumer.sh --bootstrap-server domainName:portNumber --topic topicName --from-beginning --consumer.config config/consumer.properties

      where:

      • domainName and portNumber are the domain name and port number for the system running the Kafka Server Consumer.

      • topicName is the name of the topic from which to retrieve messages.
    6. Send sample messages from the Kafka Producer Console, and then verify that the messages are received in the Kafka Consumer Console.
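The client SSL properties added to producer.properties and consumer.properties in steps b and c are typically of the following form; the file paths and passwords are placeholders for the KeyStore and TrustStore you created:

```
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```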

Enabling SSL in Kafka DM

To enable SSL in the Kafka DM:

  1. Create a Kafka client certificate, KeyStore, and TrustStore by following the instructions in "Encryption and Authentication using SSL" in the Apache Kafka documentation.

  2. Verify that the client KeyStore and TrustStore are set up properly by running the following command:

    openssl s_client -debug -connect domainName:portNumber -tls1_2 

    where domainName and portNumber are the domain name and port number for the system running the Kafka client.

    If the command's output does not display the certificate or if there are other error messages, the KeyStore is not set up properly.

  3. Add all of the sensitive data, such as the KeyStore password and TrustStore password, as key-value pairs in the BRM wallet. To do so, use the Oracle mkstore utility.

    For example, to store the TrustStore password in the wallet, you could run this command:

    mkstore -wrl "/home/myuser/wallet" -createCredential "TrustStorePassword" "password"
  4. Update the Kafka DM configuration file (dm_kafka_config.xml) with the SSL details. See "Mapping Business Events to Kafka Topics" for more information.

Configuring Event Notification for Kafka Servers

You define which notification events are sent to the Payload Generator EM by using the event notification file. The default Kafka DM event notification file (pin_notify_kafka_sync) specifies to send all account and pricing events to the Payload Generator EM, but you can add or exclude notification events from this file to accommodate your business needs.

To configure event notification for Kafka servers:

  1. Open the BRM_home/sys/data/config/pin_notify_kafka_sync file in an XML editor.

  2. To add a notification event, use this syntax:

    opcode_number     flag     event

    where:

    • opcode_number is the number associated with the opcode to run when the event occurs. Opcode numbers are defined in header (*.h) files in the BRM_home/include/ops directory. To send the notification event to the Payload Generator EM, enter 1301 for the opcode number.

    • flag is the name of the flag to pass to the opcode when it is called by the event notification feature. 0 means no flag is passed.

    • event is the name of the event that triggers the execution of the opcode. You can use any BRM default or custom event defined in your system. Triggering events do not have to be persistent. For example, you can use notification events (see "About Notification Events") and events that you have excluded from the BRM database (see "Managing Database Usage" in BRM System Administrator's Guide).

    For example:

    1301     0     /event/session

    This example specifies that when an /event/session event occurs, BRM event notification calls opcode number 1301, which is the EAI framework publishing opcode (PCM_OP_PUBLISH_GEN_PAYLOAD). In this case, the contents of the event are passed to the opcode without any flags.

  3. To exclude a notification event, comment it out by inserting a number sign (#) at the beginning of the entry. For example:

    # 1301     0     /event/session
  4. Save and close the edited file.

  5. If your system has multiple configuration files for event notification, merge them.

    Ensure that the merged file includes the contents from your BRM_home/sys/data/config/pin_notify_kafka_sync file.

  6. Load your final event notification list into the BRM database by using the load_pin_notify utility:

    load_pin_notify -v event_notification_file

    where event_notification_file is the path and name of the BRM event notification file.
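For example, a pin_notify_kafka_sync excerpt that publishes session and charge events to the Payload Generator EM, with one entry excluded, might look like this (the event names are common BRM defaults and are shown for illustration):

```
# Publish to the Payload Generator EM (opcode 1301)
1301     0     /event/session
1301     0     /event/billing/charge
# Excluded entry
# 1301     0     /event/activity
```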

Defining Business Events for Your Kafka Server

You define which notification events the Payload Generator EM uses to form a complete business event for your Kafka server by using the Kafka DM payload file (payloadconfig_kafka_sync.xml). You can add or remove business events from the file to accommodate your business needs.

For information, see "Defining Business Events".

Mapping Business Events to Kafka Topics

You specify how to connect the Kafka DM to your Kafka brokers, the Kafka DM mode to use, the authentication method for communication, and the Kafka topics to create by using the Kafka DM configuration file (BRM_home/sys/dm_kafka/dm_kafka_config.xml).

Note:

Ensure that you are using the correct XML version of the dm_kafka_config.xml file. Use version 2.0 for BRM 12.0 Patch Set 6 and later, and use version 1.0 for BRM 12.0 Patch Set 4 and Patch Set 5. The XML version number can be found in the following entry:

<KafkaConfig version="2.0">

Version 1.0 is supported in Patch Set 6 and later for backwards-compatibility only, but you will not be able to use the new 2.0 functionality. For information about the new functionality supported in version 2.0, see "Kafka DM Enhancements" in BRM Patch Set Release Notes.

For each Kafka topic that you create, you specify:

  • The name of the Kafka topic, which must be unique.

  • The business events to include in the payload published to the Kafka topic.

  • The payload format: XML or JSON. The Kafka DM converts the flists for each business event into the specified format.

  • For the XML payload format, the style of all field names in the XML payload:

    • ShortName: The XML field names are in all capitals, such as <POID>, <ACCOUNT_OBJ>, and <SUBSCRIBER_PREFERENCES_INFO>. This is the default.

    • CamelCase: The XML field names are in CamelCase, such as <Poid>, <AccountObj>, and <SubscriberPreferencesInfo>.

    • NewShortName: The XML field names are in CamelCase and are prefixed with fld, such as <fldPoid>, <fldAccountObj>, and <fldString>.

    • OC3CNotification: The input is transformed to match the field and formatting requirements of Oracle Communications Convergent Charging Controller. Use this style if Convergent Charging Controller is your external notification application.

  • (Optional) The key setting. See "About Setting Topic and Payload Keys".

  • (Optional) The headers to add to each message sent to the topic. See "Adding Headers to Messages".

  • (Optional) The payload settings. See "Adding Separate Payload Settings".

  • (Optional) The mapping of BRM flist fields to XML or JSON elements. See "Mapping Flist Fields to Payload Tags".
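The three generic XML styles amount to simple transformations of a BRM field name. The following sketch illustrates the naming conventions only; it is not the DM's actual implementation:

```python
def short_name(pin_field: str) -> str:
    # ShortName: strip the PIN_FLD_ prefix -> "ACCOUNT_OBJ"
    return pin_field.removeprefix("PIN_FLD_")

def camel_case(pin_field: str) -> str:
    # CamelCase: capitalize each underscore-separated word -> "AccountObj"
    return "".join(p.capitalize() for p in short_name(pin_field).split("_"))

def new_short_name(pin_field: str) -> str:
    # NewShortName: CamelCase with an fld prefix -> "fldAccountObj"
    return "fld" + camel_case(pin_field)

print(camel_case("PIN_FLD_ACCOUNT_OBJ"))  # AccountObj
```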

For information about how to edit the dm_kafka_config.xml file, see "Editing the dm_kafka_config.xml File".

About Setting Topic and Payload Keys

Note:

Only XML version 2.0 of the dm_kafka_config.xml file supports topic and payload keys.

By default, each message or payload sent to a Kafka topic has its key set to the payload name, such as CustCreate. You can optionally change the key to another value by setting the key attribute.

The key attribute can be added to the <DefaultTopicDefinition>, <TopicDefinition>, and <Payload> elements, and it can be set to the following:

  • {PayloadName}: This variable is replaced with the message payload name, such as CustDelete or CustCreate. This is the default.

  • {Random}: This variable is replaced with a random number, such as 1644450215456.

  • {Dynamic}: This variable is replaced with the value passed in the PCM_OP_PUBLISH_POL_PREP_EVENT output flist. See "Configuring the Dynamic Key Value" for more information.

  • Static Value: The characters or numbers you enter will be passed literally. For example, if you enter 1.0.0.0, the key will be set to 1.0.0.0.

You can also combine attribute values, such as {PayloadName}_1.0.0.

The following shows sample entries for setting a key in the default BrmTopic topic and the NotificationTopic topic:

<DefaultTopicDefinition name="BrmTopic" format="XML" style="OC3CNotification" key="A1"/>
<TopicDefinition name="NotificationTopic" format="XML" style="OC3CNotification" key="{PayloadName}-1234">

In this example, messages sent to BrmTopic would have their key set to A1. Messages sent to NotificationTopic would have their key set to CustDelete-1234, CustCreate-1234, and so on.
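The key-resolution rules above can be sketched as follows. This is an illustrative model, not the Kafka DM's code; the {Dynamic} case is omitted because its value comes from the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode:

```python
import random

def resolve_key(template: str, payload_name: str) -> str:
    """Resolve a key attribute: {PayloadName} and {Random} are variables;
    any other characters are passed through literally."""
    key = template.replace("{PayloadName}", payload_name)
    if "{Random}" in key:
        # Substitute a random number, e.g. 1644450215456
        key = key.replace("{Random}", str(random.randrange(10**12, 10**13)))
    return key

print(resolve_key("{PayloadName}-1234", "CustCreate"))  # CustCreate-1234
```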

For more information about adding keys to payloads, see "Adding Separate Payload Settings".

Adding Headers to Messages

Note:

Only XML version 2.0 of the dm_kafka_config.xml file supports adding headers to messages.

You can configure the Kafka DM to add one or more headers to each message sent to a specified Kafka topic. By default, headers are not added to messages, except when a topic's format is XML and style is OC3CNotification. For these topics, the default header is:

NOTIFICATION_TYPE=PayloadName, NOTIFICATION_VERSION="1.0.0.0.0"

Headers can be added under the <DefaultTopicDefinition> or <TopicDefinition> sections. To do so, add a <Headers> section. Under it, add a <Header> element for each header that you want to add:

<TopicDefinition name="MessageTopic" format="XML">    
   <Headers>
      <Header key="key" value="value" style="style"/>  
      <Header key="key" value="value" style="style"/>          
   </Headers>

Table 36-2 describes the attributes in the <Header> element.

Table 36-2 Header Attributes

Attribute Name Description

key

The header key, such as NOTIFICATION_TAG or NOTIFICATION_VERSION.

value

The value for the header key, which can be the following:

  • {PayloadName}: This variable is replaced with the message payload name, such as CustDelete or CustCreate.

  • Static value: The characters or numbers you enter will be passed literally. For example, if you enter 1.0.0.0, the header key value for all messages will be 1.0.0.0.

You can also combine both values, such as {PayloadName}_2A.

style

The style to use for {PayloadName} values:

  • CamelCase: The payload name is written in CamelCase, such as CustCreate or WelcomeMsg.

  • UpperCaseUnderscore: The payload name is written in all capitals with an underscore between each word, such as CUST_CREATE or WELCOME_MSG.

The following shows sample header entries for a topic where the header key is NOTIFICATION_TAG, the key value is {PayloadName}_EVENT, and the style is UpperCaseUnderscore:

<TopicDefinition name="MessageTopic" format="XML">    
   <Headers>
      <Header key="NOTIFICATION_TAG" value="{PayloadName}_EVENT" style="UpperCaseUnderscore"/>            
   </Headers>

In this case, if the payload name is BalanceExpiry, the following header would be added to the message:

NOTIFICATION_TAG=BALANCE_EXPIRY_EVENT
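The two {PayloadName} header styles amount to the following transformations (an illustrative sketch, not DM code):

```python
import re

def upper_case_underscore(payload_name: str) -> str:
    # UpperCaseUnderscore: insert "_" before each interior capital,
    # then uppercase: "BalanceExpiry" -> "BALANCE_EXPIRY"
    return re.sub(r"(?<!^)(?=[A-Z])", "_", payload_name).upper()

# CamelCase leaves the payload name as-is, e.g. "WelcomeMsg"
print(upper_case_underscore("WelcomeMsg"))  # WELCOME_MSG
```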

Adding Separate Payload Settings

Note:

Only XML version 2.0 of the dm_kafka_config.xml file supports adding separate payloads.

For each topic, you can configure the Kafka DM to write business events in a separate message payload. To do so, you add a <Payloads> section under the <DefaultTopicDefinition> or <TopicDefinition> sections. Under <Payloads>, add a <Payload> element for each business event you want written to a separate message payload:

<Payloads>    
   <Payload name="name" key="key" partition="partition">            
   </Payload>    
</Payloads>

Table 36-3 describes the attributes in the <Payload> element.

Table 36-3 Payload Attributes

Attribute Name Description

name

The name of the business event to write to a separate payload.

key

The payload key setting:

  • {PayloadName}: This variable is replaced with the message payload name, such as CustDelete or CustCreate. This is the default.

  • {Random}: This variable is replaced with a random number, such as 1644450215456.

  • {Dynamic}: This variable is replaced with the value passed in the PCM_OP_PUBLISH_POL_PREP_EVENT output flist. See "Configuring the Dynamic Key Value" for more information.

  • Static Value: The characters or numbers you enter will be passed literally. For example, if you enter 1.0.0.0, the key will be set to 1.0.0.0.

Note: Keys set at the payload level override the keys set at the topic level.

See "About Setting Topic and Payload Keys" for more information

partition

The topic partition to which the message is written.

By default, the Kafka DM assigns messages to a random partition in the Kafka topic.

The following shows sample entries for the BalanceExpiry payload:

<Payloads>    
   <Payload name="BalanceExpiry" key="ABCD" partition="1">         
   </Payload>    
</Payloads>

In this example, a BalanceExpiry business event would have its key set to ABCD, and would be loaded into partition 1 of the Kafka topic.

Mapping Flist Fields to Payload Tags

Note:

Only XML version 2.0 of the dm_kafka_config.xml file supports the mapping of flist fields.

The Kafka DM provides default mappings between a business event's flist fields and the XML or JSON elements in the payload sent to Kafka topics. You can override how the Kafka DM transforms one or more business event flist fields to XML or JSON elements at the topic level and the payload level.

Note:

If mappings for the same flist field are defined at both the topic level and the payload level, the mapping in the payload takes precedence.

Field mappings can be added under the <DefaultTopicDefinition>, <TopicDefinition>, and <Payload> sections. To do so, add a <FieldMaps> section. Under it, add a <FieldMap> element for each field that you want to override:

<TopicDefinition name="MessageTopic" format="XML">    
   <FieldMaps>
      <FieldMap pinfld="field" tag="tag"/>  
      <FieldMap pinfld="field" value="tag"/>          
   </FieldMaps>

Table 36-4 describes the attributes in the <FieldMap> element.

Table 36-4 FieldMap Attributes

Attribute Name Description

pinfld

The name of the flist field in the BRM business event payload.

tag

The name of the XML or JSON element in the payload published to the Kafka topic.

The following shows sample field mapping entries:

<TopicDefinition name="NotificationTopic" format="XML" style="Notification">        
   <FieldMaps>            
      <FieldMap pinfld="PIN_FLD_ACCOUNT_OBJ" tag="AccountObjId1"/>            
      <FieldMap pinfld="PIN_FLD_BAL_GRP_OBJ" tag="BalGrpPoidId1"/>        
   </FieldMaps>
   <Payloads>
      <Payload name="BalanceExpiry">
         <FieldMaps>                    
            <FieldMap pinfld="PIN_FLD_ACCOUNT_OBJ" tag="AccountPoidId2"/>
         </FieldMaps>            
      </Payload>
   </Payloads>
</TopicDefinition>

In this example, fields would be mapped as follows:

  • For the BalanceExpiry payload name: PIN_FLD_ACCOUNT_OBJ is mapped to AccountPoidId2, PIN_FLD_BAL_GRP_OBJ is mapped to BalGrpPoidId1, and all other flist fields use the default mappings.

  • All other payload names: PIN_FLD_ACCOUNT_OBJ is mapped to AccountObjId1, PIN_FLD_BAL_GRP_OBJ is mapped to BalGrpPoidId1, and all other flist fields use the default mappings.
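The precedence rules in this example can be modeled as a simple lookup chain: payload-level mappings first, then topic-level mappings, then the default mapping. This is a sketch of the documented behavior, not DM code:

```python
def effective_tag(field: str, topic_maps: dict, payload_maps: dict,
                  default: str) -> str:
    # Payload-level mappings override topic-level ones; anything
    # unmapped falls back to the default mapping for that field.
    return payload_maps.get(field, topic_maps.get(field, default))

# Mappings from the example above
topic_maps = {"PIN_FLD_ACCOUNT_OBJ": "AccountObjId1",
              "PIN_FLD_BAL_GRP_OBJ": "BalGrpPoidId1"}
payload_maps = {"PIN_FLD_ACCOUNT_OBJ": "AccountPoidId2"}  # BalanceExpiry only

print(effective_tag("PIN_FLD_ACCOUNT_OBJ", topic_maps, payload_maps, "AccountObj"))
# AccountPoidId2
```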

Editing the dm_kafka_config.xml File

To map business events to Kafka topics:

  1. Open the BRM_home/sys/dm_kafka/dm_kafka_config.xml file in an XML editor.

  2. In the <KafkaAsyncMode> XML element, specify the Kafka DM mode to use:

    <KafkaAsyncMode>value</KafkaAsyncMode>

    where value is true for asynchronous mode, and false for synchronous mode. In asynchronous mode, the Kafka DM records in a log file all business events that fail to publish to the Kafka server. In synchronous mode, the Kafka DM returns errors to BRM when a business event fails to publish to the Kafka server.

  3. In the following XML element, set the amount of time, in milliseconds, the Kafka DM waits for the Kafka server to respond:

    <ProducerConfig>max.block.ms=timeout</ProducerConfig>

    where timeout specifies the amount of time. If <KafkaAsyncMode> is set to false, set timeout to 3000 or higher. If <KafkaAsyncMode> is set to true, set timeout to 500 or less.

  4. In the <BootstrapServerList> XML element, enter a comma-separated list of addresses for the Kafka brokers in this format:

    <BootstrapServerList>hostname1:port1,hostname2:port2</BootstrapServerList>

    You can alternatively leave the default value in the XML entry and set the list in the environment variable, as shown in "Installing the BRM Kafka DM."

  5. To enable SSL or authentication between the Kafka DM and your Kafka server, do the following:

    1. Under the <ProducerConfigs> section, add <ProducerConfig> XML elements for your authentication protocol and mechanism. For information about the possible entries, see "Security" in the Apache Kafka documentation.

    2. For sensitive information such as passwords, create a token for the password value. For example:

      <ProducerConfig>ssl.truststore.password={TokenName}</ProducerConfig>

      (For XML version 2.0) Ensure that TokenName matches the key value stored in the Oracle wallet. See "Enabling SSL in Kafka DM".

    3. (For XML version 1.0 only) For each token, add an <EncryptedVariable> element that contains the token name and the encrypted password.

      Use the pin_crypt_app utility to encrypt the password. See "pin_crypt_app" for more information.

      <EncryptedVariable>TokenName=&ozt|MyEncryptedPassword</EncryptedVariable>

    This shows sample <ProducerConfigs> entries for authentication using the SASL_SSL protocol and SASL/PLAIN mechanism:

    <ProducerConfigs>        
       <ProducerConfig>ssl.truststore.location=${HOME}/kafka/keystores/client.truststore.jks</ProducerConfig>
       <ProducerConfig>ssl.truststore.password={TrustStorePassword}</ProducerConfig>
       <ProducerConfig>ssl.keystore.location=${HOME}/kafka/store/server/server</ProducerConfig>
       <ProducerConfig>ssl.keystore.password={KeyStorePassword}</ProducerConfig>
       <ProducerConfig>ssl.key.password={KeyPassword}</ProducerConfig>
       <ProducerConfig>security.protocol=SASL_SSL</ProducerConfig>
       <ProducerConfig>sasl.mechanism=PLAIN</ProducerConfig>
       <ProducerConfig>sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="dmkafka" password="{Password}";</ProducerConfig>
    </ProducerConfigs>
  6. In the <DefaultTopicDefinition> XML element, set the following:

    • name attribute: The name of the default Kafka topic.

    • format attribute: The payload format: XML or JSON.

    • style attribute: The style of XML payloads: ShortName, CamelCase, NewShortName, or OC3CNotification.

    • key attribute: The key to add to each message: {PayloadName}, {Random}, {Dynamic}, or a static value. This attribute is optional.

    • <Headers> element: Information about any headers to add to messages sent to this topic. This element is optional.

    • <Payloads> element: Information about any separate payloads to create in messages sent to this topic. This element is optional.

    • <FieldMaps> element: Information about how to map specific flist fields to the target format. This element is optional.

    This example creates a default topic named BrmTopic that converts flists into XML payloads in the CamelCase style, adds a random key, adds the header NOTIFICATION_VERSION=1.0.0.0.0 to all messages sent to it, routes BalanceExpiry business events to partition 1, and maps the PIN_FLD_ACCOUNT_OBJ flist field.

    <DefaultTopicDefinition name="BrmTopic" format="XML" style="CamelCase" key="{Random}">
       <Headers>                                 
          <Header key="NOTIFICATION_VERSION" value="1.0.0.0.0"/>    
       </Headers>
       <Payloads>      
          <Payload name="BalanceExpiry" key="ABCD" partition="1">            
             <FieldMaps>                
                <FieldMap pinfld="PIN_FLD_ACCOUNT_OBJ" tag="AccountPoidId2"/>            
             </FieldMaps>        
          </Payload>        
       </Payloads>
    </DefaultTopicDefinition>
  7. For each Kafka topic that you want to create, add a <TopicDefinition> XML element and set the following:

    • name attribute: The name of the Kafka topic

    • format attribute: The payload format: XML or JSON

    • style attribute: The style of XML payloads: ShortName, CamelCase, NewShortName, or OC3CNotification

    • key attribute: The key to add to each message: {PayloadName}, {Random}, {Dynamic}, or a static value. This attribute is optional.

    • <Headers> element: Information about any headers to add to messages sent to this topic. This element is optional.

    • <Payloads> element: Information about any separate payloads to create in messages sent to this topic. This element is optional.

    • <FieldMaps> element: Information about how to map specific flist fields to the target format. This element is optional.

    For example:

    <TopicDefinition name="NotificationTopic" format="XML" style="OC3CNotification" key="{Dynamic}">
       <Headers>                         
          <Header key="NOTIFICATION_TAG" value="{PayloadName}Event" style="CamelCase"/>        
          <Header key="NOTIFICATION_VERSION" value="1.0.0.0.0"/>    
       </Headers>
          <Payload name="BillDue" key="{PayloadName}" partition="2">            
             <FieldMaps>                
                <FieldMap pinfld="PIN_FLD_ACCOUNT_OBJ" tag="AccountPoidId1"/>            
             </FieldMaps>        
          </Payload>     
    </TopicDefinition>
  8. Save and close the file.

  9. Restart the Kafka DM for the changes to take effect.

Configuring the Dynamic Key Value

You can configure BRM to replace dynamic keys in message payloads with a value you specify in the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode.

To configure BRM to replace the dynamic key value:

  1. Configure the KafkaDBNumber business parameter in the notification instance of the /config/business_params object:

    1. Use the following command to create an editable XML file from the notification instance of the /config/business_params object:

      pin_bus_params -r BusParamsNotification bus_params_notification.xml

      This command creates the XML file named bus_params_notification.xml.out in your working directory. If you do not want this file in your working directory, specify the path as part of the file name.

    2. Set KafkaDBNumber to the Kafka database number:

      <KafkaDBNumber>databaseNumber</KafkaDBNumber>

      where databaseNumber is the number for the Kafka database. The default is 0.0.9.6 / 0.

    3. Save the file and change its name to bus_params_notification.xml.

    4. Use the following command to load this change into the /config/business_params object:

      pin_bus_params bus_params_notification.xml

      You should run this command from the BRM_home/sys/data/config directory, which includes support files used by the utility. To run it from a different directory, see "pin_bus_params".

    5. Read the object with the testnap utility or the Object Browser to verify that all fields are correct.

      For general instructions on using testnap, see "Using the testnap Utility to Test BRM". For information on how to use Object Browser, see "Reading Objects".

    6. Stop and restart the CM.

      For more information, see "Starting and Stopping the BRM System".

    7. (Multischema systems only) Run the pin_multidb script with the -R CONFIG parameter.

      For more information, see "pin_multidb" in BRM System Administrator's Guide.

  2. Customize the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode to compare the value of the PIN_FLD_POID input flist field to that of the KafkaDBNumber business parameter.

    If they match, the policy opcode must pass the following output flist fields in the PIN_FLD_NOTIFICATION_KEY_INFO substruct:

    • PIN_FLD_NOTIFICATION_KEY: Set this to your desired dynamic key value.

    • PIN_FLD_STATUS: Set this to 1.

    For example, the following output flist settings specify to replace the dynamic key value with ServiceLifeStateChangeExpiry:

    1 PIN_FLD_NOTIFICATION_KEY_INFO   SUBSTRUCT [0] allocated 20, used 5
    2     PIN_FLD_NOTIFICATION_KEY          STR [0] "ServiceLifeStateChangeExpiry"
    2     PIN_FLD_STATUS                   ENUM [0] 1
  3. Set the message payload key attribute to {Dynamic}. See "About Setting Topic and Payload Keys".

Configuring Where to Record Failed Events

If you configure the Kafka DM to operate in asynchronous mode, it records information about business events that fail to publish to the Kafka server in a log file. Publishing might fail, for example, because the Kafka server is down or the connection fails.

Note:

Failed business events are written to a log file only in asynchronous mode. When a business event fails to publish in synchronous mode, the Kafka DM rolls back the transaction and returns an error to BRM.

By default, the Kafka DM records failed business events to the BRM_log_file_home/dm_kafka/kafka_failed_message.log file, but you can change the name and location of the file.

To configure where the Kafka DM records failed business events, set the filename entry in the BRM_home/sys/dm_kafka/log4j2.xml file:

<RollingFile name="KAFKA" fileName="${env:PIN_LOG}/dm_kafka/kafka_failed_message.log" filePattern="${env:PIN_LOG}/dm_kafka/kafka_failed_message.log.%i">

The following shows a sample flist for a failed business event that could be recorded in the kafka_failed_message.log file:

0 PIN_FLD_EXTENDED_INFO        SUBSTRUCT [0] allocated 3, used 3
0 PIN_FLD_ACCOUNT_OBJ               POID [0] 0.0.0.1 /account 161798 0
0 PIN_FLD_LOGINS                   ARRAY [0] allocated 2, used 2
1     PIN_FLD_SERVICE_OBJ           POID [0] 0.0.0.1 /service/ip 162950 0
1     PIN_FLD_LOGIN                  STR [0] "user738"
0 PIN_FLD_LOGINS                   ARRAY [1] allocated 1, used 1
1     PIN_FLD_LOGIN                  STR [0] ""
0 PIN_FLD_LOGINS                   ARRAY [2] allocated 2, used 2
1     PIN_FLD_SERVICE_OBJ           POID [0] 0.0.0.1 /service/email 160518 0
1     PIN_FLD_LOGIN                  STR [0] "user738@portal.com"
0 PIN_FLD_LOGINS                   ARRAY [3] allocated 1, used 1
1     PIN_FLD_LOGIN                  STR [0] ""
0 PIN_FLD_STRING                     STR [0] "CustCreate"
0 PIN_FLD_END_T                   TSTAMP [0] (1669908080) 01/12/2022 07:21:20:000 AM
0 PIN_FLD_BAL_INFO                 ARRAY [0] allocated 3, used 3
1     PIN_FLD_OBJECT_CACHE_TYPE     ENUM [0] 0
1     PIN_FLD_BAL_GRP_OBJ           POID [0] 0.0.0.1 /balance_group 160006 1
1     PIN_FLD_BALANCES             ARRAY [840] allocated 1, used 1
2         PIN_FLD_CREDIT_PROFILE     INT [0] 3
0 PIN_FLD_CREATED_T               TSTAMP [0] (1672646104) 01/01/2023 23:55:04:692 PM  
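If you process this log programmatically, the business event name can be recovered from the PIN_FLD_STRING field of each logged flist. The following is a minimal Python sketch under the assumption that the log contains flist dumps in the format shown above; the function name failed_event_names is an illustration, not a BRM utility.

```python
import re

# Extract business event names (PIN_FLD_STRING values) from flist dumps
# such as those written to kafka_failed_message.log. Illustrative sketch;
# assumes the log format matches the sample above.
EVENT_NAME = re.compile(r'PIN_FLD_STRING\s+STR \[\d+\] "([^"]+)"')

def failed_event_names(log_text):
    """Return the event names found in a failed-message log dump."""
    return EVENT_NAME.findall(log_text)

sample = '0 PIN_FLD_STRING                     STR [0] "CustCreate"'
```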

Customizing Notification Enrichment

By default, the PCM_OP_PUBLISH_EVENT opcode retrieves the delivery identifier for a delivery method, such as the email address for an email delivery method, by reading the /profile/subscriber_preferences object. If one or more delivery identifiers are missing from the object, the opcode includes the PIN_FLD_NOTIFICATION_STATUS_INFO substruct in its output flist, listing the missing delivery identifier tags. For example, this PIN_FLD_NOTIFICATION_STATUS_INFO substruct indicates that the Twitter handle and email address were not found in the /profile/subscriber_preferences object:

1  PIN_FLD_NOTIFICATION_STATUS_INFO SUBSTRUCT [0] allocated 20, used 2
2     PIN_FLD_STATUS      ENUM [0] 1
2     PIN_FLD_STATUSES   ARRAY [0] allocated 20, used 2
3        PIN_FLD_NAME      STR [0] "TwitterHandle"
2     PIN_FLD_STATUSES   ARRAY [1] allocated 20, used 2
3        PIN_FLD_NAME      STR [0] "Email"

You can customize the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode to look up or provide the missing delivery identifiers. The policy opcode can return a delivery identifier value in the PIN_FLD_VALUE output flist field of the PIN_FLD_NOTIFICATION_STATUS_INFO substruct. For example, this PIN_FLD_NOTIFICATION_STATUS_INFO substruct provides the @SampleName Twitter handle and abcd@sample.com email address:

1  PIN_FLD_NOTIFICATION_STATUS_INFO SUBSTRUCT [0] allocated 20, used 2
2     PIN_FLD_STATUS      ENUM [0] 1
2     PIN_FLD_STATUSES   ARRAY [0] allocated 20, used 2
3        PIN_FLD_NAME      STR [0] "TwitterHandle"
3        PIN_FLD_VALUE     STR [0] "@SampleName"
2     PIN_FLD_STATUSES   ARRAY [1] allocated 20, used 2
3        PIN_FLD_NAME      STR [0] "Email"
3        PIN_FLD_VALUE     STR [0] "abcd@sample.com"

If the policy opcode does not return a value in the PIN_FLD_VALUE field, the PCM_OP_PUBLISH_EVENT opcode returns an error and rolls back the entire transaction.

You can also customize the PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode to skip publishing specific messages to specific publishers without rolling back the entire transaction. To do so, customize the policy opcode to return the PIN_FLD_STATUS output flist field in the PIN_FLD_NOTIFICATION_STATUS_INFO substruct, set to one of the following:

  • 0: Indicates that the message is not to be published to this publisher. In this case, PCM_OP_PUBLISH_EVENT does not publish the event.

  • 1: Indicates that the message must be published to this publisher. In this case, if a delivery identifier is still missing, PCM_OP_PUBLISH_EVENT returns an error and rolls back the entire transaction.

For example, this substruct specifies not to publish the message to this publisher:

1  PIN_FLD_NOTIFICATION_STATUS_INFO SUBSTRUCT [0] allocated 20, used 2
2     PIN_FLD_STATUS  ENUM [0] 0
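The combined behavior described above can be modeled as follows. This is an illustrative Python sketch, not BRM code; the function name resolve_publish and the dictionary representation of the substruct are assumptions for this sketch.

```python
# Illustrative model of how PCM_OP_PUBLISH_EVENT reacts to the
# PIN_FLD_NOTIFICATION_STATUS_INFO substruct returned by a customized
# PCM_OP_PUBLISH_POL_PREP_EVENT policy opcode. Not BRM C code.

def resolve_publish(status_info):
    """Return 'skip', 'publish', or 'error' for one publisher."""
    status = status_info.get("PIN_FLD_STATUS", 1)
    if status == 0:
        return "skip"       # policy said: do not publish to this publisher
    # status == 1: the message must be published, so every missing
    # delivery identifier must now carry a PIN_FLD_VALUE; otherwise the
    # entire transaction is rolled back.
    for entry in status_info.get("PIN_FLD_STATUSES", []):
        if not entry.get("PIN_FLD_VALUE"):
            return "error"  # identifier still missing: error and rollback
    return "publish"

enriched = {
    "PIN_FLD_STATUS": 1,
    "PIN_FLD_STATUSES": [
        {"PIN_FLD_NAME": "TwitterHandle", "PIN_FLD_VALUE": "@SampleName"},
        {"PIN_FLD_NAME": "Email", "PIN_FLD_VALUE": "abcd@sample.com"},
    ],
}
```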