STREAMS Programming Guide

Flow Control in Service Procedures

The STREAMS flow control mechanism is voluntary and operates between the two nearest queues in a stream containing service procedures (see Figure 7–13). Messages are held on a queue only if a service procedure is present in the associated queue.

Messages accumulate on a queue when the queue's service procedure processing does not keep pace with the message arrival rate, or when the procedure is blocked from placing its messages on the following STREAMS component by the flow control mechanism. Pushable modules can contain independent upstream and downstream limits. The stream head contains a preset upstream limit (which can be modified by a special message sent from downstream) and a driver can contain a downstream limit. See M_SETOPTS for more information.

Flow control operates as follows:

Figure 7–13 Flow Control Mechanism

Diagram shows three queues in a stream, two of which have service
procedures for handling message queues.

Modules and drivers need to observe the message priority. High-priority messages, determined by the type of the first block in the message,


mp->b_datap->db_type >= QPCTL

are not subject to flow control. They should be processed immediately and forwarded, as appropriate.
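The priority test above can be tried outside the kernel with a small user-space sketch. The struct names and the helper toy_is_high_priority() are hypothetical stand-ins, not DDI definitions, and the numeric constants are assumed to mirror <sys/stream.h>, so check them against your own headers. The sketch shows only that the first block of a message decides its priority.

```c
#include <stdbool.h>
#include <stddef.h>

/* Assumed values, chosen to mirror <sys/stream.h>; verify locally. */
#define QPCTL      0x80   /* types at or above this are high priority */
#define M_DATA     0x00
#define M_PROTO    0x01
#define M_PCPROTO  0x83

/* Toy stand-ins holding only the fields this sketch touches. */
struct toy_dblk { unsigned char db_type; };
struct toy_mblk {
    struct toy_mblk *b_cont;   /* next block of the same message */
    struct toy_dblk *b_datap;  /* data block descriptor */
};

/* Hypothetical helper: only the first block is examined. */
static bool toy_is_high_priority(const struct toy_mblk *mp)
{
    return mp->b_datap->db_type >= QPCTL;
}
```

A message whose first block is M_DATA is treated as ordinary even if a later block in the b_cont chain carries a high-priority type.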

For ordinary messages, flow control must be tested before any processing is performed. canputnext(9F) determines if the forward path from the queue is blocked by flow control.

This is the general flow control processing of ordinary messages:


Caution –

High-priority messages must be processed and not placed back on the queue.


The canonical representation of this processing within a service procedure is:

while (getq() != NULL) {
	if (high priority message || no flow control) {
		process message
		putnext()
	} else {
		putbq()
		return
	}
}

Expedited data has its own flow control with the same processing method as that of ordinary messages. bcanputnext(9F) provides modules and drivers with a test of flow control in a priority band. It returns 1 if a message of the given priority can be placed in the queue. It returns 0 if the priority band is flow controlled. If the band does not exist in the queue in question, the routine returns 1.

If the band is flow controlled, the higher bands are not affected. However, lower bands are also stopped from sending messages. Without this, lower priority messages can be passed along ahead of the flow-controlled higher priority messages.

The call bcanputnext(q, 0); is equivalent to the call canputnext(q);.
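The band rules described above can be summarized in a user-space model. This is a sketch of the documented results only, not of how the kernel tracks band state. The names toy_bands, toy_bcanput(), and toy_canput() are hypothetical.

```c
#include <stdbool.h>

#define TOY_MAXBANDS 256

/* Toy state for the next queue: full[b] is true when band b is blocked. */
struct toy_bands {
    int  nband;                 /* highest band that exists; band 0 always does */
    bool full[TOY_MAXBANDS];
};

/* Models the documented return value of bcanputnext(q, pri). */
static int toy_bcanput(const struct toy_bands *q, unsigned char pri)
{
    if (pri > q->nband)
        return 1;               /* band does not exist: not flow controlled */
    for (int b = pri; b <= q->nband; b++)
        if (q->full[b])
            return 0;           /* this band, or a higher one, is blocked */
    return 1;
}

/* Band 0 is the ordinary case, matching canputnext(q). */
static int toy_canput(const struct toy_bands *q)
{
    return toy_bcanput(q, 0);
}
```

With bands 0 through 3 present and only band 2 flow controlled, the model reports band 3 as open and bands 2, 1, and 0 as blocked.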


Note –

A service procedure must process all messages in its queue unless flow control prevents this.


A service procedure must continue processing messages from its queue until getq(9F) returns NULL. When an ordinary message is queued by putq(9F), the service procedure is scheduled only if the queue was previously empty and a previous getq(9F) call returned NULL (that is, the QWANTR flag is set). If there are messages in the queue, putq(9F) presumes the service procedure is blocked by flow control and that STREAMS will automatically reschedule the procedure when the block is removed. If the service procedure cannot complete processing as a result of conditions other than flow control (for example, no buffers), it must ensure a later return (for example, by bufcall(9F)) or discard all messages in the queue. If this is not done, STREAMS never schedules the service procedure to be run unless the queue's put procedure queues a priority message with putq(9F).
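The scheduling rule in the preceding paragraph can be illustrated with a user-space model. It tracks only a message count, a flag standing in for QWANTR, and a counter of scheduling events. All names are hypothetical, and the real putq(9F) considers more conditions than this sketch does.

```c
#include <stdbool.h>

struct toy_sched {
    int  count;      /* messages currently on the queue */
    bool wantr;      /* stands in for QWANTR: getq found the queue empty */
    int  scheduled;  /* times the service procedure was scheduled */
};

/* Returns true if a message was removed; an empty queue sets wantr. */
static bool toy_sched_getq(struct toy_sched *q)
{
    if (q->count == 0) {
        q->wantr = true;
        return false;
    }
    q->count--;
    q->wantr = false;
    return true;
}

/* Queue a message; schedule only for high priority or when wantr is set. */
static void toy_sched_putq(struct toy_sched *q, bool high_priority)
{
    q->count++;
    if (high_priority || q->wantr)
        q->scheduled++;
}
```

If a service procedure takes one message and then returns without calling the getq stand-in again, wantr stays clear, so a later ordinary message is queued without scheduling. In this model only a high-priority message, or draining the queue until the getq stand-in reports empty, restores scheduling.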


Note –

High-priority messages are discarded only if there is already a high-priority message on the stream head read queue. That is, there can be only one high-priority message (PC_PROTO) present on the stream head read queue at any time.


putbq(9F) replaces a message at the beginning of the appropriate section of the message queue according to its priority. This might not be the same position at which the message was retrieved by the preceding getq(9F). A subsequent getq(9F) might return a different message.

putq(9F) checks only the priority band in the first message. If a high-priority message is passed to putq(9F) with a nonzero b_band value, b_band is reset to 0 before placing the message in the queue. If the message is passed to putq(9F) with a b_band value that is greater than the number of qband(9S) structures associated with the queue, putq(9F) tries to allocate a new qband(9S) structure for each band, up to and including the band of the message.

rmvq(9F) and insq(9F) work similarly. If you try to insert a message out of order in a queue with insq(9F), the message is not inserted and the routine fails.

putq(9F) does not schedule a queue if noenable(9F) was previously called for the queue. noenable(9F) forces putq(9F) to queue the message when called by this queue, but not to schedule the service procedure. noenable(9F) does not prevent the queue from being scheduled by a flow control back-enable. The inverse of noenable(9F) is enableok(9F).

The service procedure is written using the following algorithm:

while ((bp = getq(q)) != NULL) {
	if (queclass(bp) == QPCTL) {
		/* Process the message */
		putnext(q, bp);
	} else if (bcanputnext(q, bp->b_band)) {
		/* Process the message */
		putnext(q, bp);
	} else {
		putbq(q, bp);
		return;
	}
}

If the module or driver ignores priority bands, the algorithm is the same as described in the previous paragraphs, except that canputnext(q) is substituted for bcanputnext(q, bp->b_band).
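To see the band-less form of the algorithm behave, the following user-space sketch replaces the kernel queue with a singly linked list and replaces canputnext(q) with a boolean flag. Every name is hypothetical, TOY_QPCTL is assumed to match QPCTL in <sys/stream.h>, and the toy putbq simply returns the message to the front of the list rather than honoring priority sections.

```c
#include <stdbool.h>
#include <stddef.h>

#define TOY_QPCTL 0x80        /* assumed to match QPCTL in <sys/stream.h> */

struct toy_msg {
    unsigned char   type;     /* stands in for b_datap->db_type */
    struct toy_msg *next;
};

struct toy_queue {
    struct toy_msg *first;    /* head of the message list */
    bool next_blocked;        /* true when canputnext() would return 0 */
    int  forwarded;           /* messages handed to the next component */
};

static struct toy_msg *toy_getq(struct toy_queue *q)
{
    struct toy_msg *m = q->first;
    if (m != NULL) {
        q->first = m->next;
        m->next = NULL;
    }
    return m;
}

/* Simplification: put the message back at the very front. */
static void toy_putbq(struct toy_queue *q, struct toy_msg *m)
{
    m->next = q->first;
    q->first = m;
}

/* Service procedure: drain until empty or until an ordinary message is blocked. */
static void toy_srv(struct toy_queue *q)
{
    struct toy_msg *m;

    while ((m = toy_getq(q)) != NULL) {
        if (m->type >= TOY_QPCTL || !q->next_blocked) {
            q->forwarded++;   /* process the message, then "putnext" */
        } else {
            toy_putbq(q, m);
            return;
        }
    }
}
```

With one high-priority message followed by two ordinary messages and the forward path blocked, a single run forwards only the high-priority message and leaves the first ordinary message at the head of the queue. A second run after the block clears forwards the rest.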

qenable(9F), another flow-control utility, enables a module or driver to cause one of its queues, or another module's queues, to be scheduled. qenable(9F) can also be used to delay message processing. An example of this is a buffer module that gathers messages in its message queue and forwards them as a single, larger message. This module uses noenable(9F) to inhibit its service procedure and queues messages with its put procedure until a certain byte count or “in queue” time has been reached. When either of these conditions is met, the module calls qenable(9F) to cause its service procedure to run.
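The buffer module described above can be modeled in user space as follows. The sketch covers only the byte-count trigger, not the "in queue" timer. The 1024-byte threshold and all names are hypothetical. Setting the scheduled flag stands in for the effect of qenable(9F).

```c
#include <stdbool.h>
#include <stddef.h>

#define TOY_THRESHOLD 1024    /* hypothetical byte count that triggers a flush */

struct toy_bufmod {
    size_t queued_bytes;      /* bytes currently held on the queue */
    size_t queued_msgs;       /* messages currently held on the queue */
    bool   noenable;          /* true: queuing alone does not schedule service */
    bool   scheduled;         /* true once the service procedure is due to run */
    size_t last_forwarded;    /* size of the last combined message sent on */
};

/* Put procedure: queue the message; schedule service only at the threshold. */
static void toy_buf_put(struct toy_bufmod *m, size_t len)
{
    m->queued_bytes += len;
    m->queued_msgs++;
    if (!m->noenable || m->queued_bytes >= TOY_THRESHOLD)
        m->scheduled = true;  /* the threshold case stands in for qenable() */
}

/* Service procedure: forward everything held as one larger message. */
static void toy_buf_srv(struct toy_bufmod *m)
{
    m->last_forwarded = m->queued_bytes;
    m->queued_bytes = 0;
    m->queued_msgs = 0;
    m->scheduled = false;
}
```

With noenable set, two 400-byte messages are held without scheduling. A third crosses the threshold, and the service procedure then forwards a single 1200-byte message.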

Another example is a communication line discipline module that implements end-to-end (for example, to a remote system) flow control. Outbound data is held on the write side message queue until the read side receives a transmit window from the remote end of the network.


Note –

STREAMS routines are called at different priority levels. Interrupt routines are called at the interrupt priority of the interrupting device. Service routines are called with interrupts enabled (so that service routines for STREAMS drivers can be interrupted by their own interrupt routines).