STREAMS Programming Guide

Module Overview

STREAMS modules process messages as they flow through the stream between an application and a character device driver. A STREAMS module is a pair of initialized queue structures and the specified kernel-level procedures that process data, status, and control information for the two queues. A stream can contain zero or more modules. Application processes push (stack) modules on a stream using the I_PUSH ioctl(2) and pop (unstack) them using the I_POP ioctl(2).

STREAMS Module Configuration

Like device drivers, STREAMS modules are dynamically linked and can be loaded into and unloaded from the running kernel.


Note -

The word module is used differently when talking about drivers. A device driver is a kernel-loadable module that provides the interface between a device and the Device Driver Interface, and is linked to the kernel when it is first invoked.


A loadable module must provide linkage information to the kernel in an initialized modlstrmod(9S) structure, and must supply three entry points: _init(9E), _info(9E), and _fini(9E).

STREAMS modules can be unloaded from the kernel when not pushed onto a stream. A STREAMS module can prevent being unloaded by returning an error (selected from errno.h) from its _fini(9E) routine (EBUSY is a good choice).
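The unload veto described above can be modeled in user space. This is only a sketch: sk_fini and sk_push_count are hypothetical stand-ins, not part of the DDI, and a real _fini(9E) would typically return the result of mod_remove(9F) rather than a bare 0.

```c
#include <errno.h>

/* Hypothetical counter: how many streams currently have this module pushed. */
int sk_push_count = 0;

/*
 * Stand-in for _fini(9E). Returning a nonzero errno value vetoes the
 * unload. A real module would typically return the result of
 * mod_remove(9F) where this sketch returns 0.
 */
int
sk_fini(void)
{
	if (sk_push_count > 0)
		return (EBUSY);		/* still in use: refuse to unload */
	return (0);			/* idle: allow the unload to proceed */
}
```

With sk_push_count set to 1 the sketch returns EBUSY, and with it set to 0 the unload is allowed.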

Module Procedures

STREAMS module procedures (open, close, put, service) have already been described in the previous chapters. This section shows some examples and further describes attributes common to module put and service procedures.

A module's put procedure is called by the preceding module, driver, or stream head, and always before that queue's service procedure. The put procedure does any immediate processing (for example, high-priority messages), while the corresponding service procedure performs deferred processing.

The service procedure is used primarily for deferred processing and secondarily for implementing flow control. Once the service procedure is enabled, it may start, but is not guaranteed to finish, before user-level code runs. The put and service procedures must not block, because no thread synchronization is done for them.

Example 10-1 shows a STREAMS module read-side put procedure.


Example 10-1 Read-side put Procedure

static int
modrput (queue_t *q, mblk_t *mp)
{
		struct mod_prv *modptr;

		modptr = (struct mod_prv *) q->q_ptr;  /*state info*/

		if (mp->b_datap->db_type >= QPCTL){ /*proc pri msg*/
			putnext(q, mp); 						/* and pass it on */
			return (0);
		}

		switch(mp->b_datap->db_type) {
			case M_DATA:							/* can process message data */
					putq(q, mp); /* queue msg for service procedure */
					return (0);

			case M_PROTO:						/* handle protocol control message */
					.
					.
					.

			default:
					putnext(q, mp);		
					return (0);
		}
}

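The preceding code does the following. The put procedure receives a pointer to the queue that identifies this instance of the module, and a pointer to the message. High-priority messages (those whose type is QPCTL or greater) are passed to the next module immediately with putnext(9F). For ordinary messages, the procedure switches on the message type: M_DATA messages are queued with putq(9F) for the service procedure, M_PROTO messages are handled as protocol control messages, and messages of any other type are passed to the next module.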

Example 10-2 shows a module write-side put procedure.


Example 10-2 Write-side put Procedure

static int
modwput (queue_t *q, mblk_t *mp)
{
		struct mod_prv *modptr;

		modptr = (struct mod_prv *) q->q_ptr;		/*state info*/

		if (mp->b_datap->db_type >= QPCTL){	/* proc pri msg and pass it on */
			putnext(q, mp);
			return (0);
		}

		switch(mp->b_datap->db_type) {
			case M_DATA:					/* can process message data */
					putq(q, mp);			/* queue msg for service procedure or */
												/* pass message along with putnext(q,mp) */
					return (0);

			case M_PROTO:
					.
					.
					.

			case M_IOCTL:					/* if cmd in msg is recognized */
												/* process message and send reply back */
												/* else pass message downstream */

			default:
					putnext(q, mp);
					return (0);
		}
}

The write-side put procedure, unlike the read side, can be passed M_IOCTL messages. It is up to the module to recognize and process the ioctl(2) command, or pass the message downstream if it does not recognize the command.
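The recognize-or-pass decision for M_IOCTL can be sketched as a small user-space model. The command codes and the sk_ names below are hypothetical. A real module would read the command from the ioc_cmd field of the struct iocblk at mp->b_rptr, and would send its reply upstream rather than return an enum.

```c
/* Hypothetical ioctl command codes that this model "recognizes". */
enum { SK_SETOPT = 1, SK_GETOPT = 2 };

/* What the write-side put procedure should do with an M_IOCTL message. */
enum sk_action { SK_REPLY_UPSTREAM, SK_PASS_DOWNSTREAM };

/* Decide how to handle an M_IOCTL message carrying command cmd. */
enum sk_action
sk_wput_ioctl(int cmd)
{
	switch (cmd) {
	case SK_SETOPT:
	case SK_GETOPT:
		/* Recognized: process the command and reply upstream. */
		return (SK_REPLY_UPSTREAM);
	default:
		/* Not ours: let a module or driver further down handle it. */
		return (SK_PASS_DOWNSTREAM);
	}
}
```

Passing unrecognized commands downstream, rather than rejecting them, lets several modules on one stream each handle their own ioctl(2) commands.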

Example 10-3 shows a general scenario employed by the module's service procedure.


Example 10-3 Service Procedure

static int
modrsrv (queue_t *q)
{
		mblk_t *mp;

		while ((mp = getq(q)) != NULL) {
			if (!(mp->b_datap->db_type >= QPCTL) && !canputnext(q)) {	
																	/* flow control check */
				putbq(q, mp);									/* return message */
				return (0);
			}
			/* process the message */
				.
				.
				.
			putnext(q, mp); /* pass the result */
		}
		return (0);
}

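The steps are as follows. The service procedure retrieves the first message from the queue with getq(9F). If the message is not high priority and canputnext(9F) reports that the downstream path is blocked, the procedure returns the message to the queue with putbq(9F) and returns. Otherwise, it processes the message and passes the result along with putnext(9F).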

These steps are repeated until getq(9F) returns NULL (the queue is empty) or canputnext(9F) returns false.
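The loop can be exercised outside the kernel with a small model. Everything here is a hypothetical stand-in: sk_queue is a fixed-size ring, sk_getq, sk_putq, and sk_putbq only mimic the ordering behavior of getq(9F), putq(9F), and putbq(9F), and the room field stands in for the answer canputnext(9F) would give.

```c
#include <stdbool.h>

#define SK_QMAX 8

struct sk_msg { int id; bool high_pri; };

struct sk_queue {
	struct sk_msg msgs[SK_QMAX];
	int head;	/* index of the first queued message */
	int count;	/* number of queued messages */
};

/* Downstream model: room is how many ordinary messages it will accept. */
struct sk_down { int room; };

/* Append a message at the tail; fails if the model queue is full. */
bool sk_putq(struct sk_queue *q, struct sk_msg m)
{
	if (q->count == SK_QMAX)
		return (false);
	q->msgs[(q->head + q->count) % SK_QMAX] = m;
	q->count++;
	return (true);
}

/* Remove the first message; returns false when the queue is empty. */
bool sk_getq(struct sk_queue *q, struct sk_msg *out)
{
	if (q->count == 0)
		return (false);
	*out = q->msgs[q->head];
	q->head = (q->head + 1) % SK_QMAX;
	q->count--;
	return (true);
}

/* Put a message back at the head, where sk_getq just removed it. */
void sk_putbq(struct sk_queue *q, struct sk_msg m)
{
	q->head = (q->head + SK_QMAX - 1) % SK_QMAX;
	q->msgs[q->head] = m;
	q->count++;
}

/* Service loop: returns how many messages were forwarded in this run. */
int sk_srv(struct sk_queue *q, struct sk_down *d)
{
	struct sk_msg m;
	int forwarded = 0;

	while (sk_getq(q, &m)) {
		if (!m.high_pri && d->room == 0) {
			sk_putbq(q, m);		/* blocked: keep message order */
			return (forwarded);
		}
		if (!m.high_pri)
			d->room--;		/* ordinary messages use up room */
		forwarded++;			/* stands in for putnext() */
	}
	return (forwarded);
}
```

Because the blocked message goes back to the head of the queue, the next run of the service procedure resumes with the same message and ordering is preserved.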

Filter Module Example

The module shown next, crmod in Example 10-4, is an asymmetric filter. On the write side, a newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.


Example 10-4 crmod

/* Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static struct module_info minfo =
	{ 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen (queue_t*, dev_t*, int, int, cred_t*);
static int modrput (queue_t*, mblk_t*);
static int modwput (queue_t*, mblk_t*);
static int modwsrv (queue_t*);
static int modclose (queue_t*, int, cred_t*);

static struct qinit rinit = {
	modrput, NULL, modopen, modclose, NULL, &minfo, NULL};

static struct qinit winit = {
	modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL};

struct streamtab crmdinfo={ &rinit, &winit, NULL, NULL};

stropts.h includes definitions of flush message options common to user applications. modrput is like modput from the null module.

Note that, in contrast to the null module example, a single module_info structure is shared by the read side and write side. The module_info includes the flow control high-water and low-water marks (512 and 128) for the write queue. (Though the same module_info is used on the read queue side, the read side has no service procedure so flow control is not used.) The qinit contains the service procedure pointer.
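The effect of the two water marks can be illustrated with a simplified user-space model. The sk_ names are hypothetical, and the exact threshold comparisons (full at or above the high-water mark, clear at or below the low-water mark) are assumptions of this sketch rather than a statement of the kernel's precise rules.

```c
#include <stdbool.h>
#include <stddef.h>

/* Simplified model of a queue's byte count and flow-control state. */
struct sk_flowq {
	size_t count;	/* bytes currently queued */
	size_t hiwat;	/* high-water mark */
	size_t lowat;	/* low-water mark */
	bool full;	/* set at hiwat, cleared only at lowat */
};

/* Account for nbytes being queued. */
void sk_enqueue(struct sk_flowq *q, size_t nbytes)
{
	q->count += nbytes;
	if (q->count >= q->hiwat)
		q->full = true;
}

/* Account for nbytes being removed by the service procedure. */
void sk_dequeue(struct sk_flowq *q, size_t nbytes)
{
	q->count = (nbytes > q->count) ? 0 : q->count - nbytes;
	if (q->full && q->count <= q->lowat)
		q->full = false;
}

/* Model of the flow-control test an upstream sender would make. */
bool sk_canput(const struct sk_flowq *q)
{
	return (!q->full);
}
```

With the marks from minfo (512 and 128), a queue that fills to 512 bytes stays blocked while it drains to 312 bytes and only reopens once it has drained to 128 bytes or fewer. The gap between the marks keeps the sender from being stopped and restarted on every message.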

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Example 10-5.


Example 10-5

static int
modwput(queue_t *q, mblk_t *mp)
{
	if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
			putnext(q, mp);
	else
			putq(q, mp);				 /* Put it on the queue */
	return (0);
}

static int
modwsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {
		default:
			if (canputnext(q)) {
				putnext(q, mp);
				break;
			} else {
				putbq(q, mp);
				return (0);
			}

		case M_FLUSH:
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);
			putnext(q, mp);
			break;

modwput, the write-side put procedure, tests the message type. High-priority messages other than M_FLUSH are passed along immediately with putnext(9F) to avoid scheduling. All other messages are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in the put or service procedure.

modwsrv is the write-side service procedure. It takes a single argument, a pointer to the write queue. modwsrv processes only one high-priority message type, M_FLUSH. No other high-priority messages should reach modwsrv.

For an M_FLUSH message, modwsrv checks the first data byte. If FLUSHW is set, the write queue is flushed by flushq(9F), which takes two arguments, the queue pointer and a flag. The flag indicates what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). Data includes M_DATA, M_DELAY, M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is module specific.
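The difference between the two flags can be shown with a user-space model of a queue as an array of message types. The sk_ names are hypothetical stand-ins, and sk_flushq only mimics which messages flushq(9F) would discard. It does not free anything.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-ins for a few STREAMS message types. */
enum sk_type {
	SK_M_DATA, SK_M_PROTO, SK_M_PCPROTO, SK_M_DELAY, SK_M_SIG, SK_M_CTL
};

/* Stand-ins for the flushq(9F) flag values. */
enum sk_flag { SK_FLUSHDATA, SK_FLUSHALL };

/* Data messages, as listed in the text above. */
bool sk_is_data(enum sk_type t)
{
	return (t == SK_M_DATA || t == SK_M_DELAY ||
	    t == SK_M_PROTO || t == SK_M_PCPROTO);
}

/*
 * Remove flushed messages from msgs[0..n-1] in place, preserving the
 * order of the survivors. Returns the number of messages left.
 */
size_t sk_flushq(enum sk_type *msgs, size_t n, enum sk_flag flag)
{
	size_t kept = 0;

	for (size_t i = 0; i < n; i++) {
		if (flag == SK_FLUSHALL || sk_is_data(msgs[i]))
			continue;		/* discarded */
		msgs[kept++] = msgs[i];		/* survives the flush */
	}
	return (kept);
}
```

Flushing with SK_FLUSHDATA leaves non-data messages queued in their original order, while SK_FLUSHALL empties the queue.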

Ordinary messages are returned to the queue if canputnext(9F) returns false, indicating the downstream path is blocked. The example continues with the remainder of modwsrv processing M_DATA messages:

		case M_DATA: {
			mblk_t *nbp = NULL;
			mblk_t *next;
			if (!canputnext(q)) {
					putbq(q, mp);
					return (0);
			}
			/* Filter data, appending to queue */
			for (; mp != NULL; mp = next) {
					while (mp->b_rptr < mp->b_wptr) {
							if (*mp->b_rptr == '\n')
								if (!bappend(&nbp, '\r'))
										goto push;
							if (!bappend(&nbp, *mp->b_rptr))
								goto push;
							mp->b_rptr++;
							continue;
					push:
							if (nbp)
								putnext(q, nbp);
							nbp = NULL;
							if (!canputnext(q)) {
								if (mp->b_rptr>=mp->b_wptr){
										next = mp->b_cont;
										freeb(mp);
										mp=next;
								}
								if (mp)
										putbq(q, mp);
								return (0);
							}
					} /* while */
					next = mp->b_cont;
					freeb(mp);
					if (nbp) {
						putnext(q, nbp);
						nbp = NULL;	/* forwarded; do not append to it again */
					}
				}
			}
		}
	}
	return (0);
}

The differences in M_DATA processing between this and the example in "Message Allocation and Freeing" relate to the manner in which the new messages are forwarded and flow controlled. For the purpose of demonstrating alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with putnext(9F) rather than being linked into a single large message. This alternative is not desirable because message boundaries are altered, and because of the additional overhead of handling and scheduling multiple messages.

When the filter processing is performed (following push), flow control is checked (with canputnext(9F)) after each new message is forwarded. This is done because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.
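The conversion itself, and the way output is forwarded one full block at a time, can be sketched on plain buffers. This is a user-space model with hypothetical names: sk_forward stands in for putnext(9F), SK_CHUNK stands in for the size of an allocated message block, and flow control is left out. Unlike the example above, it flushes a full chunk before appending the next byte, so an append never fails partway through a carriage return and newline pair.

```c
#include <stddef.h>
#include <string.h>

#define SK_CHUNK	4	/* deliberately tiny so chunking is visible */
#define SK_SINK_MAX	64

/* Collects everything "sent downstream" so the result can be inspected. */
struct sk_sink {
	char buf[SK_SINK_MAX];
	size_t len;	/* bytes received so far */
	size_t chunks;	/* number of separate forwards */
};

/* Output block currently being filled. */
struct sk_out {
	char chunk[SK_CHUNK];
	size_t used;
};

/* Stand-in for putnext(9F): append one chunk to the sink. */
void sk_forward(struct sk_sink *s, const char *chunk, size_t n)
{
	if (n > SK_SINK_MAX - s->len)
		n = SK_SINK_MAX - s->len;	/* truncate, never overflow */
	memcpy(s->buf + s->len, chunk, n);
	s->len += n;
	s->chunks++;
}

/* Append one byte, forwarding the current chunk first if it is full. */
void sk_append(struct sk_out *o, struct sk_sink *s, char ch)
{
	if (o->used == SK_CHUNK) {
		sk_forward(s, o->chunk, o->used);
		o->used = 0;
	}
	o->chunk[o->used++] = ch;
}

/* Convert each newline in in[0..len-1] to carriage return, newline. */
void sk_crfilter(const char *in, size_t len, struct sk_sink *s)
{
	struct sk_out o = { .used = 0 };

	for (size_t i = 0; i < len; i++) {
		if (in[i] == '\n')
			sk_append(&o, s, '\r');
		sk_append(&o, s, in[i]);
	}
	if (o.used > 0)
		sk_forward(s, o.chunk, o.used);	/* flush the partial chunk */
}
```

Feeding the six bytes "ab\ncd\n" through sk_crfilter produces the eight bytes "ab\r\ncd\r\n", forwarded as two separate chunks. This mirrors how the module above turns one incoming message into several outgoing ones.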