man pages section 3: Basic Library Functions

Updated: Wednesday, July 27, 2022
 
 

msgqipc (3)

Name

msgqipc - Fujitsu MSGQIPC API

Description

MSGQIPC is an IPC API for Fujitsu MSGQ, a custom messaging implementation with hardware support for sending small messages between CPUs. It is intended to be used in conjunction with Fujitsu CMI, Fujitsu's implementation of CMI, a distributed shared memory abstraction.

This API is designed for low-latency, kernel-bypass messaging. Although CMI is not part of the MSGQIPC API, MSGQIPC is intended to supplement the CMI implementation, which provides extremely low-latency access to remote memory that a local process can map into its address space.

The primary functionality provided by MSGQIPC is based on message queues. A message queue, msgqipc_queue, is a receive endpoint: a queue capable of receiving messages from arbitrary senders. Message queues are backed by actual hardware constructs that support low-level messaging primitives.

Message queues are created dynamically by an application, and are accessible from the entire cluster (Fujitsu-specific) through a globally unique 'address', msgqipc_addr. Once a queue is created using the msgqipc_create() function, and its hardware-backed state is allocated, its address is known and can be used as the target of send operations.

Once a queue is created, it has a fixed size (configurable; see MSGQIPC_CTL_GET_MAX_MESSAGES), which is the number of messages it can hold before it becomes full. Sends to a full queue fail. As the receiver dequeues messages, slots become available again and senders can retry their failed sends. Each message also has a fixed size, which is 64 bytes (in MSGQIPC_V1). However, this does not prevent a client from building an arbitrary-sized messaging layer over MSGQIPC. CMI can be used to support arbitrary-sized payloads (through very low-latency shared memory) using MSGQIPC sends as control messages. This can be done without fragmentation and entirely in userspace.
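The control-message idea can be sketched as a fixed 64-byte structure that either carries a small payload inline or refers to a payload already placed in shared memory. This is only an illustration: the structure, its field names, and ctl_msg_build() are hypothetical and are not part of MSGQIPC or CMI.

```c
#include <stdint.h>
#include <string.h>

#define CTL_MSG_SIZE   64   /* fixed message size stated for MSGQIPC_V1 */
#define CTL_INLINE_MAX 48   /* hypothetical: bytes left after the header fields */

/* Hypothetical control message layout; not defined by MSGQIPC. */
struct ctl_msg {
    uint32_t type;                        /* application-defined message type */
    uint32_t length;                      /* payload length in bytes */
    uint64_t offset;                      /* payload offset in the shared region */
    uint8_t  inline_data[CTL_INLINE_MAX]; /* small payloads travel in the message */
};

/* Fail at compile time if the layout no longer fills exactly one message. */
_Static_assert(sizeof(struct ctl_msg) == CTL_MSG_SIZE,
               "control message must be exactly one MSGQIPC message");

/*
 * Fill in a control message.
 * Returns 1 if the payload was copied inline, 0 if the message only
 * references it (the caller must already have written the payload at
 * shared_offset in the shared region).
 */
int ctl_msg_build(struct ctl_msg *m, uint32_t type, const void *payload,
                  uint32_t length, uint64_t shared_offset)
{
    memset(m, 0, sizeof(*m));
    m->type = type;
    m->length = length;
    if (length <= CTL_INLINE_MAX) {
        if (length > 0)
            memcpy(m->inline_data, payload, length);
        return 1;
    }
    m->offset = shared_offset;
    return 0;
}
```

The resulting structure would then be passed as the msg argument of msgqipc_send() with a len of 64.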

A message send, msgqipc_send(), is a synchronous operation, with the assurance that on return of the function, the message is either in the receive queue of the target, or the operation has failed and the message is NOT in the queue. These characteristics make MSGQIPC sends reliable as well as ordered. With the exception of flow-control failures (error MSGQIPC_ERR_RNR), which may be transient, a send failure usually indicates a fatal error with respect to the target, such as the target queue having been deleted, or the owning process or the entire remote node having crashed.
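A sender might handle the transient case with a bounded retry loop, as in the following sketch. It assumes that msgqipc_send() returns 0 on success, which this page does not state. The type declarations are local stand-ins that mirror the synopsis (a real program would include msgqipc.h), and send_with_retry(), fake_send() and fake_get_error() are hypothetical names used only for illustration.

```c
#include <stddef.h>

/* Local stand-ins mirroring the synopsis; normally provided by msgqipc.h. */
typedef struct msgqipc_ctx msgqipc_ctx;
typedef struct msgqipc_addr msgqipc_addr;
typedef int (*msgqipc_send_t)(msgqipc_ctx *, msgqipc_addr *, void *, size_t);
typedef int (*msgqipc_get_error_t)(void);
#define MSGQIPC_ERR_SUCC 0
#define MSGQIPC_ERR_RNR  3

/*
 * Try a send up to max_attempts times (should be at least 1).
 * Returns MSGQIPC_ERR_SUCC, or the error code of the attempt that gave up.
 * ASSUMPTION: the send entry point returns 0 on success.
 */
int send_with_retry(msgqipc_send_t send, msgqipc_get_error_t get_error,
                    msgqipc_ctx *ctx, msgqipc_addr *addr,
                    void *msg, size_t len, unsigned max_attempts)
{
    int err = MSGQIPC_ERR_RNR;
    for (unsigned i = 0; i < max_attempts; i++) {
        if (send(ctx, addr, msg, len) == 0)
            return MSGQIPC_ERR_SUCC;
        err = get_error();          /* per-thread status of the failed send */
        if (err != MSGQIPC_ERR_RNR)
            return err;             /* not flow control: treat as fatal */
    }
    return err;                     /* queue still full after all attempts */
}

/* Fake transport for demonstration only: the next fake_full sends fail. */
static int fake_full;
static int fake_err;

static int fake_send(msgqipc_ctx *c, msgqipc_addr *a, void *m, size_t n)
{
    (void)c; (void)a; (void)m; (void)n;
    if (fake_full > 0) {
        fake_full--;
        fake_err = MSGQIPC_ERR_RNR;
        return -1;
    }
    fake_err = MSGQIPC_ERR_SUCC;
    return 0;
}

static int fake_get_error(void) { return fake_err; }
```

In a latency-sensitive sender, the loop body would typically pause or wait for a MSGQIPC_EVT_CANSEND event between attempts rather than retry immediately.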

The owner of a receive queue can receive or dequeue messages from its queue directly with the msgqipc_recv() call, from userspace. Queues can be marked as blocking or non-blocking, which dictates the behavior of receive if the queue is empty and there are no more messages to dequeue. A non-blocking queue returns failure in the case of an empty queue, while a blocking queue waits in the kernel to be woken up when a message arrives. Note that the blocking setting only affects the behavior of receive when a queue is empty. It does not affect the case where messages are available, and it has no effect on sends or senders.

For the purpose of waiting for an event, such as an empty queue receiving a new message, MSGQIPC provides a mechanism to obtain an fd (file descriptor) that is signalled when activity occurs on any resource created or registered by a particular client, including any message queues it has created. This fd can be used in standard system wait calls, such as poll(2). This is the preferred method for waiting for activity. Blocking receives are provided for simple clients or proofs of concept.

In latency-sensitive applications, waiting for an activity should be avoided unless necessary, as this causes the thread or process to enter the scheduler (kernel) and can incur significant overhead and non-deterministic latency. Depending on an application's design, it can likely achieve much better performance by using a CPU instruction to 'pause' without yielding (avoiding entry into the kernel). The latency of the sends, application response time, and system load should be taken into account. An application will likely use a combination of pause or polling mode, and poll(2) or event mode, to handle various scenarios.
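One way to combine the two modes is to spin for a bounded number of iterations and only then fall back to poll(2). The sketch below assumes a GCC or Clang compiler, and assumes that the caller has already obtained the event fd (for example through MSGQIPC_CTL_GET_FD) and has done whatever arming the implementation requires. wait_hybrid(), try_recv_fn and fake_try_recv() are hypothetical names. The callback stands in for a non-blocking msgqipc_recv(). The sketch does not address a message arriving between the last spin iteration and the poll(2) call.

```c
#include <poll.h>

/*
 * Hypothetical callback: returns 1 if a message was received, 0 if the
 * queue was empty. In a real client it would wrap a non-blocking receive.
 */
typedef int (*try_recv_fn)(void *arg);

/* Hint to the CPU that we are spinning, without entering the kernel. */
static inline void cpu_pause(void)
{
#if defined(__x86_64__) || defined(__i386__)
    __builtin_ia32_pause();
#else
    __asm__ __volatile__("" ::: "memory");  /* compiler barrier fallback */
#endif
}

/*
 * Polling mode first, event mode second.
 * Returns 1 if a message was received, 0 if nothing arrived before the
 * timeout, -1 if poll(2) failed.
 */
int wait_hybrid(try_recv_fn try_recv, void *arg, int event_fd,
                unsigned spin_limit, int timeout_ms)
{
    for (unsigned i = 0; i < spin_limit; i++) {
        if (try_recv(arg))
            return 1;
        cpu_pause();
    }

    /* Nothing arrived while spinning: sleep in the kernel until activity. */
    struct pollfd pfd = { .fd = event_fd, .events = POLLIN };
    int n = poll(&pfd, 1, timeout_ms);
    if (n < 0)
        return -1;
    if (n == 0)
        return 0;
    return try_recv(arg) ? 1 : 0;
}

/* Demonstration stand-in: reports a message once the countdown hits zero. */
static int fake_try_recv(void *arg)
{
    int *countdown = arg;
    if (*countdown > 0) {
        (*countdown)--;
        return 0;
    }
    return 1;
}
```

The spin limit is the tuning knob: a larger value favors latency at the cost of CPU time, and a value of 0 degenerates to pure event mode.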

The API provides some additional functionality to aid clients, for example, to avoid race conditions or similar issues that are otherwise impossible to work around. There is also a control interface that is used to query attributes, modify state or behavior, and add future functionality without modifying the API function vector.

MSGQIPC API

The API is accessed through function pointers obtained through the msgqipc_load() call. Only a few functions are exported, and the bulk of the API is accessed through the returned function vector from msgqipc_load().

The implementation may need to set up state prior to the API being accessed, and this is done by having the client call the msgqipc_thread_init() function. This must be called in each thread that accesses the API, including the first thread of the process. It is acceptable to call it multiple times from the same thread. There is no thread exit function needed.

Most MSGQIPC interfaces can return success or failure. The reason for failure is obtained through the msgqipc_get_error() function. The error code returned may have a meaning specific to the function that generated the error. The status is stored per-thread and applies to the last function called in that thread, so it should be inspected in the same way as errno.

The main API, msgqipc_api(), is accessed through a 'context' object, msgqipc_ctx, which is created through the msgqipc_init() function. This is a handle that is used to access all of the interfaces of the API vector, with the obvious exception of the msgqipc_init() function. The context can be thought of as an additional layer of isolation and ownership for an application component. State and resources/objects created or modified when using the API are associated with the context provided to the interface.

The context allows multiple components to coexist in the same process or thread, and use the MSGQIPC API without impacting each other. Any objects, resources, or state created or modified by use of the API are isolated to the context provided to the API.


Note -  The current implementation (MSGQIPC_V1) restricts the use of a context to the OS thread which created it. That means if one thread creates a context, another thread cannot use that context, even if no other thread is accessing it (i.e. exclusive access to the context). This may be relaxed in future implementations to allow shared work queue client implementations, where a pool of worker threads can service any context initialized in the same OS process.

Independent components which use MSGQIPC should create their own context to access the API. Also, while the API itself is thread-safe, the context itself is not. This means that an application component needs to initialize a context in each thread that may need concurrent access to the API. Since a context cannot be shared between threads in the current implementation, MSGQIPC_V1, each thread needs its own context regardless of concurrent API access.
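Because each thread needs its own context, a component might keep its context in thread-local storage and create it lazily on first use. The sketch below assumes a C11 compiler and assumes that the thread initialization function returns 0 on success, which this page does not state. struct ctx_factory, thread_ctx() and the fake_ functions are hypothetical. The opaque types are local stand-ins for those in msgqipc.h.

```c
#include <stddef.h>

/* Local stand-ins for the opaque types; normally provided by msgqipc.h. */
typedef struct msgqipc_ctx msgqipc_ctx;
typedef struct msgqipc_api msgqipc_api;

/* Hypothetical bundle of entry points resolved once at startup. */
struct ctx_factory {
    int (*thread_init)(void);            /* per-thread initialization */
    msgqipc_ctx *(*ini)(msgqipc_api *);  /* context creation */
    msgqipc_api *api;                    /* loaded api */
};

/* One context per OS thread, never shared between threads. */
static _Thread_local msgqipc_ctx *tls_ctx;

/*
 * Return the calling thread's context, creating it on first use.
 * Returns NULL if initialization fails.
 * ASSUMPTION: thread_init returns 0 on success.
 */
msgqipc_ctx *thread_ctx(const struct ctx_factory *f)
{
    if (tls_ctx == NULL) {
        if (f->thread_init() != 0)
            return NULL;
        tls_ctx = f->ini(f->api);
    }
    return tls_ctx;
}

/* Demonstration stand-ins only; they do not talk to any hardware. */
static int fake_ini_calls;

static int fake_thread_init(void) { return 0; }

static msgqipc_ctx *fake_ini(msgqipc_api *api)
{
    static long long token;  /* address used only as a non-NULL handle */
    (void)api;
    fake_ini_calls++;
    return (msgqipc_ctx *)(void *)&token;
}
```

Each thread that uses this helper would also be responsible for destroying its own context before it exits.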

Objects, such as message queues (msgqipc_queue), belong to their creating context. Although this ownership is implied, functions which operate on a queue, such as the msgqipc_close() function, still require a context argument, which must be the context that created the queue. The implementation may assert this, and behavior is undefined if the wrong context is provided.

A context is destroyed through the msgqipc_destroy() function. This also destroys all associated objects and state. Use of those objects after they are destroyed can cause undefined behavior.

Version Information

#define MSGQIPC_API_V1   (1)
         MSGQIPC API version 1.
#define MSGQIPC_API_V2   (2)
        MSGQIPC API version 2.
#define MSGQIPC_API_VERNO   (MSGQIPC_API_V2)
         header version

API Objects

typedef struct msgqipc_ctx msgqipc_ctx
         msgqipc api client
typedef uint64_t msgqipc_key
         key for named objects
typedef struct msgqipc_queue msgqipc_queue
         message queue
typedef struct msgqipc_addr msgqipc_addr
         address of message queue
#define MSGQIPC_SENDID_SPC   (30)

Error Codes

#define MSGQIPC_ERR_SUCC           0
         operation successful
#define MSGQIPC_ERR_INVAL          1
         invalid argument
#define MSGQIPC_ERR_NORES          2
         out of resources
#define MSGQIPC_ERR_RNR            3
         receiver not ready
#define MSGQIPC_ERR_FAILOVER       4
         hardware failover occurred
#define MSGQIPC_ERR_FAILURE        5
         general failure occurred (?)
#define MSGQIPC_ERR_NOTSUPP        6
         not supported
#define MSGQIPC_ERR_TRUNC          7
         object truncated
#define MSGQIPC_ERR_SIZE           8
         object too large
#define MSGQIPC_ERR_INACTIVE       9
         target exists but inactive
#define MSGQIPC_ERR_EXISTS        11
         object already exists
#define MSGQIPC_ERR_CANCEL        12
         object or operation cancelled
#define MSGQIPC_ERR_NOTFOUND      13
         object not found
#define MSGQIPC_ERR_NRQR          14
        NRQR value is invalid
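
For logging, a client might map these codes to their descriptions with a small helper such as the one sketched here. The constants are repeated locally so the fragment stands alone (a real program would take them from msgqipc.h). err_text() and err_is_retryable() are hypothetical helpers, not part of the API. Treating only MSGQIPC_ERR_RNR as retryable follows the description of flow-control failures earlier in this page.

```c
/* Error codes as listed in this page; normally provided by msgqipc.h. */
#define MSGQIPC_ERR_SUCC      0
#define MSGQIPC_ERR_INVAL     1
#define MSGQIPC_ERR_NORES     2
#define MSGQIPC_ERR_RNR       3
#define MSGQIPC_ERR_FAILOVER  4
#define MSGQIPC_ERR_FAILURE   5
#define MSGQIPC_ERR_NOTSUPP   6
#define MSGQIPC_ERR_TRUNC     7
#define MSGQIPC_ERR_SIZE      8
#define MSGQIPC_ERR_INACTIVE  9
#define MSGQIPC_ERR_EXISTS   11
#define MSGQIPC_ERR_CANCEL   12
#define MSGQIPC_ERR_NOTFOUND 13
#define MSGQIPC_ERR_NRQR     14

/* Hypothetical helper: human-readable text for an error code. */
const char *err_text(int code)
{
    switch (code) {
    case MSGQIPC_ERR_SUCC:     return "operation successful";
    case MSGQIPC_ERR_INVAL:    return "invalid argument";
    case MSGQIPC_ERR_NORES:    return "out of resources";
    case MSGQIPC_ERR_RNR:      return "receiver not ready";
    case MSGQIPC_ERR_FAILOVER: return "hardware failover occurred";
    case MSGQIPC_ERR_FAILURE:  return "general failure occurred";
    case MSGQIPC_ERR_NOTSUPP:  return "not supported";
    case MSGQIPC_ERR_TRUNC:    return "object truncated";
    case MSGQIPC_ERR_SIZE:     return "object too large";
    case MSGQIPC_ERR_INACTIVE: return "target exists but inactive";
    case MSGQIPC_ERR_EXISTS:   return "object already exists";
    case MSGQIPC_ERR_CANCEL:   return "object or operation cancelled";
    case MSGQIPC_ERR_NOTFOUND: return "object not found";
    case MSGQIPC_ERR_NRQR:     return "NRQR value is invalid";
    default:                   return "unknown error code";
    }
}

/* Hypothetical helper: only flow-control failures are worth retrying. */
int err_is_retryable(int code)
{
    return code == MSGQIPC_ERR_RNR;
}
```

A caller would typically pass the value of msgqipc_get_error() straight to err_text() immediately after a failed call, before any other API call in the same thread overwrites the status.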

Client Initialization

msgqipc_ctx * msgqipc_ini (msgqipc_api *api)
         initialize a context for given API
int msgqipc_destroy (msgqipc_ctx *ctx)
         destroy context and its resources/state

Queue Creation Operations

msgqipc_queue * msgqipc_create (msgqipc_ctx *ctx,
         msgqipc_key key, msgqipc_addr *addr, uint32_t entries,
         uint64_t cookie, uint32_t flags)
         create a queue
int msgqipc_activate (msgqipc_ctx *ctx, msgqipc_queue
         *queue)
         enable queue for incoming sends
int msgqipc_deactivate (msgqipc_ctx *ctx, msgqipc_queue
         *queue)
         disable sends for queue
int msgqipc_close (msgqipc_ctx *ctx, msgqipc_queue *queue,
         uint32_t flags)
         close a queue and release its resources

#define MSGQIPC_KEY_PRIVATE           ((msgqipc_key) 0)
         null key - private queue
#define MSGQIPC_CREATE_BIND_CPU       (0x00000001)
         bind to local cpu
#define MSGQIPC_CREATE_CANCEL_CAPABLE (0x00000008)
         fail if name exists

Queue Receive Operations

ssize_t msgqipc_recv (msgqipc_ctx *ctx, msgqipc_queue
         *queue, void *msg, size_t len)
         receive message at head of queue, if any
ssize_t msgqipc_peek (msgqipc_ctx *ctx, msgqipc_queue
         *queue, void *msg, size_t len, size_t index)
         peek at message at head of queue

Send Operations

int msgqipc_send (msgqipc_ctx *ctx, msgqipc_addr *addr, void
         *msg, size_t len)
         send a message to a queue
int msgqipc_cancel (msgqipc_ctx *ctx, msgqipc_addr *addr)
         mark messages in target queue as cancelled, if sender id
         matches
int msgqipc_poke (msgqipc_ctx *ctx, msgqipc_addr *addr)
         send an empty message to a queue; used to check the
         status of the given queue

Event Notification

enum msgqipc_event_type { MSGQIPC_EVT_CANSEND = 1,
         MSGQIPC_EVT_DATAREADY = 2, MSGQIPC_EVT_FAILOVER = 3 }
         Types of events generated for a context.
int msgqipc_get_events (msgqipc_ctx *ctx, msgqipc_event
         *events, size_t len)
         get events pending for context

Control Operations

int msgqipc_ctl (msgqipc_ctx *ctx, msgqipc_queue *queue, int
         cmd, uint64_t arg, void *ptr)
         control context or object (queries or modifies
         attributes/behaviors)
#define MSGQIPC_CTL_GET_MAX_MSG_SIZE   (1)
         get max message size
#define MSGQIPC_CTL_GET_MAX_MESSAGES   (2)
         get max queue size
#define MSGQIPC_CTL_GET_ADDR_SIZE      (3)
         get msgqipc_addr size
#define MSGQIPC_CTL_NONBLOCK           (103)
         modify recv() behavior
#define MSGQIPC_CTL_GET_FD             (104)
         get poll(2) fd
#define MSGQIPC_CTL_ARM                (105)
         arm ctx for interrupts
#define MSGQIPC_CTL_LISTEN_ADDR        (107)
         monitor queue status
#define MSGQIPC_CTL_FORGET_ADDR        (108)
         stop monitoring queue
#define MSGQIPC_CTL_NAMED_OPTS         (109)
         named queue support
#define MSGQIPC_CTL_SET_SENDER_ID      (110)
         set sender id - send()
#define MSGQIPC_CTL_GET_LAST_MSG_SIZE  (111)
         size of last recv()
#define MSGQIPC_CTL_SET_NRQR           (112)
         set the NRQR value, the threshold number of free
         message entries that must become available before a
         MSGQIPC_EVT_CANSEND event is generated after the queue
         has become full. The NRQR value is passed in 'arg'
         (msgqipc_nrqr_t).

typedef enum {
         MSGQIPC_NRQR_DEFAULT = 0,
         MSGQIPC_NRQR_2 = 1,
         MSGQIPC_NRQR_8 = 3,
         MSGQIPC_NRQR_32 = 5
} msgqipc_nrqr_t;

         NRQR value to set through MSGQIPC_CTL_SET_NRQR.

         MSGQIPC_NRQR_DEFAULT: 1 message entry
         MSGQIPC_NRQR_2      : 1/2  of total message entries
         MSGQIPC_NRQR_8      : 1/8  of total message entries
         MSGQIPC_NRQR_32     : 1/32 of total message entries
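
The fractions in this list can be turned into an entry count for a given queue size, which may help when choosing a setting. The following sketch repeats the enumeration locally so it stands alone. nrqr_free_entries() is a hypothetical helper, and its rounding (integer division, with a minimum of one entry) is an assumption, since this page does not say how the hardware rounds.

```c
#include <stdint.h>

/* Repeated from this page; normally provided by msgqipc.h. */
typedef enum {
    MSGQIPC_NRQR_DEFAULT = 0,
    MSGQIPC_NRQR_2 = 1,
    MSGQIPC_NRQR_8 = 3,
    MSGQIPC_NRQR_32 = 5
} msgqipc_nrqr_t;

/*
 * Hypothetical helper: number of free entries at which a CANSEND event
 * would be expected for a queue of total_entries.
 * Returns 0 for a value that is not one of the documented settings.
 * ASSUMPTION: fractions round down, but never below one entry.
 */
uint32_t nrqr_free_entries(msgqipc_nrqr_t nrqr, uint32_t total_entries)
{
    uint32_t n;

    switch (nrqr) {
    case MSGQIPC_NRQR_DEFAULT: return 1;
    case MSGQIPC_NRQR_2:       n = total_entries / 2;  break;
    case MSGQIPC_NRQR_8:       n = total_entries / 8;  break;
    case MSGQIPC_NRQR_32:      n = total_entries / 32; break;
    default:                   return 0;
    }
    return n > 0 ? n : 1;
}
```

Under these assumptions, a 64-entry queue with MSGQIPC_NRQR_8 would notify a waiting sender once 8 entries are free, rather than after every single dequeue.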

General API

typedef msgqipc_ctx(* msgqipc_ini_t )(msgqipc_api *)
typedef int(* msgqipc_destroy_t )(msgqipc_ctx *)
typedef msgqipc_queue *(* msgqipc_create_t )(msgqipc_ctx *,
         msgqipc_key, msgqipc_addr *, uint32_t, uint64_t,
         uint32_t)
typedef int(* msgqipc_activate_t )(msgqipc_ctx *,
         msgqipc_queue *)
typedef int(* msgqipc_deactivate_t )(msgqipc_ctx *,
         msgqipc_queue *)
typedef int(* msgqipc_close_t )(msgqipc_ctx *, msgqipc_queue
         *, uint32_t)
typedef ssize_t(* msgqipc_recv_t )(msgqipc_ctx *,
         msgqipc_queue *, void *, size_t)
typedef ssize_t(* msgqipc_peek_t )(msgqipc_ctx *,
         msgqipc_queue *, msgqipc_sendid *, void *, size_t,
         size_t)
typedef int(* msgqipc_send_t )(msgqipc_ctx *, msgqipc_addr
         *, void *, size_t)
typedef int(* msgqipc_cancel_t )(msgqipc_ctx *, msgqipc_addr
         *)
typedef int(* msgqipc_get_events_t )(msgqipc_ctx *,
         msgqipc_event *, size_t)
typedef int(* msgqipc_ctl_t )(msgqipc_ctx *, msgqipc_queue
         *, int, uint64_t, void *)
typedef msgqipc_api(* msgqipc_load_t )(uint16_t)
typedef int(* msgqipc_get_error_t )()
typedef int(* msgqipc_thead_init_t )()
msgqipc_api * msgqipc_load (uint16_t verno)
         load api for verno
int msgqipc_get_error ()
         get error for last function called in this thread
int msgqipc_thread_init ()
         initialize current process thread for access to MSGQIPC
         api

Data Structures

struct msgqipc_sendid
         Client specified id associated with a send().
struct msgqipc_event
         Event descriptor.
struct msgqipc_v1
         api version 1 vector
struct msgqipc_api
         loaded api

Enumeration Type Documentation

enum msgqipc_event_type
     Types of events generated for a context.

     Enumerator

     MSGQIPC_EVT_CANSEND
          monitored addr became ready for send()

     MSGQIPC_EVT_DATAREADY
          queue has messages ready for recv()

     MSGQIPC_EVT_FAILOVER
          failover occurred in the cluster

     Definition at line 389 of file msgqipc.h.

Function Documentation

msgqipc_ctx* msgqipc_ini (msgqipc_api *api)
     initialize a context for given API

     Parameters:
         api loaded api to create context for

int msgqipc_destroy (msgqipc_ctx *ctx)
     destroy context and its resources/state

     Parameters:
         ctx context to destroy

msgqipc_queue* msgqipc_create (msgqipc_ctx *ctx,
     msgqipc_key key, msgqipc_addr *addr, uint32_t entries,
     uint64_t cookie, uint32_t flags)
     create a queue

     Parameters:
         ctx client context
         key key for named queue (optional)
         addr address of created queue
         entries queue size (max messages)
         cookie client cookie for queue
         flags creation options

int msgqipc_activate (msgqipc_ctx *ctx, msgqipc_queue *queue)
     enable queue for incoming sends

     Parameters:
         ctx client context
         queue queue to activate

int msgqipc_deactivate (msgqipc_ctx *ctx, msgqipc_queue *queue)
     disable sends for queue

     Parameters:
         ctx client context
         queue queue to deactivate

int msgqipc_close (msgqipc_ctx *ctx, msgqipc_queue *queue,
     uint32_t flags)
     close a queue and release its resources

     Parameters:
         ctx client context
         queue queue to close
         flags closure options

ssize_t msgqipc_recv (msgqipc_ctx *ctx, msgqipc_queue *queue,
     void *msg, size_t len)
     receive message at head of queue, if any

     Parameters:
         ctx client context
         queue queue to receive on
         msg buffer for received message
         len size of msg

ssize_t msgqipc_peek (msgqipc_ctx *ctx, msgqipc_queue *queue,
     void *msg, size_t len, size_t index)
     peek at message at head of queue

     Parameters:
         ctx client context
         queue queue to receive on
         msg buffer for received message
         len size of msg
         index index to peek at

int msgqipc_send (msgqipc_ctx *ctx, msgqipc_addr *addr, void
     *msg, size_t len)
     send a message to a queue

     Parameters:
         ctx client context
         addr address of queue to send to
         msg message data
         len length of message

int msgqipc_cancel (msgqipc_ctx *ctx, msgqipc_addr *addr)
     mark messages in target queue as cancelled, if sender id
     matches

     Parameters:
         ctx client context
         addr address of target queue

int msgqipc_get_events (msgqipc_ctx *ctx, msgqipc_event
     *events, size_t len)
     get events pending for context

     Parameters:
         ctx client context
         events array of events to populate
         len number of entries in array

int msgqipc_ctl (msgqipc_ctx *ctx, msgqipc_queue *queue,
     int cmd, uint64_t arg, void *ptr)
     control context or object (queries or modifies
     attributes/behaviors)

     Parameters:
         ctx client context
         queue queue to operate on (cmd specific)
         cmd command to perform
         arg argument for cmd
         ptr pointer argument for cmd

msgqipc_api* msgqipc_load (uint16_t verno)
     load api for verno

     Parameters:
         verno version of api to load

int msgqipc_poke (msgqipc_ctx *ctx, msgqipc_addr *addr)
     send an empty message to a queue. The message is not stored
     in the queue (no space is consumed). This can be used to
     check the status of the given queue.

     Parameters:
         ctx client context
         addr address of queue to send to

Attributes

See attributes(7) for descriptions of the following attributes:

ATTRIBUTE TYPE          ATTRIBUTE VALUE
Interface Stability     Uncommitted
Availability            system/fjcmi
MT-Level                MT-Safe
See Also

fjgmu(4D)