STREAMS Programming Guide

Message Queueing and Priorities

Any delay in processing messages causes message queues to grow. Normally, queued messages are handled in first-in, first-out (FIFO) order. However, certain conditions require that some messages (for instance, an error message) reach their stream destination as quickly as possible. For this reason, each message is assigned a priority by means of a priority band. Ordinary messages have a priority band of zero. High-priority messages are high priority by virtue of their message type; their priority band is ignored and, by convention, they are not affected by flow control. Figure 3–1 shows how messages are ordered in a queue according to priority.

Figure 3–1 Message Ordering in a Queue

Diagram shows how messages are ordered in a queue according to priority.

When a message is queued, it is placed after the messages of the same priority already in the queue (in other words, messages are FIFO within each priority band). Queueing a message affects the flow-control parameters associated with its priority band. Message priorities range from 0 (normal) to 255 (highest), which provides up to 256 bands of message flow within a stream. Expedited data can be implemented with one extra band of data flow (priority band 1), as shown in Figure 3–2.

Figure 3–2 Message Ordering With One Priority Band

Diagram shows a message queue with expedited messages.
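
The ordering rule can be modeled in user space as shown below. This is only an illustrative sketch, not the kernel's queueing code: struct msg, struct queue, and enqueue_by_band are hypothetical names, and high-priority message types are left out of the model.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical user-space model of a message; not the kernel's type. */
struct msg {
	unsigned char band;	/* priority band, 0 (normal) to 255 */
	int id;			/* arrival order, for illustration only */
	struct msg *next;
};

struct queue {
	struct msg *first;	/* front of the queue: next message read */
};

/*
 * Place m after every queued message whose band is greater than or
 * equal to its own. Higher bands stay nearer the front, and messages
 * within one band stay in FIFO order.
 */
void
enqueue_by_band(struct queue *q, struct msg *m)
{
	struct msg **pp;

	assert(q != NULL && m != NULL);
	pp = &q->first;
	while (*pp != NULL && (*pp)->band >= m->band)
		pp = &(*pp)->next;
	m->next = *pp;
	*pp = m;
}
```

With a single expedited band, queueing a band 1 message behind two band 0 messages places it at the front, while a second band 1 message lands directly behind the first.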

Controlling Data Flow and Priorities

The I_FLUSHBAND, I_CKBAND, I_GETBAND, I_CANPUT, and I_ATMARK ioctl(2)s support multiple bands of data flow. The I_FLUSHBAND ioctl(2) allows a user to flush a particular band of messages. Flush Handling discusses it in more detail.

The I_CKBAND ioctl(2) checks if a message of a given priority exists on the stream head read queue. Its interface is:


ioctl (fd, I_CKBAND, pri); 

The call returns 1 if a message of priority pri exists on the stream head read queue and 0 if no message of priority pri exists. If an error occurs, -1 is returned. Note that pri should be of type int.

The I_GETBAND ioctl(2) checks the priority of the first message on the stream head read queue. The interface is:


ioctl (fd, I_GETBAND, prip); 

The call sets the integer referenced by prip to the priority band of the message at the front of the stream head read queue.
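
To make the two return conventions concrete, the following user-space model mimics what I_CKBAND and I_GETBAND report for a given read queue. It does not call ioctl(2). The array of band numbers and the names model_ckband and model_getband are hypothetical, and the -1 result for an empty queue is an assumption of the model.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical model: bands[0..n-1] holds the priority band of each
 * message on the stream head read queue, front of the queue first.
 */

/* Like I_CKBAND: 1 if some queued message has band pri, else 0. */
int
model_ckband(const int *bands, size_t n, int pri)
{
	size_t i;

	assert(bands != NULL || n == 0);
	for (i = 0; i < n; i++)
		if (bands[i] == pri)
			return (1);
	return (0);
}

/*
 * Like I_GETBAND: store the band of the first message in *prip and
 * return 0. This model returns -1 when the queue is empty.
 */
int
model_getband(const int *bands, size_t n, int *prip)
{
	assert(prip != NULL);
	if (n == 0)
		return (-1);
	*prip = bands[0];
	return (0);
}
```

Note the difference in what is passed: I_CKBAND takes the band by value, while I_GETBAND takes a pointer through which the band is returned.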

The I_CANPUT ioctl(2) checks if a certain band is writable. Its interface is:


ioctl (fd, I_CANPUT, pri); 

The return value is 0 if the priority band pri is flow controlled, 1 if the band is writable, and -1 on error.

A module or driver can mark a message. This supports the ability of the Transmission Control Protocol (TCP) to indicate to the user the last byte of out-of-band data. Once marked, a message sent to the stream head causes the stream head to remember the message. A user can check whether the message on the front of its stream head read queue is marked with the I_ATMARK ioctl(2). If a user is reading data from the stream head, there are multiple messages on the read queue, and one of those messages is marked, the read(2) terminates when it reaches the marked message and returns the data only up to the marked message. Successive reads can return the rest of the data. Chapter 4, Application Access to the STREAMS Driver and Module Interfaces discusses this in more detail.

The I_ATMARK ioctl(2) has the format:


ioctl (fd, I_ATMARK, flag); 

where flag can be either ANYMARK or LASTMARK. ANYMARK checks whether the message at the front of the stream head read queue is marked. LASTMARK checks whether that message is the last marked message on the queue, that is, no other marked message follows it. If the test succeeds, 1 is returned. If the test fails, 0 is returned. If an error occurs, -1 is returned.
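
The difference between the two flags can be illustrated with a small user-space model. It assumes that both tests apply to the message at the front of the read queue, as the description of I_ATMARK above states. MODEL_ANYMARK, MODEL_LASTMARK, and model_atmark are hypothetical names, and the model does not call ioctl(2).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the ANYMARK and LASTMARK flags. */
enum mark_check { MODEL_ANYMARK, MODEL_LASTMARK };

/*
 * marked[0..n-1] is nonzero for each marked message on the read
 * queue, front of the queue first. Returns 1 if the test succeeds
 * and 0 if it does not.
 */
int
model_atmark(const int *marked, size_t n, enum mark_check how)
{
	size_t i;

	assert(marked != NULL || n == 0);
	if (n == 0 || !marked[0])
		return (0);	/* empty queue, or front message unmarked */
	if (how == MODEL_ANYMARK)
		return (1);
	/* MODEL_LASTMARK: no marked message may follow the front one. */
	for (i = 1; i < n; i++)
		if (marked[i])
			return (0);
	return (1);
}
```

For a queue whose first and third messages are marked, the ANYMARK test succeeds and the LASTMARK test fails, because another marked message is still waiting behind the first.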

Accessing the Service Provider

The first routine presented, inter_open, opens the protocol driver device file specified by path and binds the protocol address contained in addr so that it can receive data. On success, the routine returns the file descriptor associated with the open stream; on failure, it returns -1 and sets errno to indicate the appropriate UNIX system error value. Example 3–1 shows the inter_open routine.


Example 3–1 inter_open Routine

int inter_open(char *path, int oflags, long addr)
{
	int fd;
	struct bind_req bind_req;
	struct strbuf ctlbuf;
	union primitives rcvbuf;
	struct error_ack *error_ack;
	int flags;

	if ((fd = open(path, oflags)) < 0)
		return(-1);

	/* send bind request msg down stream */

	bind_req.PRIM_type = BIND_REQ;
	bind_req.BIND_addr = addr;
	ctlbuf.len = sizeof(struct bind_req);
	ctlbuf.buf = (char *)&bind_req;

	if (putmsg(fd, &ctlbuf, NULL, 0) < 0) {
		close(fd);
		return(-1);
	}

	/* inter_open continues in Example 3–2 */

After opening the protocol driver, inter_open packages a bind request message to send downstream. putmsg is called to send the request to the service provider. The bind request message contains a control part that holds a bind_req structure, but it has no data part. ctlbuf is a structure of type strbuf, and it is initialized with the primitive type and address. Notice that the maxlen field of ctlbuf is not set before calling putmsg. That is because putmsg ignores this field. The dataptr argument to putmsg is set to NULL to indicate that the message contains no data part. The flags argument is 0, which specifies that the message is not a high-priority message.
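
The example uses bind_req, union primitives, and error_ack without showing their declarations. The sketch below gives one plausible set of definitions so that the field names can be read in context. Only the type and field names that appear in the examples come from this section; the numeric primitive values, the ok_ack structure, the exact layouts, and the init_bind_req helper are assumptions made for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Primitive type codes. The numeric values are assumed. */
#define	BIND_REQ	1	/* bind request */
#define	UNITDATA_REQ	2	/* unitdata request */
#define	OK_ACK		3	/* positive acknowledgement */
#define	ERROR_ACK	4	/* error acknowledgement */
#define	UNITDATA_IND	5	/* unitdata indication */

/* Every primitive starts with a long that holds its type. */
struct bind_req		{ long PRIM_type; long BIND_addr; };
struct unitdata_req	{ long PRIM_type; long DEST_addr; };
struct ok_ack		{ long PRIM_type; };
struct error_ack	{ long PRIM_type; long UNIX_error; };
struct unitdata_ind	{ long PRIM_type; long SRC_addr; };

/* Large enough for any primitive; type aliases the leading long. */
union primitives {
	long type;
	struct bind_req bind_req;
	struct unitdata_req unitdata_req;
	struct ok_ack ok_ack;
	struct error_ack error_ack;
	struct unitdata_ind unitdata_ind;
};

/* Fill in a bind request for addr, as the example does inline. */
void
init_bind_req(struct bind_req *req, long addr)
{
	assert(req != NULL);
	req->PRIM_type = BIND_REQ;
	req->BIND_addr = addr;
}
```

Because every structure begins with a long primitive type, the first long of a received control buffer is enough to identify the message. That is why a receive buffer can be declared as a union and inspected through its type member before the rest of the structure is trusted.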

After inter_open sends the bind request, it must wait for an acknowledgement from the service provider, as Example 3–2 shows.


Example 3–2 Service Provider

	/* wait for ack of request (continuation of inter_open) */

	ctlbuf.maxlen = sizeof(union primitives);
	ctlbuf.len = 0;
	ctlbuf.buf = (char *)&rcvbuf;
	flags = RS_HIPRI;

	if (getmsg(fd, &ctlbuf, NULL, &flags) < 0) {
		close(fd);
		return(-1);
	}

	/* did we get enough to determine type? */
	/* len is a signed int (-1 if no control part), so compare as int */
	if (ctlbuf.len < (int)sizeof(long)) {
		close(fd);
		errno = EPROTO;
		return(-1);
	}

	/* switch on type (first long in rcvbuf) */
	switch(rcvbuf.type) {
	default:
		close(fd);
		errno = EPROTO;
		return(-1);

	case OK_ACK:
		return(fd);

	case ERROR_ACK:
		if (ctlbuf.len < (int)sizeof(struct error_ack)) {
			close(fd);
			errno = EPROTO;
			return(-1);
		}
		error_ack = (struct error_ack *)&rcvbuf;
		close(fd);
		errno = error_ack->UNIX_error;
		return(-1);
	}
}

getmsg is called to retrieve the acknowledgement of the bind request. The acknowledgement message consists of a control part that contains either an OK_ACK or an error_ack structure, and no data part.

The acknowledgement primitives are defined as high-priority messages. Messages are queued in a first-in, first-out (FIFO) manner within their priority at the stream head. The STREAMS mechanism allows only one high-priority message per stream on the stream head read queue at any time. Any additional high-priority messages are discarded on reaching the stream head. High-priority messages are particularly suitable for acknowledging service requests when the acknowledgement should be placed ahead of any other messages at the stream head.

Before calling getmsg, this routine must initialize the strbuf structure for the control part. buf should point to a buffer large enough to hold the expected control part, and maxlen must be set to indicate the maximum number of bytes this buffer can hold.

Because neither acknowledgement primitive contains a data part, the dataptr argument to getmsg is set to NULL. The flagsp argument points to an integer containing the value RS_HIPRI. This flag indicates that getmsg should wait for a STREAMS high-priority message before returning. This catches the acknowledgement primitives that are priority messages. Otherwise, if the flag is zero, the first message is taken. With RS_HIPRI set, even if a normal message is available, getmsg blocks until a high-priority message arrives.
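
The effect of the flag on message selection can be modeled as follows. This is a user-space sketch of the behavior described above, not a call to getmsg(2); MODEL_RS_HIPRI and model_getmsg_ready are hypothetical names. The model relies on the fact that a high-priority message, when present, sits ahead of all other messages at the stream head.

```c
#include <assert.h>
#include <stddef.h>

#define	MODEL_RS_HIPRI	0x01	/* hypothetical stand-in for RS_HIPRI */

/*
 * hipri[0..n-1] is nonzero for a high-priority message, front of the
 * read queue first. Returns 1 if getmsg would return the front
 * message immediately, or 0 if it would block.
 */
int
model_getmsg_ready(const int *hipri, size_t n, int flags)
{
	assert(hipri != NULL || n == 0);
	if (n == 0)
		return (0);	/* nothing queued: block either way */
	if ((flags & MODEL_RS_HIPRI) && !hipri[0])
		return (0);	/* only normal messages: keep waiting */
	return (1);
}
```

With two normal messages queued, a flags value of zero yields the first message at once, whereas MODEL_RS_HIPRI keeps the caller waiting until an acknowledgement arrives.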

On return from getmsg, check the len field to ensure that the control part of the retrieved message is an appropriate size. The example then checks the primitive type and takes appropriate actions. An OK_ACK indicates a successful bind operation, and inter_open returns the file descriptor of the open stream. An error_ack indicates a bind failure, and errno is set to identify the problem with the request.

Closing the Service Provider

The next routine in the service interface library example is inter_close, which closes the stream to the service provider.


int inter_close(int fd)
{
	/* closing the stream lets the driver release its resources */
	return(close(fd));
}

The routine closes the given file descriptor. This causes the protocol driver to free any resources associated with that stream. For example, the driver can unbind the protocol address that had previously been bound to that stream, thereby freeing that address for use by another service user.

Sending Data to the Service Provider

The third routine, inter_snd, passes data to the service provider for transmission to the user at the address specified in addr. The data to be transmitted is held in the buffer pointed to by buf and is len bytes long. On successful completion, this routine returns the number of bytes of data passed to the service provider; on failure, it returns -1.


Example 3–3 inter_snd

int inter_snd(int fd, char *buf, int len, long addr)
{
	struct strbuf ctlbuf;
	struct strbuf databuf;
	struct unitdata_req unitdata_req;

	/* control part: primitive type and destination address */
	unitdata_req.PRIM_type = UNITDATA_REQ;
	unitdata_req.DEST_addr = addr;

	ctlbuf.len = sizeof(struct unitdata_req);
	ctlbuf.buf = (char *)&unitdata_req;

	/* data part: the caller's buffer */
	databuf.len = len;
	databuf.buf = buf;

	if (putmsg(fd, &ctlbuf, &databuf, 0) < 0)
		return(-1);
	return(len);
}

In this example, the data request primitive is packaged with both a control part and a data part. The control part contains a unitdata_req structure that identifies the primitive type and the destination address of the data. The data to be transmitted is placed in the data part of the request message.

Unlike the bind request, the data request primitive requires no acknowledgement from the service provider. In the example, this choice was made to minimize the overhead during data transfer. If the putmsg call succeeds, this routine returns the number of bytes passed to the service provider.

Receiving Data

The final routine in Example 3–4, inter_rcv, retrieves the next available data. buf points to a buffer where the data should be stored, len indicates the size of that buffer, and addr points to a long integer where the source address of the data is placed. On successful completion, inter_rcv returns the number of bytes of retrieved data; on failure, it returns -1 and stores an appropriate UNIX system error value in the integer pointed to by errorp.


Example 3–4 Receiving Data

int inter_rcv(int fd, char *buf, int len, long *addr, int *errorp)
{
	struct strbuf ctlbuf;
	struct strbuf databuf;
	struct unitdata_ind unitdata_ind;
	int retval;
	int flagsp;

	ctlbuf.maxlen = sizeof(struct unitdata_ind);
	ctlbuf.len = 0;
	ctlbuf.buf = (char *)&unitdata_ind;
	databuf.maxlen = len;
	databuf.len = 0;
	databuf.buf = buf;
	flagsp = 0;

	if ((retval = getmsg(fd, &ctlbuf, &databuf, &flagsp)) < 0) {
		*errorp = EIO;
		return(-1);
	}
	/* nonzero means part of the message did not fit in the buffers */
	if (retval) {
		*errorp = EIO;
		return(-1);
	}
	if (unitdata_ind.PRIM_type != UNITDATA_IND) {
		*errorp = EPROTO;
		return(-1);
	}
	*addr = unitdata_ind.SRC_addr;
	return(databuf.len);
}

getmsg is called to retrieve the data indication primitive, where that primitive contains both a control and data part. The control part consists of a unitdata_ind structure that identifies the primitive type and the source address of the data sender. The data part contains the data itself. In ctlbuf, buf points to a buffer containing the control information, and maxlen indicates the maximum size of the buffer. Similar initialization is done for databuf.

The integer pointed to by flagsp in the getmsg call is set to zero, indicating that the next message should be retrieved from the stream head regardless of its priority. Data arrives in normal priority messages. If there is no message at the stream head, getmsg blocks until a message arrives.

The user's control and data buffers should be large enough to hold any incoming data. If both buffers are large enough, getmsg processes the data indication and returns 0, indicating that a full message was retrieved successfully. However, if either buffer is too small, getmsg returns only the part of the message that fits into each user buffer. The remainder of the message is saved for subsequent retrieval (in message non-discard mode), and a positive, nonzero value is returned to the user. A return value of MORECTL indicates that more control information is waiting for retrieval. A return value of MOREDATA indicates that more data is waiting for retrieval. A return value of (MORECTL | MOREDATA) indicates that data from both parts of the message remain. In the example, if the user buffers are not large enough (that is, getmsg returns a positive, nonzero value), the function stores EIO in the integer pointed to by errorp and fails.
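
A caller that prefers to keep reading rather than fail can decode the return value first. The sketch below shows one way to do that. The struct remainder type and the decode_getmsg name are hypothetical, and the fallback values for MORECTL and MOREDATA are assumptions used only when no STREAMS header has defined them.

```c
#include <assert.h>
#include <stddef.h>

/* Fall back to assumed values if the system header is not included. */
#ifndef MORECTL
#define	MORECTL		1	/* assumed value */
#endif
#ifndef MOREDATA
#define	MOREDATA	2	/* assumed value */
#endif

/* Which parts of the message are still waiting at the stream head. */
struct remainder {
	int ctl;	/* nonzero: more control information remains */
	int data;	/* nonzero: more data remains */
};

/*
 * Decode a getmsg return value. Returns -1 if rv reports an error
 * (rv < 0); otherwise fills in *rem and returns 0.
 */
int
decode_getmsg(int rv, struct remainder *rem)
{
	assert(rem != NULL);
	if (rv < 0)
		return (-1);
	rem->ctl = (rv & MORECTL) != 0;
	rem->data = (rv & MOREDATA) != 0;
	return (0);
}
```

A receive loop could call getmsg again with a fresh data buffer for as long as the data member is set, which retrieves the saved remainder of the message piece by piece.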

The type of the primitive returned by getmsg is checked to make sure it is a data indication (UNITDATA_IND in the example). The source address is then set and the number of bytes of data is returned.

The example presented is a simplified service interface. It shows typical uses of putmsg(2) and getmsg(2). The state transition rules for the interface are not presented and this example does not handle expedited data.