STREAMS Programming Guide

Updated: March 2019
 
 

Multiplexing Driver Example

This section contains an example of a multiplexing driver that implements an N-to-1 configuration. This configuration might be used for terminal windows, where each transmission to or from the terminal identifies the window. This resembles a typical device driver, with two differences: the device-handling functions are performed by a separate driver, connected as a lower stream, and the device information (that is, relevant user process) is contained in the input data rather than in an interrupt call.

Each upper stream is created by open(2). A single lower stream is opened and then it is linked by use of the multiplexing facility. This lower stream might connect to the TTY driver. The implementation of this example is a foundation for an M-to-N multiplexer.

As in the loop-around driver (STREAMS Drivers), flow control requires the use of standard and special code because physical connectivity among the streams is broken at the driver. Different approaches are used for flow control on the lower stream, for messages coming upstream from the device driver, and on the upper streams, for messages coming downstream from the user processes.


Note - The code presented here for the multiplexing driver represents a single-threaded, uniprocessor implementation. See Multithreaded STREAMS for details on multiprocessor and multithreading issues, such as the locking needed to prevent data corruption and race conditions.

Example 63, Multiplexer Declarations, shows the multiplexer declarations:

Example 63  Multiplexer Declarations
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/errno.h>
#include <sys/cred.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

static int muxopen (queue_t*, dev_t*, int, int, cred_t*);
static int muxclose (queue_t*, int, cred_t*);
static int muxuwput (queue_t*, mblk_t*);
static int muxlwsrv (queue_t*);
static int muxlrput (queue_t*, mblk_t*);
static int muxuwsrv (queue_t*);

static struct module_info info = {
	0xaabb, "mux", 0, INFPSZ, 512, 128 };

static struct qinit urinit = {							/* upper read */
	NULL, NULL, muxopen, muxclose, NULL, &info, NULL };

static struct qinit uwinit = {							/* upper write */
	muxuwput, muxuwsrv, NULL, NULL, NULL, &info, NULL };

static struct qinit lrinit = { /* lower read */
	muxlrput, NULL, NULL, NULL, NULL, &info, NULL };

static struct qinit lwinit = { /* lower write */
	NULL, muxlwsrv, NULL, NULL, NULL, &info, NULL };

struct streamtab muxinfo = {
	&urinit, &uwinit, &lrinit, &lwinit };

struct mux {
	queue_t *qptr;					/* back pointer to read queue */
	int		bufcid;					/* bufcall return value */
};
extern struct mux mux_mux[];
extern int mux_cnt;     /* max number of muxes */

static queue_t *muxbot; 			/* linked lower queue */
static int muxerr;					/* set if error or hangup on lower strm */

The four streamtab entries correspond to the upper read, upper write, lower read, and lower write qinit structures. The multiplexing qinit structures replace those in each lower stream head (in this case there is only one) after the I_LINK has concluded successfully. In a multiplexing configuration, the processing performed by the multiplexing driver can be partitioned between the upper and lower queues. There must be an upper-stream write put procedure and lower-stream read put procedure. If the queue procedures of the opposite upper/lower queue are not needed, the queue can be skipped, and the message put to the following queue.

In the example, the upper read-side procedures are not used. The lower-stream read queue put procedure transfers the message directly to the read queue upstream from the multiplexer. There is no lower write put procedure because the upper write put procedure directly feeds the lower write queue downstream from the multiplexer.

The driver uses a private data structure, mux. The qptr field of mux_mux[dev] points back to the opened upper read queue. It is used to route messages coming upstream from the driver to the appropriate upper queue. It is also used to find a free minor device when the driver is opened with CLONEOPEN.

Example 64, Upper Queue Open, contains the canonical driver open code.

Example 64  Upper Queue Open
static int
muxopen(queue_t *q, dev_t *devp, int flag,
				 int sflag, cred_t *credp)
{
	struct mux *mux;
	minor_t device;

	if (q->q_ptr)
			return(EBUSY);

	if (sflag == CLONEOPEN) {
			for (device = 0; device < mux_cnt; device++)
				if (mux_mux[device].qptr == 0)
						break;

			/* no free minor device left */
			if (device >= mux_cnt)
				return (ENXIO);

			*devp = makedevice(getmajor(*devp), device);
	}
	else {
			device = getminor(*devp);
			if (device >= mux_cnt)
				return (ENXIO);
	}

	mux = &mux_mux[device];
	mux->qptr = q;
	q->q_ptr = (char *) mux;
	WR(q)->q_ptr = (char *) mux;
	qprocson(q);
	return (0);
}

muxopen checks for a clone or ordinary open call. It initializes q_ptr to point at the mux_mux[] structure.
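The minor-number selection can be modeled outside the kernel. The following is a minimal user-space sketch, not driver code: struct slot, NSLOTS, and pick_minor are hypothetical names, and the in_use flag stands in for a non-NULL qptr. It also covers the case where every slot is taken, which a driver would normally report with an error such as ENXIO.

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 4                /* hypothetical stand-in for mux_cnt */

struct slot {
	int in_use;             /* stands in for mux_mux[i].qptr != NULL */
};

/*
 * Return the minor number to use, or -1 if none is available.
 * clone != 0 models sflag == CLONEOPEN: take the first free slot.
 * Otherwise the requested minor is only range-checked.
 */
static int
pick_minor(const struct slot *tab, size_t n, int clone, int requested)
{
	size_t i;

	assert(tab != NULL);
	if (!clone)
		return ((requested >= 0 && (size_t)requested < n) ?
		    requested : -1);

	for (i = 0; i < n; i++)
		if (!tab[i].in_use)
			return ((int)i);
	return (-1);            /* table full */
}
```

With slots 0, 1, and 3 in use, a clone open in this model selects minor 2; once that slot is also taken, the next clone open fails.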

The core multiplexer processing is as follows: downstream data written to an upper stream is queued on the corresponding upper write message queue if the lower stream is flow controlled. This allows flow control to propagate toward the stream head for each upper stream. A lower write service procedure, rather than a write put procedure, is used so that flow control, coming up from the driver below, may be handled.

On the lower read side, data coming up the lower stream are passed to the lower read put procedure. The procedure routes the data to an upper stream based on the first byte of the message. This byte holds the minor device number of an upper stream. The put procedure handles flow control by testing the upper stream at the first upper read queue beyond the driver.

Upper Write put Procedure Sample

muxuwput, the upper-queue write put procedure, traps ioctl calls, in particular I_LINK and I_UNLINK:

Example 65  bufcall Callback Routine
/*
 * This is our callback routine used by bufcall() to inform us
 * when buffers become available
 */
static void mux_qenable(long ql)
{
		queue_t *q = (queue_t *)ql;
		struct mux *mux;

		mux = (struct mux *)(q->q_ptr);
		mux->bufcid = 0;
		qenable(q);
}

static int
muxuwput(queue_t *q, mblk_t *mp)
{
	struct mux *mux;

	mux = (struct mux *)q->q_ptr;
	switch (mp->b_datap->db_type) {
	case M_IOCTL: {
			struct iocblk *iocp;
			struct linkblk *linkp;
			/*
			 * ioctl. Only channel 0 can do ioctls. Two
			 * calls are recognized: LINK, and UNLINK
			 */
			if (mux != mux_mux)
				goto iocnak;

			iocp = (struct iocblk *) mp->b_rptr;
			switch (iocp->ioc_cmd) {
			case I_LINK:
				/*
				 *Link. The data contains a linkblk structure
				 *Remember the bottom queue in muxbot.
				 */
				if (muxbot != NULL)
						goto iocnak;

				linkp=(struct linkblk *) mp->b_cont->b_rptr;
				muxbot = linkp->l_qbot;
				muxerr = 0;

				mp->b_datap->db_type = M_IOCACK;
				iocp->ioc_count = 0;
				qreply(q, mp);
				break;
			case I_UNLINK:
				/*
				 * Unlink. The data contains a linkblk struct.
				 * Should not fail an unlink. Null out muxbot.
				 */
				linkp=(struct linkblk *) mp->b_cont->b_rptr;
				muxbot = NULL;
				mp->b_datap->db_type = M_IOCACK;
				iocp->ioc_count = 0;
				qreply(q, mp);
				break;
			default:
			iocnak:
				/* fail ioctl */
				mp->b_datap->db_type = M_IOCNAK;
				qreply(q, mp);
			}
			break;
	}
	case M_FLUSH:
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);
			if (*mp->b_rptr & FLUSHR) {
				*mp->b_rptr &= ~FLUSHW;
				qreply(q, mp);
			} else
				freemsg(mp);
			break;





	case M_DATA: {
			/*
			 * Data. If we have no lower queue --> fail
			 * Otherwise, queue the data and invoke the lower
			 * service procedure.
			 */
			mblk_t *bp;

			if (muxerr || muxbot == NULL)
				goto bad;
			if ((bp = allocb(1, BPRI_MED)) == NULL) {
				putbq(q, mp);
				mux->bufcid = bufcall(1, BPRI_MED,
					mux_qenable, (long)q);
				break;
			}
			/* prepend a one-byte block holding the channel number */
			*bp->b_wptr++ = (struct mux *)q->q_ptr - mux_mux;
			bp->b_cont = mp;
			putq(q, bp);
			break;
	}
	default:
	bad:
			/*
			 * Send an error message upstream.
			 */
			mp->b_datap->db_type = M_ERROR;
			mp->b_rptr = mp->b_wptr = mp->b_datap->db_base;
			*mp->b_wptr++ = EINVAL;
			qreply(q, mp);
	}
	return (0);
}

First, there is a check to enforce that the stream associated with minor device 0 will be the single, controlling stream. The ioctls are only accepted on this stream. As described previously, a controlling stream is the one that issues the I_LINK. There should be only a single control stream. I_LINK and I_UNLINK include a linkblk structure containing the following fields:

l_qtop is the upper write queue from which the ioctl(2) comes. It always equals q for an I_LINK, and NULL for I_PLINK.

l_qbot is the new lower write queue. It is the former stream head write queue and is where the multiplexer gets and puts its data.

l_index is a unique (system-wide) identifier for the link. It can be used for routing or during selective unlinks. Since the example only supports a single link, l_index is not used.

For I_LINK, l_qbot is saved in muxbot and a positive acknowledgement is generated. From this point on, until an I_UNLINK occurs, data from upper queues is routed through muxbot. Note that when an I_LINK is received, the lower stream has already been connected. This enables the driver to send messages downstream to perform any initialization functions. Returning an M_IOCNAK message (negative acknowledgement) in response to an I_LINK causes the lower stream to be disconnected.

The I_UNLINK handling code nulls out muxbot and generates a positive acknowledgement. A negative acknowledgement should not be returned to an I_UNLINK. The stream head ensures that the lower stream is connected to a multiplexer before sending an I_UNLINK M_IOCTL.
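The accept and reject rules for the two control requests can be summarized in a small user-space model. This is a sketch only: handle_ctl, struct mux_state, and the enum values are hypothetical names that stand in for the M_IOCTL handling, muxbot and muxerr, and the M_IOCACK and M_IOCNAK replies. No STREAMS calls are involved.

```c
#include <assert.h>
#include <stddef.h>

enum reply { ACK, NAK };        /* stand-ins for M_IOCACK / M_IOCNAK */
enum cmd { CMD_LINK, CMD_UNLINK, CMD_OTHER };

struct mux_state {
	const void *bottom;     /* models muxbot; NULL when unlinked */
	int err;                /* models muxerr */
};

/*
 * Decide how to answer a control request.
 * channel: minor number of the upper stream issuing the request.
 * lower:   handle for the lower queue named in the request (l_qbot).
 */
static enum reply
handle_ctl(struct mux_state *st, int channel, enum cmd c, const void *lower)
{
	assert(st != NULL);
	if (channel != 0)               /* only the controlling stream */
		return (NAK);

	switch (c) {
	case CMD_LINK:
		if (st->bottom != NULL) /* only one lower stream allowed */
			return (NAK);
		st->bottom = lower;
		st->err = 0;
		return (ACK);
	case CMD_UNLINK:
		st->bottom = NULL;      /* an unlink is never refused */
		return (ACK);
	default:
		return (NAK);
	}
}
```

In this model a second link request is refused while a lower stream is already recorded, which mirrors the muxbot != NULL test in the driver.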

Drivers can handle the persistent link requests I_PLINK and I_PUNLINK ioctl(2) in the same manner, except that l_qtop in the linkblk structure passed to the put routine is NULL instead of identifying the controlling stream.

muxuwput handles M_FLUSH messages as a normal driver does, except that there are no messages queued on the upper read queue, so there is no need to call flushq if FLUSHR is set.

M_DATA messages are not placed on the lower write message queue. They are queued on the upper write message queue. When flow control subsides on the lower stream, the lower service procedure, muxlwsrv, is scheduled to start output. This is similar to starting output on a device driver.
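Before an M_DATA message is queued, the driver prepends a one-byte block that carries the channel number of the upper stream. The following user-space sketch illustrates that tagging step with a heavily reduced message block. struct blk, blk_alloc, and tag_message are hypothetical names, and calloc stands in for allocb; this is an illustration of the idea, not the kernel data structure.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct blk {                    /* very reduced stand-in for mblk_t */
	unsigned char *rptr;    /* first unread byte */
	unsigned char *wptr;    /* one past the last written byte */
	struct blk *cont;       /* next block of the same message */
	unsigned char data[16];
};

/* Allocate an empty block, or return NULL (like allocb failing). */
static struct blk *
blk_alloc(void)
{
	struct blk *b = calloc(1, sizeof (*b));

	if (b != NULL)
		b->rptr = b->wptr = b->data;
	return (b);
}

/*
 * Prepend a one-byte block holding the channel number to msg.
 * Returns the new head, or NULL if no block could be allocated,
 * in which case msg is left untouched for a later retry.
 */
static struct blk *
tag_message(struct blk *msg, unsigned char channel)
{
	struct blk *hdr;

	assert(msg != NULL);
	if ((hdr = blk_alloc()) == NULL)
		return (NULL);
	*hdr->wptr++ = channel;
	hdr->cont = msg;
	return (hdr);
}
```

Keeping the channel number in its own leading block means the original payload blocks are never copied or modified.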

Upper Write service Procedure Sample

The following example shows the code for the upper multiplexer write service procedure:

Example 66  Upper Multiplexer Write Service Procedure
static int muxuwsrv(queue_t *q)
{
	mblk_t *mp;
	struct mux *muxp;
	muxp = (struct mux *)q->q_ptr;

	if (!muxbot) {
			flushq(q, FLUSHALL);
			return (0);
	}
	if (muxerr) {
			flushq(q, FLUSHALL);
			return (0);
	}
	while ((mp = getq(q)) != NULL) {
			if (canputnext(muxbot))
				putnext(muxbot, mp);
			else {
				putbq(q, mp);
				return(0);
			}
	}
	return (0);
}

As long as there is a stream still linked under the multiplexer and there are no errors, the service procedure will take a message off the queue and send it downstream, if flow control allows.

Lower Write service Procedure

muxlwsrv, the lower (linked) queue write service procedure, is scheduled as a result of flow control subsiding downstream (it is back-enabled).

static int muxlwsrv(queue_t *q)
{
	int i;

	for (i = 0; i < mux_cnt; i++) {
			queue_t *rq = mux_mux[i].qptr;	/* upper read queue */

			/* messages wait on the upper write queue */
			if (rq != NULL && WR(rq)->q_first != NULL)
				qenable(WR(rq));
	}
	return (0);
}

muxlwsrv steps through all possible upper queues. If a queue is active and there are messages on the queue, then its upper write service procedure is enabled through qenable.
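The interplay between the upper write service procedure and the lower write service procedure can be followed in a small simulation. This is a user-space sketch under simplifying assumptions: message counts replace real queues, LOWER_HI is an arbitrary limit that plays the role of canputnext failing, and all names (struct sim, upper_service, lower_service) are hypothetical.

```c
#include <assert.h>

#define NUPPER   3      /* hypothetical number of upper streams */
#define LOWER_HI 2      /* lower stream accepts this many messages */

struct upper {
	int queued;     /* messages waiting on the upper write queue */
	int enabled;    /* service procedure has been scheduled */
};

struct sim {
	struct upper up[NUPPER];
	int lower_count;        /* messages currently held downstream */
};

/* Models canputnext(muxbot). */
static int
lower_can_accept(const struct sim *s)
{
	return (s->lower_count < LOWER_HI);
}

/* Models muxuwsrv: forward until empty or the lower stream is full. */
static void
upper_service(struct sim *s, int i)
{
	assert(i >= 0 && i < NUPPER);
	s->up[i].enabled = 0;
	while (s->up[i].queued > 0 && lower_can_accept(s)) {
		s->up[i].queued--;
		s->lower_count++;
	}
}

/* Models muxlwsrv: re-enable every upper queue that still holds data. */
static void
lower_service(struct sim *s)
{
	int i;

	for (i = 0; i < NUPPER; i++)
		if (s->up[i].queued > 0)
			s->up[i].enabled = 1;
}
```

An upper queue that stalls because the lower stream is full simply keeps its messages. Nothing further happens until the lower side drains and lower_service marks the waiting queues as enabled again.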

Lower Read put Procedure

The lower (linked) queue read put procedure is shown in the following example:

Example 67  Lower Read put Procedure
static int
muxlrput(queue_t *q, mblk_t *mp)
{
	queue_t *uq;
	int device;

	if(muxerr) {
			freemsg(mp);
			return (0);
	}

	switch(mp->b_datap->db_type) {
	case M_FLUSH:
			/*
			 * Flush queues. NOTE: sense of tests is reversed
			 * since we are acting like a "stream head"
			 */
			if (*mp->b_rptr & FLUSHW) {
				*mp->b_rptr &= ~FLUSHR;
				qreply(q, mp);
			} else
				freemsg(mp);
			break;
	case M_ERROR:
	case M_HANGUP:
			muxerr = 1;
			freemsg(mp);
			break;
	case M_DATA:
			/*
			 * Route message. First byte indicates
			 * device to send to. No flow control.
			 *
			 * Extract and delete device number. If the
			 * leading block is now empty and more blocks
			 * follow, strip the leading block.
			 */
			device = *mp->b_rptr++;

			/* Sanity check. Device must be in range */
			if (device < 0 || device >= mux_cnt) {
				freemsg(mp);
				break;
			}
			/*
			 * If upper stream is open and not backed up,
			 * send the message there, otherwise discard it.
			 */
			uq = mux_mux[device].qptr;
			if (uq != NULL && canputnext(uq))
				putnext(uq, mp);
			else
				freemsg(mp);
			break;
	default:
			freemsg(mp);
	}
	return (0);
}

muxlrput receives messages from the linked stream. In this case, it is acting as a stream head and handles M_FLUSH messages. The code is the reverse of a driver, handling M_FLUSH messages from upstream. There is no need to flush the read queue because no data is ever placed in it.

muxlrput also handles M_ERROR and M_HANGUP messages. If one is received, it locks up the upper streams by setting muxerr.

M_DATA messages are routed by checking the first data byte of the message. This byte contains the minor device of the upper stream. Several checks examine whether:

  • The device is in range

  • The upper stream is open

  • The upper stream is full

This multiplexer does not support flow control on the read side; it is merely a router. If the message passes all checks, it is put to the proper upper queue. Otherwise, the message is discarded.
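The three checks can be expressed as a single routing function. The following user-space sketch uses a flat byte buffer in place of a message and per-channel flags in place of queue state. struct chan, NCHAN, and route are hypothetical names chosen for illustration.

```c
#include <assert.h>
#include <stddef.h>

#define NCHAN 4                 /* hypothetical stand-in for mux_cnt */

struct chan {
	int open;               /* models mux_mux[i].qptr != NULL */
	int full;               /* models !canputnext(uq) */
	size_t delivered;       /* payload bytes handed upstream */
};

/*
 * Route one message whose first byte is the channel number.
 * Returns the channel delivered to, or -1 if the message was dropped.
 */
static int
route(struct chan *tab, const unsigned char *msg, size_t len)
{
	int dev;

	assert(tab != NULL && msg != NULL);
	if (len == 0)
		return (-1);            /* nothing to read the channel from */
	dev = msg[0];
	if (dev >= NCHAN)
		return (-1);            /* out of range */
	if (!tab[dev].open || tab[dev].full)
		return (-1);            /* closed or backed up: discard */
	tab[dev].delivered += len - 1;  /* channel byte is stripped */
	return (dev);
}
```

Because a failed check discards the message rather than queuing it, data loss is possible whenever an upper stream is backed up, which is the cost of treating the read side as a pure router.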

The upper stream close routine clears the mux entry so this queue will no longer be found. Any outstanding bufcall is canceled with unbufcall before the entry is cleared.

/*
* Upper queue close
*/
static int
muxclose(queue_t *q, int flag, cred_t *credp)
{
	struct mux *mux;

	mux = (struct mux *) q->q_ptr;
	qprocsoff(q);
	if (mux->bufcid != 0)
		unbufcall(mux->bufcid);
	mux->bufcid = 0;
	mux->qptr = NULL;
	q->q_ptr = NULL;
	WR(q)->q_ptr = NULL;
	return(0);
}