Enriching Events with Data from External Databases

You can optionally enrich Oracle Communications Unified Assurance events with data from an external MySQL, PostgreSQL, or Oracle database by using any of the following methods to get the data from the external database:

Caution:

The samples provided in this document are intended as basic starting points for potential customizations, to illustrate the different options for external database lookups. They would require modification to be adapted for your environment and specific use cases, including specific implementation of database connections, error handling, and where to add the enrichment data. You would also need to test and fine-tune to avoid unintended consequences or performance degradation.

Using Custom Rules

When using custom rules to enrich events with external data, you can:

You must also declare any additional modules that you use in the rules, such as LWP::UserAgent and HTTP::Request for the REST API use case, in the base.load file.

See Viewing and Editing Rules for more information about rules.

If your environment does not need to process messages at a very high rate (spending only milliseconds for each event), you can use custom rules similar to Sample Custom Rules for Database Lookups. The samples use a REST or a SQL request to look up interface information in the external database.

However, if your environment has a high incoming message rate that requires rapid processing, REST API lookups in external databases could introduce too much delay. In this case, if the enrichment data is relatively stable (updated in minutes rather than the milliseconds required for event processing), you can alternatively store the enrichment data in a local multi-level hash, updated periodically by a custom script run as a job. Your rule can then quickly get the data locally, rather than making a slower call to an external database. See Sample Custom Rule for Locally Stored Data.

Sample Custom Rules for Database Lookups

You can use these sample custom rules if you do not need to prioritize processing speed and can query the external database for each incoming event. The first sample uses a REST API to query the data, and the second uses a direct connection to the database to make a SQL query.

Sample Custom Rule for REST API Database Lookup

This sample custom rule, added to the base.rules file for trap messages, does the following when an event comes in:

  1. Extracts the ifIndex and IP values from a trap message.

  2. Makes an API request to get the data from the external database based on the extracted IP and ifIndex.

  3. Adds the new ifDescr to the Event SubNode property.

To use this sample, at minimum you would need to adjust the following elements:

# Use IP address variable provided by Trapd
my $ip

# Extract ifIndex from @varbinds provided by Trapd
$ifoid = '1.3.6.1.2.1.2.2.1.1';
my $ifIndex;
for my $oidiid (@varbinds) {
    if ($oidiid =~ /^\Q$ifoid.\E(\d+)$/) {
        $ifIndex = $1;
        last;
    }
}

# Replace the paths below with the actual paths to the SSL certificates and keys to use with the external database, or implement credentials in your preferred way. 

if ($ifIndex && $ip) {
    my $ua = LWP::UserAgent->new(ssl_opts => {
        SSL_ca_file => '/path/to/your/ca/cert.pem',
        SSL_cert_file => '/path/to/your/client/cert.pem',
        SSL_key_file => '/path/to/your/client/key.pem'
    });

# Replace the API URL and adjust the request if necessary to match the expected endpoint and request for your external database.

    my $api_url = "https://example.com/lookup?ifIndex=$ifIndex&ip=$ip";
    my $req = HTTP::Request->new('GET', $api_url);

    my $response = $ua->request($req);

    if ($response->is_success) {
        my $decoded_response = decode_json($response->decoded_content);
        my $new_ifDescr = $decoded_response->{ifDescr}; # Assuming the API returns ifDescr in this format

        # Update SubNode with the new ifDescr
        $Event->{SubNode} = $new_ifDescr;
    } 
    else {
        $Log->Message('ERROR', "Failed to retrieve data from API: " . $response->status_line);
    }
} 
else {
    $Log->Message('DEBUG', "ifIndex or IP not found in traplog");
}

Sample Custom Rule for SQL Database Lookup

This sample custom rule, added to the base.rules file for trap messages, does the following when an event comes in:

  1. Extracts the ifIndex and IP values from a trap message.

  2. Makes a SQL request to get the data from the external database based on the extracted IP and ifIndex.

  3. Adds the new ifDescr to the Event SubNode property.

To use this sample, at minimum you would need to:

# Connect to the external database configured in the Databases UI. Replace ExternalDB with the the external database schema that you configured.
my $ExternalDBH = DBConnect($Config, 'ExternalDB', {AutoCommit => 1});

# Use IP address variable provided by Trapd
my $ip

# Extract ifIndex from $traplog
$ifoid = '1.3.6.1.2.1.2.2.1.1';
my $ifIndex;
for my $oidiid (@varbinds) {
    if ($oidiid =~ /^\Q$ifoid.\E(\d+)$/) {
        $ifIndex = $1;
        last;
    }
}

if ($ifIndex && $ip) {
    my $SelectSQL = "SELECT ifDescr FROM Interface WHERE ifIndex = ? AND ip = ?";
    my $sth = $ExternalDBH->prepare($SelectSQL);

    $sth->execute($ifIndex, $ip);
    my $data = $sth->fetchrow_hashref;

    if (defined $data && defined $data->{ifDescr}) {
        my $new_ifDescr = $data->{ifDescr};

        # Update SubNode with the new ifDescr
        $Event->{SubNode} = $new_ifDescr;
    } 
    else {
        $Log->Message('DEBUG', "No ifDescr found for ifIndex $ifIndex and IP $ip");
    }

    $sth->finish();
} 
else {
    $Log->Message('DEBUG', "ifIndex or IP not found in traplog");
}

$ExternalDBH->disconnect();

Sample Custom Rule for Locally Stored Data

This scenario is adapted for environments that require rapid processing of incoming events. It involves:

By storing the data locally, querying it from the database on its own schedule, and loading it once for all the rules, your custom rule can rapidly query the data from $SharedHash rather than making a slower external request.

To use the sample custom rule:

  1. Create a custom script to:

    1. Query the external database for the enrichment data.

    2. Create a lookup file with the keys and values from the enrichment data. The sample assumes the lookup file is a JSON file in the following format:

      {
        "<IP Address>": {
          "1": "<ifDescr>"
        }
      }
      

      For example:

      {
        "192.0.2.1": {
          "1": "eth0",
          "2": "Link to main router"
        },
        "192.0.2.2": {
          "1": "eth1",
          "2": "Loopback Interface"
        }
      }
      
    3. Compare the queried data to the most recent local data, and update the local data if it is different, working with your version control system (for example, SVN) to track and commit changes.

  2. Set up a custom job in Unified Assurance to run this script at regular intervals.

  3. Add custom logic in base.load to load the lookup file once, before the rules are run. For example, add the following to base.load for traps:

    use File::Slurp;
    use JSON;
    
    my $ifDescrFile = "<path_to_lookup_json>";
    
    if (-f $ifDescrFile) {
        my $json_text = read_file($ifDescrFile);
        my $data = decode_json($json_text);
        foreach my $ipAddress (keys %$data) {
            $SharedHash{'ifDescr_per_ifIndex'}{$ipAddress} = $data->{$ipAddress};
    
        }
        $Log->Message('INFO', 'Loaded ifDescr_per_ifIndex from ' . $ifDescrFile);
        $Log->Message('DEBUG', "ifDescr_per_ifIndex Dump\n-------------\n" . Dumper($SharedHash{'ifDescr_per_ifIndex'}) . "\n-------------");
    }
    else {
        $Log->Message('ERROR', "Failed to load ifDescr_per_ifIndex from $ifDescrFile: File not found");
    }
    
  4. Add the sample custom rule defined below to the base.rules file for trap messages. To use this sample, at minimum you would need to adjust the following elements:

    • The location to store the new ifDescr. The example uses SubNode, but you could put it in Summary (and enhance with additional logic to handle existing data in Summary.)

    • The error handling, enhanced for any specific error scenarios for your environment.

    # Use IP address variable provided by Trapd
    my $ip;
    
    # Extract ifIndex from @varbinds provided by Trapd
    $ifoid = '1.3.6.1.2.1.2.2.1.1';
    my $ifIndex;
    for my $oidiid (@varbinds) {
        if ($oidiid =~ /^\Q$ifoid.\E(\d+)$/) {
            $ifIndex = $1;
            last;
        }
    }
    
    if ($ifIndex && $ip) {
        my $key = $ip;
        if (exists $SharedHash{ifDescr_per_ifIndex}{$key} && exists $SharedHash{ifDescr_per_ifIndex}{$key}{$ifIndex}) {
            my $new_ifDescr = $SharedHash{ifDescr_per_ifIndex}{$key}{$ifIndex};
    
            # Update SubNode with the new ifDescr
            $Event->{SubNode} = $new_ifDescr;
        } 
        else {
            $Log->Message('DEBUG', "No matching ifDescr found for $ip/$ifIndex in SharedHash");
        }
    } 
    else {
        $Log->Message('DEBUG', "ifIndex or IP not found in traplog");
    }
    

When a trap message comes in, the sample rule does the following:

  1. Extracts the ifIndex and IP values from the trap.

  2. Uses the ifIndex and IP values to get the ifDescr from $SharedHash.

  3. Adds the new ifDescr to the Event SubNode property.

Using CAPE Policies

To use CAPE policies to enrich events with external data:

  1. Optionally, create a CAPE-specific device zone. This is not required, but recommended. See About CAPE Device Zones for more information.

  2. Create a CAPE node to connect to the external database, get the enrichment data, and update the events. See Sample Cape Node Text for a simple example of node text to use to update an event's Customer field with data from an external database.

    If minimizing processing time is important, you can alternatively implement a solution similar to the one described in Sample Custom Rule for Locally Stored Data, where a job runs a separate script to store the enrichment data locally, and the node gets the data from the local hash. However, usually the delay introduced by querying an external database directly is not as important in a CAPE solution, where processing happens in batches after events are already in the Event database.

    If you have a more complex set of CAPE nodes querying multiple databases, you can also optionally set up separate nodes to handle the database connections. The processing nodes can call these rather than repeatedly opening and closing the database connections individually.

  3. Create a CAPE policy to select the events to enrich, and set it to run the new CAPE node.

    For example, to enrich events that have no data in the Customer field, create a CAPE policy and add the following to the Event Select Statement:

    SELECT *
      FROM Events
      WHERE Customer = ""
    

    This is a simple example for filtering by a single field, but you can modify it to perform more complex filtering as needed, or select only relevant fields from the Events table.

  4. Create a custom CAPE service to run the policies for the CAPE-specific device zone.

For more examples of creating CAPE devices zones, nodes, and policies, see Configuring Custom Actions with CAPE Policies.

For information about the UIs you use to create zones, nodes, policies, and custom services, see the following topics in Unified Assurance User's Guide:

Sample REST API CAPE Node Text

This sample CAPE node does the following:

  1. For each event selected by the CAPE policy, makes an API request to get the Customer data from the external database based on the DNS or IP (the value from the Node event field) for the associated device.

    This is a simple example for enriching a single field, but you can modify it to perform more complex enrichment as needed.

  2. Updates the event with the customer data in the Event database.

  3. Adds an event journal entry for the CAPE action.

To use this sample, you would need to adjust the following things at minimum:

use LWP::UserAgent;
use HTTP::Request;

my $EventDBH = DBConnect($Config, 'Event', {AutoCommit => 1});

my $CurrentTime = sprintf "%.3f", Time::HiRes::time;

# REST API endpoint details: Update these for your actual external database API interface.
my $api_url = 'http://example.com/api/devices';
my $user_agent = LWP::UserAgent->new;
$user_agent->timeout(10);

# SSL/TLS configuration: Update these for your environment, or adjust for your preferred authentication method.
$user_agent->ssl_opts(
    verify_hostname => 1,
    SSL_ca_file     => '/path/to/ca/cert.pem', # Path to your CA certificate file
    SSL_cert_file   => '/path/to/client/cert.pem', # Path to your client certificate file (if required)
    SSL_key_file    => '/path/to/client/key.pem', # Path to your client private key file (if required)
);

foreach my $Event (@{$EventData->{Events}}) {
    my $EventID = $Event->{EventID};
    my $Node    = $Event->{Node};

    $Log->Message("WARN", "-> UpdateCustomer -> Processing Event [$EventID]");

    # Construct the API request
    my $req = HTTP::Request->new(GET => "$api_url/$Node");
    $req->header('Accept' => 'application/json');

    # Send the request and get the response
    my $res = $ua->request($req);

    if ($res->is_success) {
        # Decode the JSON response
        my $data = eval { decode_json($res->decoded_content); };
        if ($@) {
            $Log->Message("ERROR", "Failed to decode API response: $@");
            next;
        }

        # Set the value from the response. This sample assumes the Customer key is directly within the JSON object; adjust according to your expected API response. 
        my $Customer = $data->{Customer};

        if (defined $Customer) {
            # Update the Customer field in the internal Event table
            my ($ErrorFlag, $Message) = UpdateEvent({
                DBH      => \$EventDBH,
                EventID  => $EventID,
                Values   => {
                    Action        => 'CustomerEnrichedExternally',
                    Actor         => 'CAPE',
                    LastChanged   => $CurrentTime,
                    Customer      => $Customer
                }
            });

            $Log->Message("WARN", "-> UpdateCustomer -> Updated Customer for Event [$EventID] to [$Customer]");
        } 
        else {
            $Log->Message("WARN", "-> UpdateCustomer -> No Customer found for Event [$EventID]");
        }
    } 
    else {
        $Log->Message("ERROR", "-> UpdateCustomer -> Failed to retrieve data for Event [$EventID]: " . $res->status_line);
    }
    # EventJournal for enrichment
    my ($ErrorFlag, $Message) = AddJournal({
        DBH       => \$EventDBH,
        EventID   => $EventID,
        TimeStamp => $CurrentTime,
        Username  => 'api',
        Entry     => 'Enriched event [' . $EventID . '] with Customer details from external database.'
    });
}

$EventDBH->disconnect();

Sample SQL CAPE Node Text

This sample CAPE node does the following:

  1. For each event selected by the CAPE policy, makes an API request to get the Customer data from the external database based on the DNS or IP (the value from the Node event field) for the associated device.

    This is a simple example for enriching a single field, but you can modify it to perform more complex enrichment as needed.

  2. Updates the event with the customer data in the Event database.

  3. Adds an event journal entry for the CAPE action.

To use this sample, at minimum you would need to:

# Connect to the Event and external database configured in the Databases UI. Replace ExternalDB with the the external database schema that you configured.

my $EventDBH = DBConnect($Config, 'Event', {AutoCommit => 1});
my $ExternalDBH = DBConnect($Config, 'ExternalDB', {AutoCommit => 1});

my $CurrentTime = sprintf "%.3f", Time::HiRes::time;

# Adjust the SELECT statement according to your database.
my $SelectSQL = "SELECT Customer FROM Devices WHERE Node = ?";
my $sth = $ExternalDBH->prepare($SelectSQL);

foreach my $Event (@{$EventData->{Events}}) {
    my $EventID = $Event->{EventID};
    my $Node    = $Event->{Node};

    $Log->Message("WARN", "-> UpdateCustomer -> Processing Event [$EventID]");

    $sth->execute($Node);
    my $data = $sth->fetchrow_hashref;

    if (defined $data && defined $data->{Customer}) {
        my $Customer = $data->{Customer};

        # Update the Customer field in the internal Event table
        my ($ErrorFlag, $Message) = UpdateEvent({
            DBH      => \$EventDBH,
            EventID  => $EventID,
            Values   => {
                Action        => 'CustomerEnrichedExternally',
                Actor         => 'CAPE',
                LastChanged   => $CurrentTime,
                Customer      => $Customer
            }
        });

        $Log->Message("WARN", "-> UpdateCustomer -> Updated Customer for Event [$EventID] to [$Customer]");
    } 
    else {
        $Log->Message("WARN", "-> UpdateCustomer -> No Customer found for Event [$EventID]");
    }

    # EventJournal for enrichment
    my ($ErrorFlag, $Message) = AddJournal({
        DBH       => \$EventDBH,
        EventID   => $EventID,
        TimeStamp => $CurrentTime,
        Username  => 'api',
        Entry     => 'Enriched event [' . $EventID . '] with Customer details from external database.'
    });
}

$sth->finish();
$EventDBH->disconnect();
$ExternalDBH->disconnect();

Using Custom Microservices

You can create custom microservices to enrich events with data from external databases. You can either fit these into the existing Unified Assurance microservice pipelines, or use them in combination with service oriented architecture (SOA) applications.

You can use any programming language you want to create the microservices, but the Unified Assurance SDK provides samples in Java (recommended) and Go. See Java Microservice SDK for information about creating microservices in Java and Microservice SDK Overview for information about creating microservices in Go.

Using the Event Microservice Pipeline

If you are using a complete microservice architecture, you can create a custom microservice that fits into the Event pipeline. The following image shows a simplified Event pipeline that includes a custom microservice to enrich events before they are processed by the FCOM processor:

Custom Event Enrichment Microservice

Description of custom-event-enrichment-microservice.png

As shown in the image, the microservice should:

  1. Get events in JSON format from an Apache Pulsar topic.

    The image shows getting the events from the assure1/event/collection topic, to enrich events before they are processed by the FCOM Processor microservice. You could alternatively include the custom microservice between the FCOM Processor and the Event Sink, or, in a non-FCOM flow, between the pollers and collectors and the Event Sink. The FCOM Processor or the pollers and collectors would publish to a custom topic, where the custom microservice would read.

  2. Make an API request to get enrichment data from the external database based on the DNS or IP for the device associated with an event.

  3. Enrich the JSON event with the new data.

  4. Publish the event to another Pulsar topic.

    The image shows publishing to a custom Pulsar topic, assure1/event/enrichment, where the FCOM processor picks it up. If you included the microservice after the FCOM Processor or the non-FCOM pollers and collectors, the custom microservice would publish to the assure1/event/sink topic.

For more information about the complete Event pipeline, see Understanding the Event Pipeline in Unified Assurance Concepts.

Coordinating with SOA Applications

Instead of integrating with the pipeline, if you are primarily using service oriented architecture (SOA) applications, you can still use a custom microservice to get the enrichment data. In this scenario:

  1. Use the ingress controller to add an API endpoint for the microservice.

  2. Use the custom microservice to:

    1. Make an API request to get enrichment data from the external database based on the DNS or IP (the value from the Node event field) for the device associated with an event.

    2. Cache the data in memory using the device name (IP address or DNS from the Node field) as the key.

  3. Use rules or CAPE nodes to make a request to the microservice's API endpoint to get the enrichment data from the microservice memory cache, and enrich the event.