STREAMS Programming Guide

Chapter 10 STREAMS Modules

This chapter provides specific examples of how modules work, including code samples.

Module Overview

STREAMS modules process messages as they flow through the stream between an application and a character device driver. A STREAMS module is a pair of initialized queue structures and the specified kernel-level procedures that process data, status, and control information for the two queues. A stream can contain zero or more modules. Application processes push (stack) modules on a stream using the I_PUSH ioctl(2) and pop (unstack) them using the I_POP ioctl(2).

STREAMS Module Configuration

Like device drivers, STREAMS modules are dynamically linked and can be loaded into and unloaded from the running kernel.


Note –

The word module is used differently when talking about drivers. A device driver is a kernel-loadable module that provides the interface between a device and the Device Driver Interface, and is linked to the kernel when it is first invoked.


A loadable module must provide linkage information to the kernel in an initialized modlstrmod(9S) and three entry points: _init(9E), _info(9E), and _fini(9E).

STREAMS modules can be unloaded from the kernel when not pushed onto a stream. A STREAMS module can prevent itself from being unloaded by returning an error (selected from errno.h) from its _fini(9E) routine (EBUSY is a good choice).

Module Procedures

STREAMS module procedures (open, close, put, service) have already been described in the previous chapters. This section shows some examples and further describes attributes common to module put and service procedures.

A module's put procedure is called by the preceding module, driver, or stream head, and always before that queue's service procedure. The put procedure does any immediate processing (for example, high-priority messages), while the corresponding service procedure performs deferred processing.

The service procedure is used primarily for deferred processing; its secondary task is to implement flow control. Once the service procedure is enabled, it can start, but is not guaranteed to finish, before user-level code runs. The put and service procedures must not block, because no thread synchronization is being done.

Example 10–1 shows a STREAMS module read-side put procedure.


Example 10–1 Read-side put Procedure

static int
modrput (queue_t *q, mblk_t *mp)
{
		struct mod_prv *modptr;

		modptr = (struct mod_prv *) q->q_ptr;  /*state info*/

		if (mp->b_datap->db_type >= QPCTL){	/*proc pri msg*/
			putnext(q, mp); 						/* and pass it on */
			return (0);
		}

		switch(mp->b_datap->db_type) {
			case M_DATA:				/* can process message data */
				putq(q, mp);	/* queue msg for service procedure */
				return (0);

			case M_PROTO:			/* handle protocol control message */
					.
					.
					.

			default:
					putnext(q, mp);		
					return (0);
		}
}

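The preceding code does the following:

  1. Retrieves the module's private state from q->q_ptr.

  2. Passes any high-priority message (type QPCTL or above) to the next queue immediately with putnext(9F).

  3. Switches on the type of each remaining message. M_DATA messages are queued with putq(9F) for the service procedure, M_PROTO messages are handled as protocol control messages, and all other types are passed along with putnext(9F).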

Example 10–2 shows a module write-side put procedure.


Example 10–2 Write-side put Procedure

static int
modwput (queue_t *q, mblk_t *mp)
{
	struct mod_prv *modptr;

	modptr = (struct mod_prv *) q->q_ptr;		/* state info */

	if (mp->b_datap->db_type >= QPCTL) {	/* proc pri msg and pass it on */
		putnext(q, mp);
		return (0);
	}

	switch (mp->b_datap->db_type) {
	case M_DATA:
		/*
		 * Can process message data: queue msg for the service
		 * procedure, or pass the message along with putnext(q, mp).
		 */
		putq(q, mp);
		return (0);

	case M_PROTO:
			.
			.
			.

	case M_IOCTL:
		/*
		 * If cmd in msg is recognized, process message and send
		 * reply back; else pass message downstream.
		 */

	default:
		putnext(q, mp);
		return (0);
	}
}

The write-side put procedure, unlike the read side, can be passed M_IOCTL messages. The module must recognize and process the ioctl(2) command, or pass the message downstream if it does not recognize the command.

Example 10–3 shows a general scenario employed by the module's service procedure.


Example 10–3 STREAMS Module Service Procedure

static int
modrsrv (queue_t *q)
{
		mblk_t *mp;

		while ((mp = getq(q)) != NULL) {
			/* flow control check */
			if (!(mp->b_datap->db_type >= QPCTL) && !canputnext(q)) {	
				putbq(q, mp);						/* return message */
				return (0);
			}
			/* process the message */
				.
				.
				.
			putnext(q, mp); /* pass the result */
		}
		return (0);
}

The steps are:

  1. Retrieve the first message from the queue using getq(9F).

  2. If the message is high priority, process it immediately and pass it along the stream.

    Otherwise, the service procedure should use canputnext(9F) to determine if the next module or driver that enqueues messages is within acceptable flow-control limits. canputnext(9F) searches the stream for the next module, driver, or the stream head with a service procedure. When it finds one, it looks at the total message space currently allocated to the queue for messages. If the amount of space currently used at that queue reaches the high-water mark, canputnext(9F) returns false (zero). If the next queue with a service procedure is within acceptable flow-control limits, canputnext(9F) returns true (nonzero).

  3. If canputnext(9F) returns false, the service procedure returns the message to its own queue with putbq(9F). The service procedure can do no further processing at this time, and it returns to the caller.

    If canputnext(9F) returns true, the service procedure completes any processing of the message. This can involve retrieving more messages from the queue, allocating and deallocating header and trailer information, and performing control functions for the module.

  4. When the service procedure is finished processing the message, it calls putnext(9F) to pass the resulting message to the next queue.

These steps are repeated until getq(9F) returns NULL (the queue is empty) or canputnext(9F) returns false.
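The service loop described in the steps above can be modeled outside the kernel. The following is a minimal user-space sketch, not kernel code: every toy_ name, the TOY_QPCTL value, and the next_can_accept flag are hypothetical stand-ins for queue_t, getq(9F), putbq(9F), canputnext(9F), and putnext(9F).

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for STREAMS types; these are NOT the kernel definitions. */
#define TOY_QPCTL 0x80		/* assumed threshold for high-priority types */
#define TOY_QLEN  8

typedef struct toy_msg { int type; int payload; } toy_msg_t;

typedef struct toy_queue {
	toy_msg_t *slot[TOY_QLEN];	/* pending messages, slot[0] is the head */
	int count;
	int next_can_accept;		/* models the result of canputnext() */
	int forwarded;			/* messages passed downstream so far */
} toy_queue_t;

/* Remove and return the head message, or NULL if the queue is empty. */
static toy_msg_t *toy_getq(toy_queue_t *q)
{
	toy_msg_t *mp;

	if (q->count == 0)
		return (NULL);
	mp = q->slot[0];
	for (int i = 1; i < q->count; i++)
		q->slot[i - 1] = q->slot[i];
	q->count--;
	return (mp);
}

/* Put a message back at the head of the queue. */
static void toy_putbq(toy_queue_t *q, toy_msg_t *mp)
{
	assert(q->count < TOY_QLEN);
	for (int i = q->count; i > 0; i--)
		q->slot[i] = q->slot[i - 1];
	q->slot[0] = mp;
	q->count++;
}

/* Service loop: drain the queue until it is empty or downstream blocks. */
static int toy_srv(toy_queue_t *q)
{
	toy_msg_t *mp;

	while ((mp = toy_getq(q)) != NULL) {
		/* Only ordinary messages are subject to flow control. */
		if (mp->type < TOY_QPCTL && !q->next_can_accept) {
			toy_putbq(q, mp);
			return (0);
		}
		mp->payload++;		/* placeholder for real processing */
		q->forwarded++;		/* models putnext() */
	}
	return (0);
}

/*
 * Hypothetical driver: queue one high-priority and two ordinary messages,
 * run the loop once, and report how many were forwarded and how many remain.
 */
static int toy_run_demo(int next_can_accept, int *left)
{
	toy_msg_t msgs[3] = { { 0x81, 0 }, { 0x00, 0 }, { 0x00, 0 } };
	toy_queue_t q = { { &msgs[0], &msgs[1], &msgs[2] }, 3,
	    next_can_accept, 0 };

	(void) toy_srv(&q);
	*left = q.count;
	return (q.forwarded);
}
```

In this model, a blocked downstream path still lets the high-priority message through, while the first ordinary message goes back to the head of the queue and the loop stops.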

Filter Module Example

The module shown next, crmod in Example 10–4, is an asymmetric filter. On the write side, a newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.


Example 10–4 Filter Module

/* Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static struct module_info minfo =
	{ 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen (queue_t*, dev_t*, int, int, cred_t*);
static int modrput (queue_t*, mblk_t*);
static int modwput (queue_t*, mblk_t*);
static int modwsrv (queue_t*);
static int modclose (queue_t*, int, cred_t*);

static struct qinit rinit = {
	modrput, NULL, modopen, modclose, NULL, &minfo, NULL};

static struct qinit winit = {
	modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL};

struct streamtab crmdinfo={ &rinit, &winit, NULL, NULL};

stropts.h includes definitions of flush message options common to user applications. modrput is like modput from the null module.

In contrast to the null module example, a single module_info structure is shared by the read side and write side. The module_info includes the flow control high-water and low-water marks (512 and 128) for the write queue. (Though the same module_info is used on the read queue side, the read side has no service procedure so flow control is not used.) The qinit contains the service procedure pointer.

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Example 10–5.


Example 10–5 Flushing a Queue

static int
modwput(queue_t *q, mblk_t *mp)
{
	if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
			putnext(q, mp);
	else
			putq(q, mp);				 /* Put it on the queue */
	return (0);
}
static int 
modwsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
			switch (mp->b_datap->db_type) {
				default:
					if (canputnext(q)) {
							putnext(q, mp);
							break;
			 		} else {
							putbq(q, mp);
							return (0);
					 }

				case M_FLUSH:
				    if (*mp->b_rptr & FLUSHW)
						    flushq(q, FLUSHDATA);
				    putnext(q, mp);
				    break;

				case M_DATA: {
					mblk_t *nbp = NULL;
					mblk_t *next;

					if (!canputnext(q)) {
						putbq(q, mp);
						return (0);
					}
					/* Filter data, appending to queue */
					for (; mp != NULL; mp = next) {
						while (mp->b_rptr < mp->b_wptr) {
							if (*mp->b_rptr == '\n')
								if (!bappend(&nbp, '\r'))
									goto push;
							if (!bappend(&nbp, *mp->b_rptr))
								goto push;
							mp->b_rptr++;
							continue;
						push:
							/* new block is full: forward it */
							if (nbp)
								putnext(q, nbp);
							nbp = NULL;
							if (!canputnext(q)) {
								if (mp->b_rptr >= mp->b_wptr) {
									next = mp->b_cont;
									freeb(mp);
									mp = next;
								}
								if (mp)
									putbq(q, mp);
								return (0);
							}
						} /* while */
						next = mp->b_cont;
						freeb(mp);
						if (nbp) {
							putnext(q, nbp);
							nbp = NULL;	/* never reuse a forwarded block */
						}
					} /* for */
					break;
				}
		} /* switch */
	}
	return (0);
}					

modwsrv() is the write service procedure. It takes a single argument, which is a pointer to the write queue. modwsrv() switches on the message type, M_FLUSH or M_DATA. modwsrv() processes only one high-priority message type, M_FLUSH. No other high-priority messages should reach modwsrv(), because modwput() passes them along with putnext(9F) to avoid scheduling. All other messages are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in the put or service procedure.

For an M_FLUSH message, modwsrv() checks the first data byte. If FLUSHW is set, the write queue is flushed by flushq(9F), which takes two arguments, the queue pointer and a flag. The flag indicates what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). Data includes M_DATA, M_DELAY, M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is specific to each module.
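The difference between the two flush flags can be illustrated with a small user-space model. This is only a sketch: the toy_ enumerations and toy_flushq() are hypothetical and do not use the kernel's numeric values or the real flushq(9F) signature. The set of types treated as data follows the list given above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical message types and flags; values differ from the kernel's. */
enum toy_type {
	TOY_M_DATA, TOY_M_PROTO, TOY_M_PCPROTO, TOY_M_DELAY,
	TOY_M_IOCTL, TOY_M_CTL
};
enum toy_flag { TOY_FLUSHDATA, TOY_FLUSHALL };

/* Data messages, per the text: M_DATA, M_DELAY, M_PROTO, M_PCPROTO. */
static int toy_is_data(enum toy_type t)
{
	return (t == TOY_M_DATA || t == TOY_M_DELAY ||
	    t == TOY_M_PROTO || t == TOY_M_PCPROTO);
}

/*
 * Remove flushed entries from the array in place and return the number of
 * messages that remain. TOY_FLUSHALL removes everything; TOY_FLUSHDATA
 * removes only data messages and keeps the rest in order.
 */
static size_t toy_flushq(enum toy_type *q, size_t n, enum toy_flag flag)
{
	size_t keep = 0;

	assert(q != NULL || n == 0);
	for (size_t i = 0; i < n; i++) {
		if (flag == TOY_FLUSHALL || toy_is_data(q[i]))
			continue;	/* message is discarded */
		q[keep++] = q[i];
	}
	return (keep);
}
```

With a queue holding data, ioctl, protocol, and control messages, the data-only flush in this model leaves the ioctl and control messages queued in their original order.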

Ordinary messages are returned to the queue if canputnext(9F) returns false, indicating the downstream path is blocked.

The differences in M_DATA processing between this and the example in Message Allocation and Freeing relate to the manner in which the new messages are forwarded and flow controlled. For the purpose of demonstrating alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with putnext(9F) rather than being linked into a single large message. This alternative is not desirable because message boundaries are altered, and because of the additional overhead of handling and scheduling multiple messages.

When the filter processing is performed (following push), flow control is checked (with canputnext(9F)) after each new message is forwarded. This is necessary because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.
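The forwarding strategy described above, in which each new block is sent as soon as it fills, can be sketched in user space. The code below is an illustration under stated assumptions, not the module's implementation: toy_sink_t, toy_blk_t, and the toy_ functions are hypothetical, the block size is deliberately tiny, and flow control is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_BLKSZ 4	/* deliberately tiny so that blocks fill quickly */

/* Models the downstream queue: records everything forwarded to it. */
typedef struct toy_sink {
	char data[128];		/* concatenation of all forwarded blocks */
	size_t len;
	int nmsgs;		/* number of separate blocks forwarded */
} toy_sink_t;

/* Models the new message block being filled. */
typedef struct toy_blk {
	char buf[TOY_BLKSZ];
	size_t used;
} toy_blk_t;

/* Forward the current block downstream (models putnext) and empty it. */
static void toy_forward(toy_sink_t *s, toy_blk_t *b)
{
	if (b->used == 0)
		return;
	assert(s->len + b->used <= sizeof (s->data));
	memcpy(s->data + s->len, b->buf, b->used);
	s->len += b->used;
	s->nmsgs++;
	b->used = 0;
}

/* Append one character, forwarding the block first if it is full. */
static void toy_emit(toy_sink_t *s, toy_blk_t *b, char ch)
{
	if (b->used == TOY_BLKSZ)
		toy_forward(s, b);
	b->buf[b->used++] = ch;
}

/* Convert each newline to carriage return + newline. */
static void toy_crfilter(toy_sink_t *s, const char *in, size_t n)
{
	toy_blk_t blk = { { 0 }, 0 };

	for (size_t i = 0; i < n; i++) {
		if (in[i] == '\n')
			toy_emit(s, &blk, '\r');
		toy_emit(s, &blk, in[i]);
	}
	toy_forward(s, &blk);	/* send the final partial block */
}
```

Because the output leaves in fixed-size pieces, one input buffer can arrive downstream as several messages, which is the change in message boundaries noted above.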

Data Flow Control

To support the STREAMS flow control mechanism, modules that use service procedures must invoke canputnext(9F) before calling putnext(9F), and use appropriate values for the high-water and low-water marks. If your module has a service procedure, you manage the flow control. If you don't have a service procedure, then there is no need to do anything.

The queue hiwat and lowat values limit the amount of data that can be placed on a queue. These limits prevent depletion of buffers in the buffer pool. Flow control is advisory in nature and can be bypassed. It is managed by high-water and low-water marks and regulated by the utility routines getq(9F), putq(9F), putbq(9F), insq(9F), rmvq(9F), and canputnext(9F).

The following scenario takes place normally in flow control:

A driver sends data to a module using putnext(9F), and the module's put procedure queues data using putq(9F). Calling putq(9F) enables the service procedure and executes it at some indeterminate time in the future. When the service procedure runs, it retrieves the data by calling getq(9F).

If the module cannot process data at the rate at which the driver is sending the data, the following happens:

When the message is queued, putq(9F) increments the value of q_count by the size of the message and compares the result to the queue's high-water limit (q_hiwat). If the count reaches q_hiwat, putq(9F) sets the internal QFULL indicator for the queue. Messages from upstream (for a write-side queue) or from downstream (for a read-side queue) are then halted, because canputnext(9F) returns false, until the queue count drops below q_lowat. getq(9F) decrements the queue count. If the resulting count is below q_lowat, getq(9F) back-enables any blocked queue, which causes that queue's service procedure to be called. (Flow control does not prevent q_count from reaching q_hiwat. The count can exceed q_hiwat before canputnext(9F) detects QFULL and flow is stopped.)
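The high-water and low-water behavior can be traced with a small user-space model. This is a simplified sketch that follows the description in this section rather than the kernel source: toy_fcq_t and the toy_fc_ functions are hypothetical, and the real getq(9F) applies further conditions before back-enabling that are not modeled here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the flow-control fields of one queue. */
typedef struct toy_fcq {
	size_t count;		/* models q_count */
	size_t hiwat;		/* models q_hiwat */
	size_t lowat;		/* models q_lowat */
	int full;		/* models the QFULL indicator */
	int backenables;	/* times a blocked sender was re-enabled */
} toy_fcq_t;

/* Models putq(): add the message size, mark full at the high-water mark. */
static void toy_fc_put(toy_fcq_t *q, size_t msgsize)
{
	q->count += msgsize;
	if (q->count >= q->hiwat)
		q->full = 1;
}

/* Models getq(): subtract the size, back-enable below the low-water mark. */
static void toy_fc_get(toy_fcq_t *q, size_t msgsize)
{
	assert(msgsize <= q->count);
	q->count -= msgsize;
	if (q->full && q->count < q->lowat) {
		q->full = 0;
		q->backenables++;
	}
}

/* Models canputnext() as seen by the neighboring queue. */
static int toy_fc_canput(const toy_fcq_t *q)
{
	return (!q->full);
}
```

With marks of 512 and 128, as in the crmod module_info, three 200-byte messages push the count to 600, past the high-water mark. In this model the queue stays blocked until enough messages are removed to bring the count under 128.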

The next example shows a line discipline module's flow control. Example 10–6 shows a read-side line discipline module and a write-side line discipline module. Note that the read side is the same as the write side but without the M_IOCTL processing.


Example 10–6 Read-side Line Discipline Module

/* read side line discipline module flow control */
static mblk_t *read_canon(mblk_t *);

static int
ld_read_srv(
	queue_t *q)							/* pointer to read queue */
{
	mblk_t *mp;							/* original message */
	mblk_t *bp;							/* canonicalized message */

	while ((mp = getq(q)) != NULL) {
			switch (mp->b_datap->db_type) { /* type of msg */
			case M_DATA:	 /* data message */
				if (canputnext(q)) {
						bp = read_canon(mp);
						putnext(q, bp);
				} else {
						putbq(q, mp); /* put message back in queue */
						return (0);
				}
				break;

			default:
				if (mp->b_datap->db_type >= QPCTL)
						putnext(q, mp); 		/* high-priority message */
				else { /* ordinary message */
						if (canputnext(q))
								 putnext(q, mp);
						else {
								 putbq(q, mp);
								 return (0);
						}
				}
				break;
			}
	}
	return (0);
}

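/* helper prototypes; the signatures are assumed from how the helpers are called */
static mblk_t *write_canon(mblk_t *);
static void ld_ioctl(queue_t *, mblk_t *);

/* write side line discipline module flow control */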
static int
ld_write_srv(
	queue_t *q)								/* pointer to write queue */
{
	mblk_t *mp;								/* original message */
	mblk_t *bp;								/* canonicalized message */

	while ((mp = getq(q)) != NULL) {
			switch (mp->b_datap->db_type) { /* type of msg */
			case M_DATA:			 /* data message */
				if (canputnext(q)) {
						bp = write_canon(mp);
						putnext(q, bp);
				} else {
						putbq(q, mp);
						return (0);
				}
				break;

			case M_IOCTL:
				ld_ioctl(q, mp);
				break;

			default:
				if (mp->b_datap->db_type >= QPCTL)
						putnext(q, mp);		/* high priority message */
				else { 						/* ordinary message */
						if (canputnext(q))		
								putnext(q, mp);
						else {
								putbq(q, mp);
								return (0);
						}
				}
				break;
			}
	}
	return (0);
}

Design Guidelines

Module developers should follow these guidelines:

htonl(3SOCKET) and ntohl(3SOCKET)

The htonl(3SOCKET) and ntohl(3SOCKET) conversion routines follow the XNS5 publications. The functions continue to convert 32-bit quantities between network byte order and host byte order.
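A short user-space sketch of the round trip follows. The helper names put_be32() and get_be32() are hypothetical, and the example assumes a POSIX system where the routines are declared in <arpa/inet.h>; the header to include can differ by platform.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/* Store a 32-bit host-order value into a buffer in network byte order. */
static void put_be32(unsigned char out[4], uint32_t host)
{
	uint32_t net = htonl(host);

	assert(out != NULL);
	memcpy(out, &net, sizeof (net));
}

/* Read a 32-bit network-order value from a buffer into host order. */
static uint32_t get_be32(const unsigned char in[4])
{
	uint32_t net;

	assert(in != NULL);
	memcpy(&net, in, sizeof (net));
	return (ntohl(net));
}
```

Network byte order is big-endian, so the most significant byte is stored first regardless of the host's byte order, and reading the buffer back returns the original value.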