Sun Java System Communications Services 6 2005Q4 Event Notification Service Guide

ENS Sample Code for Calendar Server

Calendar Server ships with a complete ENS implementation. If you wish to customize it, you can use the ENS APIs to do so. The following four code samples (a simple publisher and subscriber pair, and a reliable publisher and subscriber pair) illustrate how to use the ENS API. The sample code is provided with the product in the following directory:

/opt/SUNWics5/cal/csapi/samples/ens

Sample Publisher and Subscriber

This sample code pair establishes a simple interactive asynchronous publisher and subscriber.

Publisher Code Sample

/*
 * Copyright 2000 by Sun Microsystems, Inc.
 * All rights reserved
 *
 * apub : simple interactive asynchronous publisher using the ENS API
 *
 * Syntax:
 *   apub host port
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "pasdisp.h"
#include "publisher.h"

/* Dispatcher created in main() and passed to pas_shutdown() on exit. */
static pas_dispatcher_t *disp = NULL;
/* Publisher handle; set by _open_ack() from the value it is handed. */
static publisher_t *_publisher = NULL;
/* Set to 1 once shutdown was requested; stops _read_stdin() from reading. */
static int _shutdown = 0; 

/* Forward declaration: the callbacks below call it before its definition. */
static void _read_stdin(); 

/* Write the apub command-line synopsis to stdout and exit with status 5. */
static void _exit_usage()
{
    fputs("\nUsage:\napub host port\n", stdout);
    exit(5);
}
/*
 * Write msg followed by a newline to stdout and exit with status 1.
 *
 *   msg - NUL-terminated text to print
 */
static void _exit_error(const char *msg)
{
    puts(msg);
    exit(1);
}
/*
 * Record that shutdown was requested, then ask the dispatcher to shut down.
 * The flag is raised first so that it is already set when pas_shutdown()
 * is entered.
 */
static void _call_shutdown()
{
    _shutdown = 1;

    pas_shutdown(disp);
}
/*
 * _open_ack : completion callback passed to publisher_new_a().
 *
 *   arg - callback argument; unused
 *   rc  - status code; only printed when no publisher was delivered
 *   enc - stored as the publisher handle; NULL is treated as failure
 *
 * With a publisher in hand the interactive prompt is started; otherwise
 * the status is reported and shutdown is requested.
 */
static void _open_ack(void *arg, int rc, void *enc)
{
    (void)arg;

    _publisher = (publisher_t *)enc;
    if (_publisher != NULL)
    {
        _read_stdin();
        return;
    }

    printf("Failed to create publisher with status %d\n", rc);
    _call_shutdown();
}
/*
 * _publish_ack : completion callback passed to publish().
 *
 *   arg     - the heap copy of the message handed to publish(); freed here
 *   rc      - status code; non-zero is treated as failure
 *   ignored - unused
 *
 * After a successful publish the next line is read; after a failure the
 * status is reported and shutdown is requested.
 */
static void _publish_ack(void *arg, int rc, void *ignored)
{
    (void)ignored;

    free(arg);

    if (rc == 0)
    {
        _read_stdin();
        return;
    }

    printf("Publish failed with status %d\n", rc);
    _call_shutdown();
}
/*
 * _read_stdin : prompt for one line on stdin and publish it.
 *
 * Prints the "apub> " prompt, then (unless shutdown was already requested)
 * reads one line and:
 *   - on end-of-file or a read error, or on a line consisting of a single
 *     ".", deletes the publisher and requests shutdown;
 *   - otherwise publishes a heap copy of the line to "enp://siroe.com/xyz".
 *     The copy is passed as the callback argument and freed by
 *     _publish_ack(), which calls this function again for the next line.
 */
static void _read_stdin(void)
{
    static char input[1024];
    char *message;
    unsigned int message_len;

    printf("apub> ");
    fflush(stdout);

    if (_shutdown)
    {
        return;
    }

    if (fgets(input, sizeof(input), stdin) == NULL)
    {
        /* EOF or read error: retrying would spin forever, so quit. */
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }

    /* Strip the trailing newline only if there is one; safe on "" too. */
    input[strcspn(input, "\n")] = '\0';

    if (input[0] == '.' && input[1] == '\0')
    {
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }

    message = strdup(input);
    if (message == NULL)
    {
        printf("Out of memory\n");
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }
    message_len = (unsigned int)strlen(message);

    publish(_publisher, "enp://siroe.com/xyz", message,
            message_len,
            _publish_ack, NULL, (void *)message, 0);
}
/*
 * main : entry point of the apub sample.
 *
 *   argv[1] - host to connect to (required); a value whose first character
 *             is '0' selects 127.0.0.1
 *   argv[2] - port (optional, default 7997); must be a decimal number in
 *             the range 1..65535
 *
 * Creates the dispatcher, starts the asynchronous publisher creation with
 * _open_ack() as its completion callback, and runs the dispatcher.
 * NOTE(review): pas_dispatch() presumably returns once pas_shutdown() has
 * been called - confirm against the ENS API reference.
 */
int main(int argc, char **argv)
{
    unsigned short port = 7997;
    char host[256];

    if (argc < 2) _exit_usage();

    if (*(argv[1]) == '0')
    {
        strcpy(host, "127.0.0.1");
    }
    else
    {
        /* Refuse names that would overflow the fixed-size buffer. */
        if (strlen(argv[1]) >= sizeof(host)) _exit_error("Host name too long");
        strcpy(host, argv[1]);
    }

    if (argc > 2)
    {
        char *end = NULL;
        long value = strtol(argv[2], &end, 10);

        /* Reject empty, non-numeric and out-of-range port arguments. */
        if (end == argv[2] || *end != '\0' || value < 1 || value > 65535)
        {
            _exit_usage();
        }
        port = (unsigned short)value;
    }

    disp = pas_dispatcher_new(NULL);
    /* It is the dispatcher, not the publisher, that failed here. */
    if (disp == NULL) _exit_error("Can't create dispatcher");

    publisher_new_a(disp, NULL, host, port, _open_ack, disp);
    pas_dispatch(disp);

    _shutdown = 1;
    pas_dispatcher_delete(disp);
    return 0;
}

Subscriber Code Sample

/*
 * Copyright 2000 by Sun Microsystems, Inc.
 * All rights reserved
 *
 * asub : example asynchronous subscriber
 *
 * Syntax:
 *   asub host port
 */
#include <stdlib.h>
#include <stdio.h> 

#include "pasdisp.h"
#include "subscriber.h" 

/* Dispatcher created in main() and passed to pas_shutdown() by callbacks. */
static pas_dispatcher_t *disp = NULL;
/* Subscriber handle; set by _open_ack() from the value it is handed. */
static subscriber_t *_subscriber = NULL;
/* Subscription handle; set by _subscribe_ack() on success. */
static subscription_t *_subscription = NULL;
/* NOTE(review): not referenced by any code shown in this sample. */
static renl_t *_renl = NULL; 

/* Write the asub command-line synopsis to stdout and exit with status 5. */
static void _exit_usage()
{
    fputs("\nUsage:\nasub host port\n", stdout);
    exit(5);
}
/*
 * Write msg followed by a newline to stdout and exit with status 1.
 *
 *   msg - NUL-terminated text to print
 */
static void _exit_error(const char *msg)
{
    puts(msg);
    exit(1);
}
/*
 * _subscribe_ack : completion callback passed to subscribe().
 *
 *   arg          - callback argument; unused
 *   rc           - status code; 0 is treated as success
 *   subscription - stored as the current subscription on success
 *
 * On failure the status is reported and the dispatcher is shut down.
 */
static void _subscribe_ack(void *arg, int rc, void *subscription)
{
    (void)arg;

    if (rc != 0)
    {
        printf("Subscription failed - status %d\n", rc);
        pas_shutdown(disp);
        return;
    }

    _subscription = subscription;
    printf("Subscription successful\n");
}
/*
 * _unsubscribe_ack : completion callback passed to unsubscribe().
 *
 *   arg     - callback argument; unused
 *   rc      - status code; a non-zero value is reported on stdout
 *   ignored - unused
 *
 * Whatever the status, the subscriber is deleted and the dispatcher is
 * shut down afterwards.
 */
static void _unsubscribe_ack(void *arg, int rc, void *ignored)
{
    (void)arg;
    (void)ignored;

    if (rc)
    {
        printf("Unsubscribe failed - status %d\n", rc);
    }

    subscriber_delete(_subscriber);
    pas_shutdown(disp);
}
/*
 * _handle_notify : notification callback passed to subscribe().
 *
 * Prints one line of the form "[url] payload" on stdout.
 *
 *   arg - callback argument; unused
 *   url - text printed between the brackets; NULL prints as "(null)"
 *   str - payload; NULL prints as "(null)"
 *   len - number of bytes of str to print
 *
 * Always returns 0.
 */
static int _handle_notify(void *arg, char *url, char *str, int len)
{
    const char *event = (url != NULL) ? url : "(null)";

    (void)arg;

    if (str == NULL)
    {
        /* Print the placeholder in full instead of truncating it to len. */
        printf("[%s] (null)\n", event);
    }
    else
    {
        /* A negative precision makes printf drop the bound and read up to
         * a NUL byte, so clamp it to zero. */
        printf("[%s] %.*s\n", event, (len > 0) ? len : 0, str);
    }
    return 0;
}
/*
 * _open_ack : completion callback passed to subscriber_new_a().
 *
 *   arg - callback argument; unused
 *   rc  - status code; non-zero is treated as failure
 *   enc - stored as the subscriber handle, whatever the status
 *
 * On success a subscription to "enp://siroe.com/xyz" is requested with
 * _handle_notify() as notification callback and _subscribe_ack() as
 * completion callback. On failure the status is reported and the
 * dispatcher is shut down.
 */
static void _open_ack(void *arg, int rc, void *enc)
{
    (void)arg;

    _subscriber = (subscriber_t *)enc;

    if (rc == 0)
    {
        subscribe(_subscriber, "enp://siroe.com/xyz",
                  _handle_notify, NULL,
                  _subscribe_ack, NULL);
        return;
    }

    printf("Failed to create subscriber with status %d\n", rc);
    pas_shutdown(disp);
}
/*
 * _unsubscribe : cancel the current subscription; _unsubscribe_ack() is
 * passed as the completion callback.
 *
 *   sig - unused
 *
 * NOTE(review): the signature looks like a signal handler, but no code
 * shown in this sample installs or calls it - confirm how it is wired up.
 */
static void _unsubscribe(int sig)
{
    (void)sig;

    unsubscribe(_subscriber, _subscription, _unsubscribe_ack, NULL);
}
/*
 * main : entry point of the asub sample.
 *
 *   argv[1] - host to connect to (required); a value whose first character
 *             is '0' selects 127.0.0.1
 *   argv[2] - port (optional, default 7997); must be a decimal number in
 *             the range 1..65535
 *
 * Creates the dispatcher, starts the asynchronous subscriber creation with
 * _open_ack() as its completion callback, and runs the dispatcher.
 * NOTE(review): pas_dispatch() presumably returns once pas_shutdown() has
 * been called - confirm against the ENS API reference.
 */
int main(int argc, char **argv)
{
    unsigned short port = 7997;
    char host[256];

    if (argc < 2) _exit_usage();

    if (*(argv[1]) == '0')
    {
        strcpy(host, "127.0.0.1");
    }
    else
    {
        /* Refuse names that would overflow the fixed-size buffer. */
        if (strlen(argv[1]) >= sizeof(host)) _exit_error("Host name too long");
        strcpy(host, argv[1]);
    }

    if (argc > 2)
    {
        char *end = NULL;
        long value = strtol(argv[2], &end, 10);

        /* Reject empty, non-numeric and out-of-range port arguments. */
        if (end == argv[2] || *end != '\0' || value < 1 || value > 65535)
        {
            _exit_usage();
        }
        port = (unsigned short)value;
    }

    disp = pas_dispatcher_new(NULL);
    /* It is the dispatcher, not a publisher, that failed here. */
    if (disp == NULL) _exit_error("Can't create dispatcher");

    subscriber_new_a(disp, NULL, host, port, _open_ack, NULL);
    pas_dispatch(disp);

    pas_dispatcher_delete(disp);
    return 0;
}

Reliable Publisher and Subscriber

This sample code pair establishes a reliable asynchronous publisher and subscriber.

Reliable Publisher Sample

/*
 * Copyright 2000 by Sun Microsystems, Inc.
 * All rights reserved
 *
 * rpub : simple *reliable* interactive asynchronous publisher.
 *     It is designed to be used in combination with rsub,
 *     the reliable subscriber.
 *
 * Syntax:
 *   rpub host port
 */
#include <stdlib.h>
#include <stdio.h> 

#include "pasdisp.h"
#include "publisher.h" 

/* Dispatcher created in main() and passed to pas_shutdown() on exit. */
static pas_dispatcher_t *disp = NULL;
/* Publisher handle; set by _publisher_new_cb() from the value it is handed. */
static publisher_t *_publisher = NULL;
/* Set to 1 once shutdown was requested; stops _read_stdin() from reading. */
static int _shutdown = 0;
/* NOTE(review): not referenced by any code shown in this sample. */
static renl_t *_renl;
/* Forward declaration: the callbacks below call it before its definition. */
static void _read_stdin(); 

/* Write the rpub command-line synopsis to stdout and exit with status 5. */
static void _exit_usage()
{
    fputs("\nUsage:\nrpub host port\n", stdout);
    exit(5);
}
/*
 * Write msg followed by a newline to stdout and exit with status 1.
 *
 *   msg - NUL-terminated text to print
 */
static void _exit_error(const char *msg)
{
    puts(msg);
    exit(1);
}
/*
 * Record that shutdown was requested, then ask the dispatcher to shut down.
 * The flag is raised first so that it is already set when pas_shutdown()
 * is entered.
 */
static void _call_shutdown()
{
    _shutdown = 1;

    pas_shutdown(disp);
}
/*
 * _renl_create_cb : completion callback passed to renl_create_publisher().
 *
 *   arg     - callback argument; unused
 *   rc      - status of the RENL creation
 *   ignored - unused
 *
 * On success the interactive prompt is started; on failure the status is
 * reported and shutdown is requested.
 * NOTE(review): assumes rc == 0 means success - confirm against the ENS
 * API reference.
 */
static void _renl_create_cb(void *arg, int rc, void *ignored)
{
    (void)arg;
    (void)ignored;

    /* Test the reported status. _publisher cannot signal an RENL failure:
     * renl_create_publisher() is only called once it is non-NULL. */
    if (rc != 0)
    {
        printf("Failed to create RENL - status %d\n", rc);
        _call_shutdown();
        return;
    }

    _read_stdin();
}
/*
 * _publisher_new_cb : completion callback passed to publisher_new_a().
 *
 *   arg - callback argument; unused
 *   rc  - status code; only printed when no publisher was delivered
 *   enc - stored as the publisher handle; NULL is treated as failure
 *
 * With a publisher in hand, RENL creation is requested under the id
 * "renl_id" with _renl_create_cb() as completion callback; otherwise the
 * status is reported and shutdown is requested.
 */
static void _publisher_new_cb(void *arg, int rc, void *enc)
{
    (void)arg;

    _publisher = (publisher_t *)enc;
    if (_publisher != NULL)
    {
        renl_create_publisher(_publisher, "renl_id", NULL,
                              _renl_create_cb, NULL);
        return;
    }

    printf("Failed to create publisher - status %d\n", rc);
    _call_shutdown();
}
/*
 * _recv_ack : acknowledgment callback passed to publish().
 *
 *   arg     - the heap copy of the message handed to publish(); freed here
 *   rc      - negative prints "Acknowledgment Timeout", zero prints
 *             "Acknowledgment Received", positive prints nothing
 *   ignored - unused
 *
 * The next line is read before the previous message copy is released.
 */
static void _recv_ack(void *arg, int rc, void *ignored)
{
    (void)ignored;

    if (rc == 0)
    {
        printf("Acknowledgment Received\n");
    }
    else if (rc < 0)
    {
        printf("Acknowledgment Timeout\n");
    }
    fflush(stdout);

    _read_stdin();
    free(arg);
}
/*
 * _read_stdin : prompt for one line on stdin and publish it.
 *
 * Prints the "rpub> " prompt, then (unless shutdown was already requested)
 * reads one line and:
 *   - on end-of-file or a read error, or on a line consisting of a single
 *     ".", deletes the publisher and requests shutdown;
 *   - otherwise publishes a heap copy of the line to "enp://siroe.com/xyz"
 *     with _recv_ack() as acknowledgment callback and a timeout argument
 *     of 5000. The copy is passed as the callback argument and freed by
 *     _recv_ack(), which calls this function again for the next line.
 */
static void _read_stdin(void)
{
    static char input[1024];
    char *message;
    unsigned int message_len;

    printf("rpub> ");
    fflush(stdout);

    if (_shutdown)
    {
        return;
    }

    if (fgets(input, sizeof(input), stdin) == NULL)
    {
        /* EOF or read error: retrying would spin forever, so quit. */
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }

    /* Strip the trailing newline only if there is one; safe on "" too. */
    input[strcspn(input, "\n")] = '\0';

    if (input[0] == '.' && input[1] == '\0')
    {
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }

    message = strdup(input);
    if (message == NULL)
    {
        printf("Out of memory\n");
        publisher_delete(_publisher);
        _call_shutdown();
        return;
    }
    message_len = (unsigned int)strlen(message);

    /* five seconds timeout */
    publish(_publisher, "enp://siroe.com/xyz",
            message, message_len,
            NULL, _recv_ack, message, 5000);
}
/*
 * main : entry point of the rpub sample.
 *
 *   argv[1] - host to connect to (required); a value whose first character
 *             is '0' selects 127.0.0.1
 *   argv[2] - port (optional, default 7997); must be a decimal number in
 *             the range 1..65535
 *
 * Creates the dispatcher, starts the asynchronous publisher creation with
 * _publisher_new_cb() as its completion callback, and runs the dispatcher.
 * NOTE(review): pas_dispatch() presumably returns once pas_shutdown() has
 * been called - confirm against the ENS API reference.
 */
int main(int argc, char **argv)
{
    unsigned short port = 7997;
    char host[256];

    if (argc < 2) _exit_usage();

    if (*(argv[1]) == '0')
    {
        strcpy(host, "127.0.0.1");
    }
    else
    {
        /* Refuse names that would overflow the fixed-size buffer. */
        if (strlen(argv[1]) >= sizeof(host)) _exit_error("Host name too long");
        strcpy(host, argv[1]);
    }

    if (argc > 2)
    {
        char *end = NULL;
        long value = strtol(argv[2], &end, 10);

        /* Reject empty, non-numeric and out-of-range port arguments. */
        if (end == argv[2] || *end != '\0' || value < 1 || value > 65535)
        {
            _exit_usage();
        }
        port = (unsigned short)value;
    }

    disp = pas_dispatcher_new(NULL);
    /* It is the dispatcher, not the publisher, that failed here. */
    if (disp == NULL) _exit_error("Can't create dispatcher");

    publisher_new_a(disp, NULL, host, port, _publisher_new_cb,
                    NULL);
    pas_dispatch(disp);

    _shutdown = 1;
    pas_dispatcher_delete(disp);
    return 0;
}

Reliable Subscriber Sample

/*
 * Copyright 2000 by Sun Microsystems, Inc.
 * All rights reserved
 *
 * rsub : example reliable asynchronous subscriber
 *
 * Syntax:
 *   rsub host port
 */
#include <stdlib.h>
#include <stdio.h> 

#include "pasdisp.h"
#include "subscriber.h" 

/* Dispatcher created in main() and passed to pas_shutdown() by callbacks. */
static pas_dispatcher_t *disp = NULL;
/* Subscriber handle; set by _open_ack() from the value it is handed. */
static subscriber_t *_subscriber = NULL;
/* Subscription handle; set by _subscribe_ack() on success. */
static subscription_t *_subscription = NULL;
/* Value returned by renl_create_subscriber() in _subscribe_ack(). */
static renl_t *_renl = NULL; 

/*
 * Write the command-line synopsis to stdout and exit with status 5.
 * The synopsis names the program "rsub", the name the reliable publisher
 * sample uses for this reliable subscriber, not the simple "asub" sample.
 */
static void _exit_usage(void)
{
    printf("\nUsage:\nrsub host port\n");
    exit(5);
}
/*
 * Write msg followed by a newline to stdout and exit with status 1.
 *
 *   msg - NUL-terminated text to print
 */
static void _exit_error(const char *msg)
{
    puts(msg);
    exit(1);
}
static void _subscribe_ack(void *arg, int rc, void *subscription)
{
    (void)arg;
    if (!rc)
     {
        _subscription = subscription;
        printf("Subscription successful\n");
        _renl = renl_create_subscriber(_subscription, "renl_id", NULL);
    } else {
        printf("Subscription failed - status %d\n", rc)
        pas_shutdown(disp);
    }
}
/*
 * _unsubscribe_ack : completion callback passed to unsubscribe().
 *
 *   arg     - callback argument; unused
 *   rc      - status code; a non-zero value is reported on stdout
 *   ignored - unused
 *
 * Whatever the status, the subscriber is deleted and the dispatcher is
 * shut down afterwards.
 */
static void _unsubscribe_ack(void *arg, int rc, void *ignored)
{
    (void)arg;
    (void)ignored;

    if (rc)
    {
        printf("Unsubscribe failed - status %d\n", rc);
    }

    subscriber_delete(_subscriber);
    pas_shutdown(disp);
}
/*
 * _handle_notify : notification callback passed to subscribe().
 *
 * Prints one line of the form "[url] payload" on stdout.
 *
 *   arg - callback argument; unused
 *   url - text printed between the brackets; NULL prints as "(null)"
 *   str - payload; NULL prints as "(null)"
 *   len - number of bytes of str to print
 *
 * Always returns 0.
 */
static int _handle_notify(void *arg, char *url, char *str, int len)
{
    const char *event = (url != NULL) ? url : "(null)";

    (void)arg;

    if (str == NULL)
    {
        /* Print the placeholder in full instead of truncating it to len. */
        printf("[%s] (null)\n", event);
    }
    else
    {
        /* A negative precision makes printf drop the bound and read up to
         * a NUL byte, so clamp it to zero. */
        printf("[%s] %.*s\n", event, (len > 0) ? len : 0, str);
    }
    return 0;
}
/*
 * _open_ack : completion callback passed to subscriber_new_a().
 *
 *   arg - callback argument; unused
 *   rc  - status code; non-zero is treated as failure
 *   enc - stored as the subscriber handle, whatever the status
 *
 * On success a subscription to "enp://siroe.com/xyz" is requested with
 * _handle_notify() as notification callback and _subscribe_ack() as
 * completion callback. On failure the status is reported and the
 * dispatcher is shut down.
 */
static void _open_ack(void *arg, int rc, void *enc)
{
    (void)arg;

    _subscriber = (subscriber_t *)enc;

    if (rc == 0)
    {
        subscribe(_subscriber, "enp://siroe.com/xyz", _handle_notify,
                  NULL, _subscribe_ack, NULL);
        return;
    }

    printf("Failed to create subscriber with status %d\n", rc);
    pas_shutdown(disp);
}
/*
 * _unsubscribe : cancel the current subscription; _unsubscribe_ack() is
 * passed as the completion callback.
 *
 *   sig - unused
 *
 * NOTE(review): the signature looks like a signal handler, but no code
 * shown in this sample installs or calls it - confirm how it is wired up.
 */
static void _unsubscribe(int sig)
{
    (void)sig;

    unsubscribe(_subscriber, _subscription, _unsubscribe_ack, NULL);
}
/*
 * main : entry point of the reliable subscriber sample.
 *
 *   argv[1] - host to connect to (required); a value whose first character
 *             is '0' selects 127.0.0.1
 *   argv[2] - port (optional, default 7997); must be a decimal number in
 *             the range 1..65535
 *
 * Creates the dispatcher, starts the asynchronous subscriber creation with
 * _open_ack() as its completion callback, and runs the dispatcher.
 * NOTE(review): pas_dispatch() presumably returns once pas_shutdown() has
 * been called - confirm against the ENS API reference.
 */
int main(int argc, char **argv)
{
    unsigned short port = 7997;
    char host[256];

    if (argc < 2) _exit_usage();

    if (*(argv[1]) == '0')
    {
        strcpy(host, "127.0.0.1");
    }
    else
    {
        /* Refuse names that would overflow the fixed-size buffer. */
        if (strlen(argv[1]) >= sizeof(host)) _exit_error("Host name too long");
        strcpy(host, argv[1]);
    }

    if (argc > 2)
    {
        char *end = NULL;
        long value = strtol(argv[2], &end, 10);

        /* Reject empty, non-numeric and out-of-range port arguments. */
        if (end == argv[2] || *end != '\0' || value < 1 || value > 65535)
        {
            _exit_usage();
        }
        port = (unsigned short)value;
    }

    disp = pas_dispatcher_new(NULL);
    /* It is the dispatcher, not a publisher, that failed here. */
    if (disp == NULL) _exit_error("Can't create dispatcher");

    subscriber_new_a(disp, NULL, host, port, _open_ack, NULL);
    pas_dispatch(disp);

    pas_dispatcher_delete(disp);
    return 0;
}