STREAMS Programming Guide

Filter Module Example

The module shown next, crmod in Example 10–4, is an asymmetric filter. On the write side, a newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.
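The write-side conversion can be tried outside the kernel. The following is a minimal user-space sketch, not part of crmod: the function name cr_expand and its buffer-based interface are hypothetical and exist only for illustration.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical user-space helper (not part of crmod): copy src to dst,
 * inserting '\r' before every '\n'. Returns the number of bytes written,
 * or 0 if dst is too small (an empty input also yields 0).
 */
static size_t
cr_expand(const char *src, size_t len, char *dst, size_t cap)
{
	size_t i, out = 0;

	assert(src != NULL && dst != NULL);
	for (i = 0; i < len; i++) {
		size_t need = (src[i] == '\n') ? 2 : 1;

		if (out + need > cap)
			return (0);		/* would overflow dst */
		if (src[i] == '\n')
			dst[out++] = '\r';	/* carriage return first */
		dst[out++] = src[i];		/* then the original byte */
	}
	return (out);
}
```

Calling cr_expand("a\nb\n", 4, out, sizeof (out)) writes the six bytes a, CR, NL, b, CR, NL into out. The read side would copy its input unchanged.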


Example 10–4 Filter Module

/* Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static struct module_info minfo =
	{ 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen (queue_t*, dev_t*, int, int, cred_t*);
static int modrput (queue_t*, mblk_t*);
static int modwput (queue_t*, mblk_t*);
static int modwsrv (queue_t*);
static int modclose (queue_t*, int, cred_t*);

static struct qinit rinit = {
	modrput, NULL, modopen, modclose, NULL, &minfo, NULL};

static struct qinit winit = {
	modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL};

struct streamtab crmdinfo={ &rinit, &winit, NULL, NULL};

<sys/stropts.h> defines the flush message options, such as FLUSHW, that are common to user applications. modrput() is like modput() from the null module.

In contrast to the null module example, a single module_info structure is shared by the read side and write side. The module_info includes the flow control high-water and low-water marks (512 and 128) for the write queue. (Although the same module_info is used on the read side, the read side has no service procedure, so flow control is not used there.) The write-side qinit structure, winit, contains the service procedure pointer, modwsrv.
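The role of the two marks can be illustrated with a toy model. The sketch below is an assumption-laden simplification and not the kernel queue_t: the struct toyq type, its fields, and the exact comparisons used at each mark are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* Toy model of one queue's flow-control state; not the kernel queue_t. */
struct toyq {
	size_t count;	/* bytes currently queued */
	size_t hiwat;	/* mark at which the queue reports full */
	size_t lowat;	/* mark at which a full queue reopens */
	int full;	/* nonzero while upstream should hold off */
};

/* Add nbytes to the queue; mark it full once the high-water mark is hit. */
static void
toyq_enqueue(struct toyq *q, size_t nbytes)
{
	q->count += nbytes;
	if (q->count >= q->hiwat)
		q->full = 1;
}

/* Remove nbytes; reopen a full queue only after draining to the low mark. */
static void
toyq_dequeue(struct toyq *q, size_t nbytes)
{
	assert(nbytes <= q->count);
	q->count -= nbytes;
	if (q->full && q->count <= q->lowat)
		q->full = 0;
}

/* Nonzero if upstream may send more data. */
static int
toyq_canput(const struct toyq *q)
{
	return (!q->full);
}
```

With marks of 512 and 128, a queue that has filled past 512 bytes stays closed while it drains through 300 bytes and reopens only at or below 128. The gap between the two marks keeps the queue from toggling between full and not full on every message.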

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Example 10–5.


Example 10–5 Flushing a Queue

static int
modwput(queue_t *q, mblk_t *mp)
{
	/* High-priority messages other than M_FLUSH bypass the queue */
	if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
		putnext(q, mp);
	else
		putq(q, mp);	/* Put it on the queue */
	return (0);
}

/*
 * bappend() is a helper that is not shown in this example. It is assumed
 * to append one byte to *nbp, allocating a message block if needed, and
 * to return 0 when the current block is full.
 */
static int
modwsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {
		default:
			if (canputnext(q)) {
				putnext(q, mp);
				break;
			} else {
				putbq(q, mp);
				return (0);
			}

		case M_FLUSH:
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);
			putnext(q, mp);
			break;

		case M_DATA: {
			mblk_t *nbp = NULL;
			mblk_t *next;

			if (!canputnext(q)) {
				putbq(q, mp);
				return (0);
			}
			/* Filter data, appending to queue */
			for (; mp != NULL; mp = next) {
				while (mp->b_rptr < mp->b_wptr) {
					if (*mp->b_rptr == '\n')
						if (!bappend(&nbp, '\r'))
							goto push;
					if (!bappend(&nbp, *mp->b_rptr))
						goto push;
					mp->b_rptr++;
					continue;
				push:
					/* New block is full: forward it */
					if (nbp)
						putnext(q, nbp);
					nbp = NULL;
					if (!canputnext(q)) {
						if (mp->b_rptr >= mp->b_wptr) {
							next = mp->b_cont;
							freeb(mp);
							mp = next;
						}
						if (mp)
							putbq(q, mp);
						return (0);
					}
				} /* while */
				next = mp->b_cont;
				freeb(mp);
				if (nbp) {
					putnext(q, nbp);
					/* Forwarded; do not append to it again */
					nbp = NULL;
				}
			} /* for */
			break;
		}
		} /* switch */
	}
	return (0);
}

modwsrv() is the write service procedure. It takes a single argument, which is a pointer to the write queue, and switches on the message type. M_FLUSH is the only high-priority message that modwsrv() processes. No other high-priority message should reach modwsrv(), because modwput() passes high-priority messages other than M_FLUSH along with putnext(9F) to avoid scheduling the service procedure. All other messages are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in the put or service procedure.
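The put-procedure decision described here can be isolated as a small user-space sketch. The TOY_ constants are stand-ins chosen for this illustration; kernel code takes the real values from <sys/stream.h>. The function name toy_wput_action is hypothetical.

```c
#include <assert.h>

/* Stand-in values for this sketch; real code uses <sys/stream.h>. */
#define	TOY_M_DATA	0x00
#define	TOY_M_PROTO	0x01
#define	TOY_QPCTL	0x80	/* types at or above this are high priority */
#define	TOY_M_PCPROTO	0x83
#define	TOY_M_FLUSH	0x86

enum toy_action { TOY_FORWARD, TOY_ENQUEUE };

/*
 * Mirror the test in modwput(): high-priority messages other than
 * M_FLUSH are forwarded at once; everything else is queued for the
 * service procedure.
 */
static enum toy_action
toy_wput_action(int db_type)
{
	assert(db_type >= 0);
	if (db_type >= TOY_QPCTL && db_type != TOY_M_FLUSH)
		return (TOY_FORWARD);
	return (TOY_ENQUEUE);
}
```

Under these stand-in values, an M_PCPROTO message is forwarded immediately, while M_DATA, M_PROTO, and M_FLUSH are queued.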

For an M_FLUSH message, modwsrv() checks the first data byte. If FLUSHW is set, the write queue is flushed by flushq(9F), which takes two arguments, the queue pointer and a flag. The flag indicates what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). Data includes M_DATA, M_DELAY, M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is specific to each module.
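The difference between the two flags can be sketched with a toy queue held in an array. This is a simplified user-space model, not flushq(9F) itself: the enum names and toy_flushq are hypothetical, and T_IOCTL stands in for any message type that is not counted as data.

```c
#include <assert.h>
#include <stddef.h>

enum toy_type { T_DATA, T_DELAY, T_PROTO, T_PCPROTO, T_IOCTL };
enum toy_flag { TOY_FLUSHDATA, TOY_FLUSHALL };

/* Data types, following the list given in the text above. */
static int
toy_is_data(enum toy_type t)
{
	return (t == T_DATA || t == T_DELAY || t == T_PROTO ||
	    t == T_PCPROTO);
}

/*
 * Remove flushed entries from q[0..n), compacting in place.
 * Returns the number of entries that remain.
 */
static size_t
toy_flushq(enum toy_type *q, size_t n, enum toy_flag flag)
{
	size_t i, kept = 0;

	assert(q != NULL || n == 0);
	for (i = 0; i < n; i++) {
		if (flag == TOY_FLUSHALL || toy_is_data(q[i]))
			continue;	/* flushed */
		q[kept++] = q[i];	/* survives the flush */
	}
	return (kept);
}
```

Flushing a queue of { T_DATA, T_IOCTL, T_PROTO, T_IOCTL } with TOY_FLUSHDATA leaves the two T_IOCTL entries; TOY_FLUSHALL empties it.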

Ordinary messages are returned to the queue with putbq(9F) if canputnext(9F) returns false, indicating the downstream path is blocked.

The differences in M_DATA processing between this and the example in Message Allocation and Freeing relate to the manner in which the new messages are forwarded and flow controlled. For the purpose of demonstrating alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with putnext(9F) rather than being linked into a single large message. This alternative is not desirable because message boundaries are altered, and because of the additional overhead of handling and scheduling multiple messages.
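The forward-when-full pattern can be shown in user space. In this sketch, struct blk, blk_append, and forward_in_blocks are hypothetical names, the block size is deliberately tiny, and "forwarding" is reduced to counting blocks.

```c
#include <assert.h>
#include <stddef.h>

#define	BLK_SIZE	4	/* tiny on purpose so blocks fill quickly */

struct blk {
	char data[BLK_SIZE];
	size_t used;
};

/* Append ch to b; returns 0 without storing it if b is already full. */
static int
blk_append(struct blk *b, char ch)
{
	if (b->used >= BLK_SIZE)
		return (0);
	b->data[b->used++] = ch;
	return (1);
}

/*
 * Split src into blocks of at most BLK_SIZE bytes, forwarding each
 * block (here, just counting it) as soon as it fills. Returns the
 * number of blocks forwarded, including a final partial block.
 */
static size_t
forward_in_blocks(const char *src, size_t len)
{
	struct blk b = { {0}, 0 };
	size_t i = 0, sent = 0;

	assert(src != NULL || len == 0);
	while (i < len) {
		if (blk_append(&b, src[i])) {
			i++;
			continue;
		}
		sent++;		/* block full: forward it, start a new one */
		b.used = 0;	/* the failed byte is retried next pass */
	}
	if (b.used > 0)
		sent++;		/* forward the final partial block */
	return (sent);
}
```

Ten input bytes leave as three separate blocks of 4, 4, and 2 bytes, which shows how the original message boundary is lost.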

When the filter processing is performed (following push), flow control is checked (with canputnext(9F)) after each new message is forwarded. This is necessary because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.
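The check-after-each-forward loop can be modeled in a few lines. This is a toy sketch under stated assumptions: downstream capacity is reduced to a hypothetical credits count, and forward_until_blocked is an illustrative name, not a STREAMS routine.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Toy model: credits is how many more blocks downstream will accept.
 * Forward len bytes in blocks of blksz bytes, rechecking the downstream
 * state after each forward. Returns the number of bytes consumed; the
 * caller would put the remainder back on its own queue.
 */
static size_t
forward_until_blocked(size_t len, size_t blksz, size_t credits)
{
	size_t done = 0;

	assert(blksz > 0);
	while (done < len && credits > 0) {
		size_t n = (len - done < blksz) ? len - done : blksz;

		done += n;	/* forward one block */
		credits--;	/* downstream absorbed it */
	}
	return (done);
}
```

With 10 bytes, 4-byte blocks, and room for two blocks downstream, 8 bytes are forwarded and the last 2 remain to be requeued.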