msgqipc - Fujitsu MSGQIPC API
MSGQIPC is an IPC API for Fujitsu MSGQ, a custom messaging implementation with hardware support for sending small messages between CPUs. It is intended to be used in conjunction with Fujitsu CMI, Fujitsu's implementation of CMI, a distributed shared memory abstraction.
This API is designed for low-latency, kernel-bypass messaging. Although CMI is not part of the MSGQIPC API, MSGQIPC is intended to complement the CMI implementation, which provides extremely low latency access to remote memory that a local process can map into its address space.
The primary functionality provided by MSGQIPC is based on message queues. A message queue, msgqipc_queue, is a receive endpoint capable of receiving messages from arbitrary senders. Message queues are backed by hardware constructs that support low-level messaging primitives.
Message queues are created dynamically by an application, and are accessible from the entire cluster (Fujitsu-specific) through a globally unique 'address', msgqipc_addr. Once a queue is created using the msgqipc_create() function, and its hardware-backed state is allocated, its address is known and can be used as the target of send operations.
Once a queue is created, it has a fixed size (set by the entries argument of msgqipc_create(); see also MSGQIPC_CTL_GET_MAX_MESSAGES), which is the number of messages it can hold before it becomes full. While the queue is full, further sends to it fail. As the receiver dequeues messages, slots become available again and senders can retry their failed sends. Each message also has a fixed size, which is 64 bytes in MSGQIPC_V1 (see MSGQIPC_CTL_GET_MAX_MSG_SIZE). However, this does not prevent a client from building a messaging layer with arbitrarily sized messages on top of MSGQIPC: CMI can carry arbitrarily sized payloads (through very low latency shared memory), with MSGQIPC sends used as control messages. This can be done without fragmentation and entirely in userspace.
A message send, msgqipc_send(), is a synchronous operation: when the function returns, either the message is in the target's receive queue, or the operation has failed and the message is NOT in the queue. These characteristics make MSGQIPC sends reliable as well as ordered. With the exception of flow-control failures (MSGQIPC_ERR_RNR), which may be transient, a send failure usually indicates a fatal error with respect to the target, such as the target queue having been deleted, or the owning process or the entire remote node having crashed.
The owner of a receive queue can receive (dequeue) messages from it directly from userspace with msgqipc_recv(). Queues can be marked as blocking or non-blocking (see MSGQIPC_CTL_NONBLOCK), which determines the behavior of receive when the queue is empty. On a non-blocking queue, receive returns failure; on a blocking queue, it waits in the kernel until a message arrives. Note that the blocking mode only affects receive on an empty queue: it has no effect when messages are available, and no effect on sends or senders.
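To make the "MSGQIPC sends as control messages" idea concrete, the sketch below packs a descriptor of a large payload into a single 64-byte message. It assumes the payload itself was already written into a CMI-mapped shared region; the struct ctrl_msg layout and all of its field names are hypothetical, not part of MSGQIPC or CMI. The compile-time check guarantees the descriptor fits exactly one MSGQIPC_V1 message.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define CTRL_MSG_SIZE 64 /* MSGQIPC_V1 message size */

/* Hypothetical control message: the payload lives in shared memory (CMI);
 * only its location and length travel through MSGQIPC. */
struct ctrl_msg {
    uint32_t type;      /* application-defined message type */
    uint32_t seq;       /* application-defined sequence number */
    uint64_t region_id; /* which shared region holds the payload */
    uint64_t offset;    /* payload offset within that region */
    uint64_t length;    /* payload length in bytes (may far exceed 64) */
    uint8_t  reserved[CTRL_MSG_SIZE - 32];
};

/* Fail the build if the descriptor would not fit one MSGQIPC message. */
_Static_assert(sizeof(struct ctrl_msg) == CTRL_MSG_SIZE,
               "control message must be exactly one MSGQIPC message");

/* Serialize a descriptor into a 64-byte buffer to hand to msgqipc_send(). */
static void ctrl_msg_pack(uint8_t out[CTRL_MSG_SIZE], uint32_t type, uint32_t seq,
                          uint64_t region_id, uint64_t offset, uint64_t length)
{
    struct ctrl_msg m;
    assert(out != NULL);
    memset(&m, 0, sizeof m); /* zero reserved bytes so nothing stale is sent */
    m.type = type;
    m.seq = seq;
    m.region_id = region_id;
    m.offset = offset;
    m.length = length;
    memcpy(out, &m, sizeof m);
}

/* Decode a buffer filled by msgqipc_recv() back into a descriptor. */
static void ctrl_msg_unpack(const uint8_t in[CTRL_MSG_SIZE], struct ctrl_msg *m)
{
    memcpy(m, in, sizeof *m);
}
```

The receiver would then read length bytes at offset in the named shared region, so no fragmentation or reassembly is needed regardless of payload size.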
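The send semantics above suggest a simple policy: retry only on MSGQIPC_ERR_RNR and treat any other failure as fatal for that target. The sketch below assumes that a send returns 0 on success and nonzero on failure (the exact return convention is not stated on this page); send_fn, get_error_fn, send_with_retry and the mock target are hypothetical stand-ins for the loaded API's entries, used so the logic can be exercised without MSGQIPC hardware.

```c
#include <assert.h>
#include <stddef.h>

/* Error codes as listed on this page; real code would use msgqipc.h. */
#define MSGQIPC_ERR_SUCC 0
#define MSGQIPC_ERR_RNR  3

/* Hypothetical stand-ins for the send and get_error API entries.
 * Assumption: send returns 0 on success and nonzero on failure. */
typedef int (*send_fn)(void *target, void *msg, size_t len);
typedef int (*get_error_fn)(void);

/* Retry only on MSGQIPC_ERR_RNR (transient: target queue full); any other
 * error is treated as fatal for this target and returned to the caller. */
static int send_with_retry(send_fn send, get_error_fn get_error, void *target,
                           void *msg, size_t len, unsigned max_attempts)
{
    assert(send != NULL && get_error != NULL);
    for (unsigned attempt = 0; attempt < max_attempts; attempt++) {
        if (send(target, msg, len) == 0)
            return MSGQIPC_ERR_SUCC; /* message is now in the target queue */
        int err = get_error();       /* read right away, like errno */
        if (err != MSGQIPC_ERR_RNR)
            return err;              /* e.g. queue deleted, node crashed */
        /* Queue full: a real client might pause here, or wait for
         * MSGQIPC_EVT_CANSEND on a monitored address, before retrying. */
    }
    return MSGQIPC_ERR_RNR;          /* still full after max_attempts */
}

/* Mock target for demonstration: reports "queue full" for the first
 * mock_full_for sends, then accepts. */
static int mock_full_for, mock_calls, mock_error;

static int mock_send(void *target, void *msg, size_t len)
{
    (void)target; (void)msg; (void)len;
    mock_calls++;
    if (mock_full_for > 0) {
        mock_full_for--;
        mock_error = MSGQIPC_ERR_RNR;
        return -1;
    }
    mock_error = MSGQIPC_ERR_SUCC;
    return 0;
}

static int mock_get_error(void) { return mock_error; }
```

Because a successful return guarantees the message is queued, the loop never needs to deduplicate or resend after success; it only has to decide whether a failure is worth retrying.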
To wait for an event, such as a message arriving on an empty queue, MSGQIPC provides a file descriptor (obtained with MSGQIPC_CTL_GET_FD) that is signalled when activity occurs on any resource created or registered by a particular client, including any message queues it has created. This fd can be used with standard system wait calls such as poll(2), and is the preferred method for waiting for activity. Blocking receives are provided for simple clients and proofs of concept.
In latency-sensitive applications, waiting for activity should be avoided unless necessary, because it causes the thread or process to enter the scheduler (kernel), which can incur significant overhead and non-deterministic latency. Depending on an application's design, it can likely achieve much better performance by spinning with a CPU 'pause' instruction, without yielding (and therefore without entering the kernel). The latency of sends, the application's response time, and system load should all be taken into account. An application will likely use a combination of pause or polling mode and poll(2) or event mode to handle different scenarios.
The API provides some additional functionality to aid clients, for example to avoid race conditions or similar issues that would otherwise be impossible to work around. There is also a control interface, msgqipc_ctl(), used to query attributes, modify state or behavior, and add future functionality without modifying the API function vector.
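A minimal sketch of event-mode waiting with poll(2), assuming the fd obtained through MSGQIPC_CTL_GET_FD signals activity as POLLIN. How the fd is returned from msgqipc_ctl(), and whether MSGQIPC_CTL_ARM must be issued before each wait, is not specified on this page, so wait_for_activity is a hypothetical helper that only takes an already-obtained fd. A pipe stands in for the MSGQIPC fd when exercising it.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

/* Wait until `fd` reports activity. Returns 1 if readable, 0 on timeout,
 * -1 on error. Assumption: the MSGQIPC fd signals readiness as POLLIN. */
static int wait_for_activity(int fd, int timeout_ms)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    for (;;) {
        int n = poll(&pfd, 1, timeout_ms);
        if (n > 0)
            return (pfd.revents & POLLIN) ? 1 : -1; /* POLLERR/POLLHUP: error */
        if (n == 0)
            return 0;                               /* timed out */
        if (errno != EINTR)
            return -1;
        /* Interrupted by a signal: retry (the timeout restarts). */
    }
}
```

After a wakeup, a client would typically call msgqipc_get_events() to learn which resources are active and then drain the affected queues with msgqipc_recv().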
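The combination of pause mode and event mode described above can be sketched as a bounded spin followed by a blocking fallback. Here try_once would wrap a receive on a non-blocking queue and block would wrap poll(2) on the MSGQIPC fd; both callbacks, spin_then_block, and the demo state are hypothetical. The cpu_relax helper uses the x86 PAUSE or AArch64 YIELD hint, which keeps the thread on the CPU without entering the kernel.

```c
#include <assert.h>
#include <stdbool.h>

/* Hint to the CPU that we are spinning; does not enter the kernel. */
static inline void cpu_relax(void)
{
#if defined(__x86_64__) || defined(__i386__)
    __builtin_ia32_pause();
#elif defined(__aarch64__)
    __asm__ __volatile__("yield");
#endif
}

typedef bool (*try_fn)(void *arg);   /* e.g. non-blocking recv attempt */
typedef void (*block_fn)(void *arg); /* e.g. poll(2) on the MSGQIPC fd */

/* Spin up to spin_limit attempts, then fall back to blocking.
 * Returns true if work arrived during the spin phase (no kernel entry),
 * false if the thread had to block first. */
static bool spin_then_block(try_fn try_once, block_fn block, void *arg,
                            unsigned spin_limit)
{
    for (unsigned i = 0; i < spin_limit; i++) {
        if (try_once(arg))
            return true;
        cpu_relax();
    }
    for (;;) {
        block(arg);          /* event mode: sleep until signalled */
        if (try_once(arg))
            return false;
    }
}

/* Demo state: try_once succeeds once `ready_at` attempts have been made;
 * "blocking" makes the next attempt succeed. */
struct demo { unsigned attempts, ready_at, blocks; };

static bool demo_try(void *arg)
{
    struct demo *d = arg;
    return ++d->attempts >= d->ready_at;
}

static void demo_block(void *arg)
{
    struct demo *d = arg;
    d->blocks++;
    d->ready_at = d->attempts + 1;
}
```

The spin_limit is the tuning knob: it should roughly match the expected gap between messages, so that the common case is served in pause mode and only idle periods pay the cost of entering the kernel.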
The API is accessed through function pointers obtained through the msgqipc_load() call. Only a few functions are exported, and the bulk of the API is accessed through the returned function vector from msgqipc_load().
The implementation may need to set up state before the API is accessed, which is done by having the client call the msgqipc_thread_init() function. This must be called in each thread that accesses the API, including the first thread of the process. It is acceptable to call it multiple times from the same thread. No thread exit function is needed.
Most MSGQIPC interfaces return success or failure. The reason for a failure is obtained with the msgqipc_get_error() function. The error code may have a meaning specific to the function that generated it. The status is stored per thread and applies to the last function called in that thread, so it should be inspected immediately after the failing call, as one would use errno.
The main API, the msgqipc_api vector returned by msgqipc_load(), is accessed through a 'context' object, msgqipc_ctx, which is created by the msgqipc_ini() function. The context is a handle that is passed to every interface in the API vector, with the obvious exception of msgqipc_ini() itself. The context can be thought of as an additional layer of isolation and ownership for an application component. State and resources/objects created or modified through the API are associated with the context provided to the interface.
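The errno-like rule above can be illustrated with a small model. The sketch assumes only what this page states (the status is per thread and reflects the last call); model_last_error, model_call_ok, model_call_bad and do_work are hypothetical and merely mimic how msgqipc_get_error() is described to behave. It shows why the code must be captured before any other MSGQIPC call, such as cleanup, overwrites it.

```c
#include <assert.h>

#define MSGQIPC_ERR_SUCC  0 /* values as listed on this page */
#define MSGQIPC_ERR_INVAL 1

/* Model of a per-thread "last error" slot, analogous to errno. */
static _Thread_local int model_last_error = MSGQIPC_ERR_SUCC;

static int model_get_error(void) { return model_last_error; }

/* Hypothetical API calls: each one overwrites the calling thread's status. */
static int model_call_ok(void)  { model_last_error = MSGQIPC_ERR_SUCC;  return 0; }
static int model_call_bad(void) { model_last_error = MSGQIPC_ERR_INVAL; return -1; }

/* Pattern: read the status right after the failing call, before any other
 * MSGQIPC call on this thread can replace it. */
static int do_work(void)
{
    if (model_call_bad() != 0) {
        int err = model_get_error(); /* capture first */
        model_call_ok();             /* e.g. a cleanup call: overwrites status */
        return err;                  /* still the original failure reason */
    }
    return MSGQIPC_ERR_SUCC;
}
```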
The context allows multiple components to coexist in the same process or thread, and use the MSGQIPC API without impacting each other. Any objects, resources, or state created or modified by use of the API is isolated to the context provided to the API.
Independent components which use MSGQIPC should create their own context to access the API. Also, while the API itself is thread-safe, the context itself is not. This means that an application component needs to initialize a context in each thread that may need concurrent access to the API. Since a context cannot be shared between threads in the current implementation, MSGQIPC_V1, each thread needs its own context regardless of concurrent API access.
Objects such as message queues (msgqipc_queue) belong to the context that created them. Although this ownership is implied, functions that operate on a queue, such as msgqipc_close(), still require a context argument, and it must be the context that created the queue. The implementation may assert this; behavior is undefined if the wrong context is provided.
A context is destroyed with the msgqipc_destroy() function, which also destroys all associated objects and state. Using those objects after they have been destroyed results in undefined behavior.
#define MSGQIPC_API_V1     (1)               MSGQIPC API version 1.
#define MSGQIPC_API_V2     (2)               MSGQIPC API version 2.
#define MSGQIPC_API_VERNO  (MSGQIPC_API_V2)  header version
typedef struct msgqipc_ctx    msgqipc_ctx     msgqipc api client
typedef uint64_t              msgqipc_key     key for named objects
typedef struct msgqipc_queue  msgqipc_queue   message queue
typedef struct msgqipc_addr   msgqipc_addr    address of message queue
#define MSGQIPC_SENDID_SPC (30)
#define MSGQIPC_ERR_SUCC      0   operation successful
#define MSGQIPC_ERR_INVAL     1   invalid argument
#define MSGQIPC_ERR_NORES     2   out of resources
#define MSGQIPC_ERR_RNR       3   receiver not ready
#define MSGQIPC_ERR_FAILOVER  4   hardware failover occurred
#define MSGQIPC_ERR_FAILURE   5   general failure occurred (?)
#define MSGQIPC_ERR_NOTSUPP   6   not supported
#define MSGQIPC_ERR_TRUNC     7   object truncated
#define MSGQIPC_ERR_SIZE      8   object too large
#define MSGQIPC_ERR_INACTIVE  9   target exists but inactive
#define MSGQIPC_ERR_EXISTS    11  object already exists
#define MSGQIPC_ERR_CANCEL    12  object or operation cancelled
#define MSGQIPC_ERR_NOTFOUND  13  object not found
#define MSGQIPC_ERR_NRQR      14  NRQR value is invalid
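For logging, it helps to turn a code from msgqipc_get_error() into its symbolic name. The helper below is hypothetical, not part of the MSGQIPC API; the values are copied from the listing above (10 is not listed, so it maps to "unknown"). The transient check encodes only what this page says: MSGQIPC_ERR_RNR may be transient.

```c
#include <assert.h>
#include <string.h>

/* Error codes as listed on this page. */
#define MSGQIPC_ERR_SUCC      0
#define MSGQIPC_ERR_INVAL     1
#define MSGQIPC_ERR_NORES     2
#define MSGQIPC_ERR_RNR       3
#define MSGQIPC_ERR_FAILOVER  4
#define MSGQIPC_ERR_FAILURE   5
#define MSGQIPC_ERR_NOTSUPP   6
#define MSGQIPC_ERR_TRUNC     7
#define MSGQIPC_ERR_SIZE      8
#define MSGQIPC_ERR_INACTIVE  9
#define MSGQIPC_ERR_EXISTS    11
#define MSGQIPC_ERR_CANCEL    12
#define MSGQIPC_ERR_NOTFOUND  13
#define MSGQIPC_ERR_NRQR      14

/* `case CODE: return "CODE";` (the # operator stringizes the unexpanded name) */
#define ERR_CASE(code) case code: return #code;

/* Hypothetical helper: symbolic name of an MSGQIPC error code. */
static const char *app_err_name(int err)
{
    switch (err) {
    ERR_CASE(MSGQIPC_ERR_SUCC)     ERR_CASE(MSGQIPC_ERR_INVAL)
    ERR_CASE(MSGQIPC_ERR_NORES)    ERR_CASE(MSGQIPC_ERR_RNR)
    ERR_CASE(MSGQIPC_ERR_FAILOVER) ERR_CASE(MSGQIPC_ERR_FAILURE)
    ERR_CASE(MSGQIPC_ERR_NOTSUPP)  ERR_CASE(MSGQIPC_ERR_TRUNC)
    ERR_CASE(MSGQIPC_ERR_SIZE)     ERR_CASE(MSGQIPC_ERR_INACTIVE)
    ERR_CASE(MSGQIPC_ERR_EXISTS)   ERR_CASE(MSGQIPC_ERR_CANCEL)
    ERR_CASE(MSGQIPC_ERR_NOTFOUND) ERR_CASE(MSGQIPC_ERR_NRQR)
    default: return "unknown";
    }
}

/* Per the send semantics on this page, only RNR (queue full) may be transient. */
static int app_err_is_transient(int err) { return err == MSGQIPC_ERR_RNR; }
```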
msgqipc_ctx * msgqipc_ini (msgqipc_api *api)
    initialize a context for given API
int msgqipc_destroy (msgqipc_ctx *ctx)
    destroy context and its resources/state
msgqipc_queue * msgqipc_create (msgqipc_ctx *ctx, msgqipc_key key, msgqipc_addr *addr, uint32_t entries, uint64_t cookie, uint32_t flags)
    create a queue
int msgqipc_activate (msgqipc_ctx *ctx, msgqipc_queue *queue)
    enable queue for incoming sends
int msgqipc_deactivate (msgqipc_ctx *ctx, msgqipc_queue *queue)
    disable sends for queue
int msgqipc_close (msgqipc_ctx *ctx, msgqipc_queue *queue, uint32_t flags)
    close a queue and release its resources
#define MSGQIPC_KEY_PRIVATE            ((msgqipc_key) 0)  null key - private queue
#define MSGQIPC_CREATE_BIND_CPU        (0x00000001)       bind to local cpu
#define MSGQIPC_CREATE_CANCEL_CAPABLE  (0x00000008)       fail if name exists
ssize_t msgqipc_recv (msgqipc_ctx *ctx, msgqipc_queue *queue, void *msg, size_t len)
    receive message at head of queue, if any
ssize_t msgqipc_peek (msgqipc_ctx *ctx, msgqipc_queue *queue, void *msg, size_t len, size_t index)
    peek at message at head of queue
int msgqipc_send (msgqipc_ctx *ctx, msgqipc_addr *addr, void *msg, size_t len)
    send a message to a queue
int msgqipc_cancel (msgqipc_ctx *ctx, msgqipc_addr *addr)
    mark messages in target queue as cancelled, if sender id matches
int msgqipc_poke (msgqipc_ctx *ctx, msgqipc_addr *addr)
    Send an empty message to a queue. This is used to check the status of the given queue.
enum msgqipc_event_type { MSGQIPC_EVT_CANSEND = 1, MSGQIPC_EVT_DATAREADY = 2, MSGQIPC_EVT_FAILOVER = 3 }
    Types of events generated for a context.
int msgqipc_get_events (msgqipc_ctx *ctx, msgqipc_event *events, size_t len)
    get events pending for context
int msgqipc_ctl (msgqipc_ctx *ctx, msgqipc_queue *queue, int cmd, uint64_t arg, void *ptr)
    control context or object (queries or modifies attributes/behaviors)
#define MSGQIPC_CTL_GET_MAX_MSG_SIZE   (1)    get max message size
#define MSGQIPC_CTL_GET_MAX_MESSAGES   (2)    get max queue size
#define MSGQIPC_CTL_GET_ADDR_SIZE      (3)    get msgqipc_addr size
#define MSGQIPC_CTL_NONBLOCK           (103)  modify recv() behavior
#define MSGQIPC_CTL_GET_FD             (104)  get poll(2) fd
#define MSGQIPC_CTL_ARM                (105)  arm ctx for interrupts
#define MSGQIPC_CTL_LISTEN_ADDR        (107)  monitor queue status
#define MSGQIPC_CTL_FORGET_ADDR        (108)  stop monitoring queue
#define MSGQIPC_CTL_NAMED_OPTS         (109)  named queue support
#define MSGQIPC_CTL_SET_SENDER_ID      (110)  set sender id - send()
#define MSGQIPC_CTL_GET_LAST_MSG_SIZE  (111)  size of last recv()
#define MSGQIPC_CTL_SET_NRQR           (112)  set the NRQR value
    NRQR is a threshold: after a queue has become full, an MSGQIPC_EVT_CANSEND event is generated once this many message entries are free again. The NRQR value is passed in 'arg' (msgqipc_nrqr_t).
typedef enum {
    MSGQIPC_NRQR_DEFAULT = 0,
    MSGQIPC_NRQR_2       = 1,
    MSGQIPC_NRQR_8       = 3,
    MSGQIPC_NRQR_32      = 5
} msgqipc_nrqr_t;
    NRQR value to set through MSGQIPC_CTL_SET_NRQR:
    MSGQIPC_NRQR_DEFAULT : 1 message entry
    MSGQIPC_NRQR_2       : 1/2 of total message entries
    MSGQIPC_NRQR_8       : 1/8 of total message entries
    MSGQIPC_NRQR_32      : 1/32 of total message entries
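The NRQR table translates into a number of free entries for a given queue size. The sketch below computes that number; nrqr_threshold is a hypothetical helper, and because the page does not say how fractions are rounded, it simply rounds down (for a queue smaller than 32 entries, MSGQIPC_NRQR_32 would then yield 0, which the real implementation may handle differently). Any other value is reported as 0 here, mirroring the listed MSGQIPC_ERR_NRQR error for invalid values.

```c
#include <assert.h>
#include <stdint.h>

/* Enum values copied from this page. */
typedef enum {
    MSGQIPC_NRQR_DEFAULT = 0,
    MSGQIPC_NRQR_2       = 1,
    MSGQIPC_NRQR_8       = 3,
    MSGQIPC_NRQR_32      = 5
} msgqipc_nrqr_t;

/* Free entries required (after the queue became full) before
 * MSGQIPC_EVT_CANSEND is generated. Assumption: fractions round down. */
static uint32_t nrqr_threshold(uint32_t entries, msgqipc_nrqr_t nrqr)
{
    switch (nrqr) {
    case MSGQIPC_NRQR_DEFAULT: return 1;
    case MSGQIPC_NRQR_2:       return entries / 2;
    case MSGQIPC_NRQR_8:       return entries / 8;
    case MSGQIPC_NRQR_32:      return entries / 32;
    }
    return 0; /* not a valid NRQR value */
}
```

A larger threshold means fewer CANSEND wakeups but longer stalls for a blocked sender; the default of one entry wakes senders as soon as any space appears.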
typedef msgqipc_ctx *(* msgqipc_ini_t )(msgqipc_api *)
typedef int(* msgqipc_destroy_t )(msgqipc_ctx *)
typedef msgqipc_queue *(* msgqipc_create_t )(msgqipc_ctx *, msgqipc_key, msgqipc_addr *, uint32_t, uint64_t, uint32_t)
typedef int(* msgqipc_activate_t )(msgqipc_ctx *, msgqipc_queue *)
typedef int(* msgqipc_deactivate_t )(msgqipc_ctx *, msgqipc_queue *)
typedef int(* msgqipc_close_t )(msgqipc_ctx *, msgqipc_queue *, uint32_t)
typedef ssize_t(* msgqipc_recv_t )(msgqipc_ctx *, msgqipc_queue *, void *, size_t)
typedef ssize_t(* msgqipc_peek_t )(msgqipc_ctx *, msgqipc_queue *, msgqipc_sendid *, void *, size_t, size_t)
typedef int(* msgqipc_send_t )(msgqipc_ctx *, msgqipc_addr *, void *, size_t)
typedef int(* msgqipc_cancel_t )(msgqipc_ctx *, msgqipc_addr *)
typedef int(* msgqipc_get_events_t )(msgqipc_ctx *, msgqipc_event *, size_t)
typedef int(* msgqipc_ctl_t )(msgqipc_ctx *, msgqipc_queue *, int, uint64_t, void *)
typedef msgqipc_api *(* msgqipc_load_t )(uint16_t)
typedef int(* msgqipc_get_error_t )()
typedef int(* msgqipc_thead_init_t )()
msgqipc_api * msgqipc_load (uint16_t verno)
    load api for verno
int msgqipc_get_error ()
    get error for last function called in this thread
int msgqipc_thread_init ()
    initialize current process thread for access to MSGQIPC api
struct msgqipc_sendid   Client specified id associated with a send().
struct msgqipc_event    Event descriptor.
struct msgqipc_v1       api version 1 vector
struct msgqipc_api      loaded api
enum msgqipc_event_type
    Types of events generated for a context.
    Enumerator:
        MSGQIPC_EVT_CANSEND     monitored addr became ready for send()
        MSGQIPC_EVT_DATAREADY   queue has messages ready for recv()
        MSGQIPC_EVT_FAILOVER    failover occurred in the cluster
    Definition at line 389 of file msgqipc.h.
msgqipc_ctx* msgqipc_ini (msgqipc_api *api)
    initialize a context for given API
    Parameters:
        api      loaded api to create context for

int msgqipc_destroy (msgqipc_ctx *ctx)
    destroy context and its resources/state
    Parameters:
        ctx      context to destroy

msgqipc_queue* msgqipc_create (msgqipc_ctx *ctx, msgqipc_key key, msgqipc_addr *addr, uint32_t entries, uint64_t cookie, uint32_t flags)
    create a queue
    Parameters:
        ctx      client context
        key      key for named queue (optional)
        addr     address of created queue
        entries  queue size (max messages)
        cookie   client cookie for queue
        flags    creation options

int msgqipc_activate (msgqipc_ctx *ctx, msgqipc_queue *queue)
    enable queue for incoming sends
    Parameters:
        ctx      client context
        queue    queue to activate

int msgqipc_deactivate (msgqipc_ctx *ctx, msgqipc_queue *queue)
    disable sends for queue
    Parameters:
        ctx      client context
        queue    queue to deactivate

int msgqipc_close (msgqipc_ctx *ctx, msgqipc_queue *queue, uint32_t flags)
    close a queue and release its resources
    Parameters:
        ctx      client context
        queue    queue to close
        flags    closure options

ssize_t msgqipc_recv (msgqipc_ctx *ctx, msgqipc_queue *queue, void *msg, size_t len)
    receive message at head of queue, if any
    Parameters:
        ctx      client context
        queue    queue to receive on
        msg      buffer for received message
        len      size of msg

ssize_t msgqipc_peek (msgqipc_ctx *ctx, msgqipc_queue *queue, void *msg, size_t len, size_t index)
    peek at message at head of queue
    Parameters:
        ctx      client context
        queue    queue to peek at
        msg      buffer for peeked message
        len      size of msg
        index    index to peek at

int msgqipc_send (msgqipc_ctx *ctx, msgqipc_addr *addr, void *msg, size_t len)
    send a message to a queue
    Parameters:
        ctx      client context
        addr     address of queue to send to
        msg      message data
        len      length of message

int msgqipc_cancel (msgqipc_ctx *ctx, msgqipc_addr *addr)
    mark messages in target queue as cancelled, if sender id matches
    Parameters:
        ctx      client context
        addr     address of target queue

int msgqipc_get_events (msgqipc_ctx *ctx, msgqipc_event *events, size_t len)
    get events pending for context
    Parameters:
        ctx      client context
        events   array of events to populate
        len      number of entries in array

int msgqipc_ctl (msgqipc_ctx *ctx, msgqipc_queue *queue, int cmd, uint64_t arg, void *ptr)
    control context or object (queries or modifies attributes/behaviors)
    Parameters:
        ctx      client context
        queue    queue to operate on (cmd specific)
        cmd      command to perform
        arg      argument for cmd
        ptr      pointer argument for cmd

msgqipc_api* msgqipc_load (uint16_t verno)
    load api for verno
    Parameters:
        verno    version of api to load

int msgqipc_poke (msgqipc_ctx *ctx, msgqipc_addr *addr)
    Send an empty message to a queue. The sent message is not stored in the queue (no space is consumed). It can be used to check the status of the given queue.
    Parameters:
        ctx      client context
        addr     address of queue to send to