STREAMS Programming Guide

Module Overview

STREAMS modules process messages as they flow through the stream between an application and a character device driver. A STREAMS module is a pair of initialized queue structures and the specified kernel-level procedures that process data, status, and control information for the two queues. A stream can contain zero or more modules. Application processes push (stack) modules on a stream using the I_PUSH ioctl(2) and pop (unstack) them using the I_POP ioctl(2).

STREAMS Module Configuration

Like device drivers, STREAMS modules are dynamically linked and can be loaded into and unloaded from the running kernel.


Note –

The word module is used differently when talking about drivers. A device driver is a kernel-loadable module that provides the interface between a device and the Device Driver Interface, and is linked to the kernel when it is first invoked.


A loadable module must provide linkage information to the kernel in an initialized modlstrmod(9S) and three entry points: _init(9E), _info(9E), and _fini(9E).

STREAMS modules can be unloaded from the kernel when not pushed onto a stream. A STREAMS module can prevent itself from being unloaded by returning an error (selected from errno.h) from its _fini(9E) routine (EBUSY is a good choice).

Module Procedures

STREAMS module procedures (open, close, put, service) have already been described in the previous chapters. This section shows some examples and further describes attributes common to module put and service procedures.

A module's put procedure is called by the preceding module, driver, or stream head, and always before that queue's service procedure. The put procedure does any immediate processing (for example, high-priority messages), while the corresponding service procedure performs deferred processing.

The service procedure is used primarily for performing deferred processing, with a secondary task to implement flow control. Once the service procedure is enabled, it can start but not finish before running user-level code. The put and service procedures must not block because there is no thread synchronization being done.

Example 10–1 shows a STREAMS module read-side put procedure.


Example 10–1 Read-side put Procedure

static int
modrput(queue_t *q, mblk_t *mp)
{
	struct mod_prv *modptr;

	modptr = (struct mod_prv *)q->q_ptr;	/* state info */

	if (mp->b_datap->db_type >= QPCTL) {	/* process priority message */
		putnext(q, mp);			/* and pass it on */
		return (0);
	}

	switch (mp->b_datap->db_type) {
	case M_DATA:		/* can process message data */
		putq(q, mp);	/* queue msg for service procedure */
		return (0);

	case M_PROTO:		/* handle protocol control message */
			.
			.
			.

	default:
		putnext(q, mp);
		return (0);
	}
}

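The preceding code does the following: it retrieves the module's private state from q->q_ptr, passes high-priority messages (those whose type is greater than or equal to QPCTL) to the next queue immediately with putnext(9F), queues M_DATA messages for the service procedure with putq(9F), and passes message types it does not handle along the stream with putnext(9F).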

Example 10–2 shows a module write-side put procedure.


Example 10–2 Write-side put Procedure

static int
modwput(queue_t *q, mblk_t *mp)
{
	struct mod_prv *modptr;

	modptr = (struct mod_prv *)q->q_ptr;	/* state info */

	if (mp->b_datap->db_type >= QPCTL) {
		/* process priority message and pass it on */
		putnext(q, mp);
		return (0);
	}

	switch (mp->b_datap->db_type) {
	case M_DATA:
		/*
		 * Can process message data: queue the message for the
		 * service procedure, or pass it along with putnext(q, mp).
		 */
		putq(q, mp);
		return (0);

	case M_PROTO:
			.
			.
			.

	case M_IOCTL:
		/*
		 * If cmd in msg is recognized, process message and
		 * send reply back; else pass message downstream.
		 */

	default:
		putnext(q, mp);
		return (0);
	}
}

The write-side put procedure, unlike the read side, can be passed M_IOCTL messages. The module must recognize and process the ioctl(2) command, or pass the message downstream if it does not recognize the command.
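The routing decision made by a write-side put procedure can be modeled in user space. The following is a minimal sketch, not kernel code: the toy_ names, the numeric type values, and the single recognized command TOY_CMD_KNOWN are assumptions made for illustration. Only the convention that high-priority types compare greater than or equal to a QPCTL threshold is taken from the examples above.

```c
#include <assert.h>

/* Hypothetical message types; the values are illustrative only. */
enum toy_type {
	TOY_M_DATA  = 0x00,
	TOY_M_PROTO = 0x01,
	TOY_M_IOCTL = 0x0e,
	TOY_QPCTL   = 0x80,	/* types >= this are high priority */
	TOY_M_FLUSH = 0x86
};

/* What the put procedure does with a message. */
enum toy_action {
	TOY_PASS,	/* pass to the next queue (putnext) */
	TOY_QUEUE,	/* queue for the service procedure (putq) */
	TOY_REPLY	/* process here and send a reply back */
};

#define	TOY_CMD_KNOWN	1	/* hypothetical ioctl command the module handles */

/* Decide how a write-side put procedure would route one message. */
static enum toy_action
toy_wput_action(int type, int cmd)
{
	assert(type >= 0);

	if (type >= TOY_QPCTL)
		return (TOY_PASS);	/* high priority: pass on immediately */

	switch (type) {
	case TOY_M_DATA:
		return (TOY_QUEUE);	/* defer to the service procedure */
	case TOY_M_IOCTL:
		/* recognized command: reply; otherwise pass downstream */
		return (cmd == TOY_CMD_KNOWN ? TOY_REPLY : TOY_PASS);
	default:
		return (TOY_PASS);
	}
}
```

In this model, toy_wput_action(TOY_M_IOCTL, 99) yields TOY_PASS because command 99 is not recognized, which mirrors the rule that unrecognized ioctl(2) commands are passed downstream.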

Example 10–3 shows a general scenario employed by the module's service procedure.


Example 10–3 STREAMS Module Service Procedure

static int
modrsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		/* flow control check */
		if (!(mp->b_datap->db_type >= QPCTL) && !canputnext(q)) {
			putbq(q, mp);	/* return message */
			return (0);
		}
		/* process the message */
			.
			.
			.
		putnext(q, mp);	/* pass the result */
	}
	return (0);
}

The steps are:

  1. Retrieve the first message from the queue using getq(9F).

  2. If the message is high priority, process it immediately and pass it along the stream.

    Otherwise, the service procedure should use canputnext(9F) to determine if the next module or driver that enqueues messages is within acceptable flow-control limits. canputnext(9F) searches the stream for the next module, driver, or the stream head with a service procedure. When it finds one, it looks at the total message space currently allocated to the queue for messages. If the amount of space currently used at that queue reaches the high-water mark, canputnext(9F) returns false (zero). If the next queue with a service procedure is within acceptable flow-control limits, canputnext(9F) returns true (nonzero).

  3. If canputnext(9F) returns false, the service procedure returns the message to its own queue with putbq(9F). The service procedure can do no further processing at this time, and it returns to the caller.

    If canputnext(9F) returns true, the service procedure completes any processing of the message. This can involve retrieving more messages from the queue, allocating and deallocating header and trailer information, and performing control functions for the module.

  4. When the service procedure is finished processing the message, it calls putnext(9F) to pass the resulting message to the next queue.

These steps are repeated until getq(9F) returns NULL (the queue is empty) or canputnext(9F) returns false.
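The steps above can be exercised outside the kernel with a small model. This is a sketch under stated assumptions, not the STREAMS implementation: the toy_ functions, the fixed-size array queue, and the byte-count comparison against a high-water mark are all hypothetical stand-ins for getq(9F), putbq(9F), canputnext(9F), and putnext(9F). The model also assumes the next queue is the one that enqueues messages.

```c
#include <assert.h>

#define	TOY_QPCTL	0x80	/* hypothetical high-priority threshold */
#define	TOY_QMAX	16	/* capacity of a toy queue, in messages */

struct toy_msg {
	int type;
	int len;
};

struct toy_queue {
	struct toy_msg msgs[TOY_QMAX];
	int count;	/* messages queued */
	int bytes;	/* bytes queued */
	int hiwat;	/* high-water mark, in bytes */
};

/* Nonzero while the queue is below its high-water mark. */
static int
toy_canput(const struct toy_queue *q)
{
	return (q->bytes < q->hiwat);
}

/* Append a message at the tail. */
static void
toy_putq(struct toy_queue *q, struct toy_msg m)
{
	assert(q->count < TOY_QMAX);
	q->msgs[q->count++] = m;
	q->bytes += m.len;
}

/* Remove the first message; returns 0 if the queue is empty. */
static int
toy_getq(struct toy_queue *q, struct toy_msg *out)
{
	int i;

	if (q->count == 0)
		return (0);
	*out = q->msgs[0];
	for (i = 1; i < q->count; i++)
		q->msgs[i - 1] = q->msgs[i];
	q->count--;
	q->bytes -= out->len;
	return (1);
}

/* Return a message to the head of the queue. */
static void
toy_putbq(struct toy_queue *q, struct toy_msg m)
{
	int i;

	assert(q->count < TOY_QMAX);
	for (i = q->count; i > 0; i--)
		q->msgs[i] = q->msgs[i - 1];
	q->msgs[0] = m;
	q->count++;
	q->bytes += m.len;
}

/* Service loop: returns the number of messages forwarded to next. */
static int
toy_srv(struct toy_queue *q, struct toy_queue *next)
{
	struct toy_msg m;
	int forwarded = 0;

	while (toy_getq(q, &m)) {
		/* ordinary message and downstream is full: put it back */
		if (m.type < TOY_QPCTL && !toy_canput(next)) {
			toy_putbq(q, m);
			return (forwarded);
		}
		toy_putq(next, m);	/* pass the result */
		forwarded++;
	}
	return (forwarded);
}
```

With a downstream high-water mark of 10 bytes and three 6-byte ordinary messages queued, the loop forwards two messages, puts the third back at the head of its own queue, and returns. A later call, made after the downstream queue drains, forwards the rest.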

Filter Module Example

The module shown next, crmod in Example 10–4, is an asymmetric filter. On the write side, a newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.
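The write-side conversion itself is independent of STREAMS and can be shown on a plain buffer. The following is a minimal user-space sketch; the function name nl_to_crnl and its buffer-based interface are assumptions for illustration and do not appear in crmod, which operates on message blocks instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Copy len bytes from in to out, inserting '\r' before each '\n'.
 * Returns the number of bytes written, or SIZE_MAX if out is too
 * small. The worst case needs 2 * len bytes of output.
 */
static size_t
nl_to_crnl(const char *in, size_t len, char *out, size_t outcap)
{
	size_t i, n = 0;

	assert(in != NULL && out != NULL);

	for (i = 0; i < len; i++) {
		size_t need = (in[i] == '\n') ? 2 : 1;

		if (n + need > outcap)
			return (SIZE_MAX);	/* output buffer too small */
		if (in[i] == '\n')
			out[n++] = '\r';
		out[n++] = in[i];
	}
	return (n);
}
```

Because every newline can grow into two bytes, a caller that cannot predict its input should size the output buffer at twice the input length.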


Example 10–4 Filter Module

/* Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static struct module_info minfo =
	{ 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen (queue_t*, dev_t*, int, int, cred_t*);
static int modrput (queue_t*, mblk_t*);
static int modwput (queue_t*, mblk_t*);
static int modwsrv (queue_t*);
static int modclose (queue_t*, int, cred_t*);

static struct qinit rinit = {
	modrput, NULL, modopen, modclose, NULL, &minfo, NULL};

static struct qinit winit = {
	modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL};

struct streamtab crmdinfo={ &rinit, &winit, NULL, NULL};

stropts.h includes definitions of flush message options common to user applications. modrput is like modput from the null module.

In contrast to the null module example, a single module_info structure is shared by the read side and write side. The module_info includes the flow control high-water and low-water marks (512 and 128) for the write queue. (Though the same module_info is used on the read queue side, the read side has no service procedure so flow control is not used.) The qinit contains the service procedure pointer.

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Example 10–5.


Example 10–5 Flushing a Queue

static int
modwput(queue_t *q, mblk_t *mp)
{
	if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
		putnext(q, mp);
	else
		putq(q, mp);		/* Put it on the queue */
	return (0);
}

static int
modwsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {
		default:
			if (canputnext(q)) {
				putnext(q, mp);
				break;
			} else {
				putbq(q, mp);
				return (0);
			}

		case M_FLUSH:
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);
			putnext(q, mp);
			break;

		case M_DATA: {
			mblk_t *nbp = NULL;
			mblk_t *next;

			if (!canputnext(q)) {
				putbq(q, mp);
				return (0);
			}
			/* Filter data, appending to queue */
			for (; mp != NULL; mp = next) {
				while (mp->b_rptr < mp->b_wptr) {
					if (*mp->b_rptr == '\n')
						if (!bappend(&nbp, '\r'))
							goto push;
					if (!bappend(&nbp, *mp->b_rptr))
						goto push;
					mp->b_rptr++;
					continue;
				push:
					/* New block is full: forward it */
					if (nbp)
						putnext(q, nbp);
					nbp = NULL;
					if (!canputnext(q)) {
						/* Blocked: requeue what is left */
						if (mp->b_rptr >= mp->b_wptr) {
							next = mp->b_cont;
							freeb(mp);
							mp = next;
						}
						if (mp)
							putbq(q, mp);
						return (0);
					}
				} /* while */
				next = mp->b_cont;
				freeb(mp);
				if (nbp)
					putnext(q, nbp);
				nbp = NULL;	/* forwarded; do not reuse */
			} /* for */
			break;
		}
		} /* switch */
	}
	return (0);
}

modwsrv() is the write service procedure. It takes a single argument, a pointer to the write queue, and switches on the message type. M_FLUSH is the only high-priority message that modwsrv() processes. No other high-priority message should reach modwsrv(), because modwput() passes them on immediately with putnext(9F) to avoid scheduling. All other messages, including M_FLUSH, are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in the put or service procedure.

For an M_FLUSH message, modwsrv() checks the first data byte. If FLUSHW is set, the write queue is flushed by flushq(9F), which takes two arguments, the queue pointer and a flag. The flag indicates what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). Data includes M_DATA, M_DELAY, M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is specific to each module.
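The difference between the two flags can be illustrated with a small user-space model. This is only a sketch: the toy_ names and the array-based queue are hypothetical, and TOY_M_CTL stands in for any message type that is not counted as data. The set of data types follows the list given above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical message types for the model. */
enum toy_type {
	TOY_M_DATA,
	TOY_M_PROTO,
	TOY_M_DELAY,
	TOY_M_PCPROTO,
	TOY_M_CTL	/* stands in for a non-data message */
};

enum toy_flag {
	TOY_FLUSHDATA,	/* flush data messages only */
	TOY_FLUSHALL	/* flush everything */
};

/* Nonzero for the types that count as data when flushing. */
static int
toy_is_data(enum toy_type t)
{
	return (t == TOY_M_DATA || t == TOY_M_PROTO ||
	    t == TOY_M_DELAY || t == TOY_M_PCPROTO);
}

/*
 * Remove flushed messages from msgs in place, preserving the order
 * of the survivors. Returns the number of messages that remain.
 */
static int
toy_flushq(enum toy_type *msgs, int n, enum toy_flag flag)
{
	int i, kept = 0;

	assert(msgs != NULL && n >= 0);

	for (i = 0; i < n; i++) {
		if (flag == TOY_FLUSHALL || toy_is_data(msgs[i]))
			continue;	/* flushed */
		msgs[kept++] = msgs[i];
	}
	return (kept);
}
```

Flushing a queue that holds data, control, protocol, and priority-protocol messages with TOY_FLUSHDATA leaves only the control messages, while TOY_FLUSHALL leaves nothing.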

Ordinary messages are returned to the queue if canputnext(9F) returns false, indicating the downstream path is blocked.

The differences in M_DATA processing between this and the example in Message Allocation and Freeing relate to the manner in which the new messages are forwarded and flow controlled. For the purpose of demonstrating alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with putnext(9F) rather than being linked into a single large message. This alternative is not desirable because message boundaries are altered, and because of the additional overhead of handling and scheduling multiple messages.

When the filter processing is performed (following push), flow control is checked (with canputnext(9F)) after each new message is forwarded. This is necessary because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.
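The forwarding pattern described here (fill a block, forward it, check flow control, and stop if blocked) can be sketched in user space. The model below is an illustration under assumptions, not the module's code: toy_sink, TOY_BLKSZ, and toy_filter are hypothetical, the sink simply counts how many messages it will still accept, and a carriage return and its newline are always placed in the same block, which is a design choice of this sketch.

```c
#include <assert.h>
#include <stddef.h>

#define	TOY_BLKSZ	4	/* hypothetical size of each new block */

struct toy_sink {
	int room;	/* messages downstream will still accept */
	int sent;	/* messages forwarded so far */
	size_t bytes;	/* total bytes forwarded */
};

static int
toy_canput(const struct toy_sink *s)
{
	return (s->room > 0);
}

static void
toy_put(struct toy_sink *s, size_t n)
{
	s->room--;
	s->sent++;
	s->bytes += n;
}

/*
 * Filter in[0..len) into blocks of at most TOY_BLKSZ bytes and forward
 * each block as soon as it is full. Returns the number of input bytes
 * consumed; a caller would return the remainder to its own queue.
 */
static size_t
toy_filter(const char *in, size_t len, struct toy_sink *s)
{
	size_t i = 0, fill = 0;

	assert(in != NULL && s != NULL);
	assert(toy_canput(s));	/* caller checks flow control first */

	while (i < len) {
		size_t need = (in[i] == '\n') ? 2 : 1;

		if (fill + need > TOY_BLKSZ) {
			toy_put(s, fill);	/* block full: forward it */
			fill = 0;
			if (!toy_canput(s))
				return (i);	/* blocked: stop here */
			continue;
		}
		fill += need;
		i++;
	}
	if (fill > 0)
		toy_put(s, fill);	/* forward the final partial block */
	return (i);
}
```

When the sink accepts only two messages, filtering the 8-byte input "ab\ncd\nef" consumes 6 bytes and leaves "ef" for a later attempt. With enough room, the same input is forwarded as three messages totaling 10 bytes.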