STREAMS Programming Guide

Updated: November 2020
 
 

Data Flow Control

To support the STREAMS flow control mechanism, a module that uses service procedures must call canputnext() before calling putnext() and must use appropriate values for the high-water and low-water marks. If your module has a service procedure, you manage the flow control. If your module does not have a service procedure, you do not need to do anything.

The queue hiwat and lowat values limit the amount of data that can be placed on a queue. These limits prevent depletion of buffers in the buffer pool. Flow control is advisory in nature and can be bypassed. It is governed by the high-water and low-water marks and carried out by the utility routines getq(), putq(), putbq(), insq(), rmvq(), and canputnext(). See the getq(9F), putq(9F), putbq(9F), insq(9F), rmvq(9F), and canputnext(9F) man pages.

The following scenario takes place in flow control:

A driver sends data to a module using putnext(), and the module's put procedure queues the data using putq(). Calling putq() enables the service procedure, which is scheduled to run later. When the service procedure runs, it retrieves the data by calling getq().

If the module cannot process data at the rate at which the driver is sending the data, the following procedure is run:

When a message is queued, putq() increments q_count by the size of the message and compares the result to the high-water limit of the queue, q_hiwat. If the count reaches q_hiwat, putq() sets the internal QFULL indicator for the queue. Messages from upstream (for a write-side queue) or from downstream (for a read-side queue) are then held back, because canputnext() returns FALSE, until the queue count drops below q_lowat. getq() decrements the queue count. If the resulting count is below q_lowat, getq() re-enables any blocked queue, which causes that queue's service procedure to be called.

Flow control does not prevent the count from reaching q_hiwat on a queue. Because putq() queues the message regardless of the current count, the count can exceed q_hiwat before canputnext() detects QFULL and the flow is stopped.

The next example shows flow control in a line discipline module. Example 55, Read-side Line Discipline Module shows both the read-side and the write-side service procedures of the module. Note that the read side is the same as the write side but without the M_IOCTL processing.

Example 55  Read-side Line Discipline Module
/* read side line discipline module flow control */
#include <sys/types.h>
#include <sys/stream.h>

static mblk_t *read_canon(mblk_t *);

static int
ld_read_srv(queue_t *q)			/* pointer to read queue */
{
	mblk_t *mp;			/* original message */
	mblk_t *bp;			/* canonicalized message */

	/* drain the queue until it is empty or the next queue is full */
	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {	/* type of msg */
		case M_DATA:		/* data message */
			if (canputnext(q)) {
				bp = read_canon(mp);
				putnext(q, bp);
			} else {
				putbq(q, mp);	/* put message back in queue */
				return (0);
			}
			break;

		default:
			if (mp->b_datap->db_type >= QPCTL) {
				/* high-priority message: not flow controlled */
				putnext(q, mp);
			} else {	/* ordinary message */
				if (canputnext(q)) {
					putnext(q, mp);
				} else {
					putbq(q, mp);
					return (0);
				}
			}
			break;
		}
	}
	return (0);
}

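/* write side line discipline module flow control */
/*
 * Prototypes assumed by analogy with read_canon(); the original example
 * does not show them, and the return type of ld_ioctl() is a guess.
 */
static mblk_t *write_canon(mblk_t *);
static void ld_ioctl(queue_t *, mblk_t *);

static int
ld_write_srv(queue_t *q)		/* pointer to write queue */
{
	mblk_t *mp;			/* original message */
	mblk_t *bp;			/* canonicalized message */

	/* drain the queue until it is empty or the next queue is full */
	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {	/* type of msg */
		case M_DATA:		/* data message */
			if (canputnext(q)) {
				bp = write_canon(mp);
				putnext(q, bp);
			} else {
				putbq(q, mp);	/* put message back in queue */
				return (0);
			}
			break;

		case M_IOCTL:
			ld_ioctl(q, mp);
			break;

		default:
			if (mp->b_datap->db_type >= QPCTL) {
				/* high-priority message: not flow controlled */
				putnext(q, mp);
			} else {	/* ordinary message */
				if (canputnext(q)) {
					putnext(q, mp);
				} else {
					putbq(q, mp);
					return (0);
				}
			}
			break;
		}
	}
	return (0);
}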