Joining Data From Multiple Aggregations

DTrace can process data from multiple aggregations that share the same keys. A typical case is gathering statistics on system call latency, such as the minimum, maximum, average, and standard deviation of the time spent in each system call. The printa() action can print multiple aggregations in a single call, as shown in the following example, which reports the minimum, maximum, and average latency.

# cat syscall-latency-stats.d
#!/usr/sbin/dtrace -qs

#pragma D option aggsortpos=2

syscall:::entry
{
        self->ts = timestamp;
}
syscall:::return
/ self->ts /
{
        this->lat = timestamp - self->ts;
        @m[probefunc] = min(this->lat);
        @M[probefunc] = max(this->lat);
        @a[probefunc] = avg(this->lat);
        self->ts = 0;
}
END
{
        printa("%-20s min: %12@d max:%12@d avg:%12@d\n", @m, @M, @a);
}
# ./syscall-latency-stats.d
^C
[ ... ]
close             min:             19559 max:         38758 avg:         29158
schedctl          min:             36407 max:         36407 avg:         36407
write             min:              5156 max:        170056 avg:         87716
send              min:             97028 max:         97028 avg:         97028
connect           min:            169528 max:        169528 avg:        169528
lwp_cond_wait     min:             75977 max:    1001221741 avg:      47341037
read              min:              1253 max:    1000786548 avg:      55212840
lwp_park          min:              2275 max:    2000410123 avg:     521297430
pollsys           min:              2611 max:    5000232030 avg:     545102592
#

A custom consumer can use the dtrace_aggregate_walk_joined() function to process multiple aggregations in the same way as this DTrace script. The function has the following signature:

int dtrace_aggregate_walk_joined(dtrace_hdl_t *,
    dtrace_aggvarid_t *, int, dtrace_aggregate_walk_joined_f *, void *);

The arguments to the dtrace_aggregate_walk_joined() function are as follows:

  • DTrace handle

  • Array of dtrace_aggvarid_t – The identifiers for the aggregations to be joined

  • Number of elements in that array – The number of aggregations to be joined

  • Function to process each entry in the aggregation

  • Private argument to be passed

The dtrace_aggregate_walk_joined() function bundles together the aggregation data whose records have the same key. The specified callback function is called once for each bundle. The callback type is declared as follows:

typedef int dtrace_aggregate_walk_joined_f(const dtrace_aggdata_t **,
    const int, void *);

The arguments to the dtrace_aggregate_walk_joined_f() function are:

  • Array of dtrace_aggdata_t pointers – The bundle of aggregation data that shares the same key

  • Number of elements in the bundle

  • Private argument originally passed to the dtrace_aggregate_walk_joined() function

The dtrace_aggregate_walk_joined_f() callback is similar to the walk() function that is passed to the other aggregation walkers, except that it receives an array of pointers to aggregation data rather than a single pointer. The callback processes the aggregation data from each element of this array.
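The bundling behavior can be pictured with a small self-contained model. This sketch does not use libdtrace, and the library's internals may differ. The names entry_t, agg_t, join_f, join_walk(), and sum_cb() are hypothetical and exist only for illustration. The model assumes that each aggregation is a list of key and value entries, and that element 0 of each bundle carries the key while the remaining elements hold one entry per aggregation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_AGGS 8      /* illustrative limit, not a libdtrace constant */

/* Hypothetical stand-in for one aggregation entry: a key and its value. */
typedef struct entry {
        const char *key;
        int64_t value;
} entry_t;

/* Hypothetical stand-in for one aggregation: an array of entries. */
typedef struct agg {
        const entry_t *entries;
        size_t nentries;
} agg_t;

/* Callback type modeled loosely on dtrace_aggregate_walk_joined_f. */
typedef int join_f(const entry_t **bundle, int nbundle, void *arg);

static const entry_t *
find_key(const agg_t *agg, const char *key)
{
        size_t i;

        for (i = 0; i < agg->nentries; i++) {
                if (strcmp(agg->entries[i].key, key) == 0)
                        return (&agg->entries[i]);
        }
        return (NULL);
}

/*
 * For each key in the first aggregation, build a bundle and call func.
 * bundle[0] carries the key; bundle[1..naggs] hold the matching entries.
 * Keys that are missing from any aggregation are skipped, which is a
 * simplification made by this model.
 */
static int
join_walk(const agg_t *aggs, int naggs, join_f *func, void *arg)
{
        const entry_t *bundle[MAX_AGGS + 1];
        size_t i;
        int j, complete;

        assert(naggs >= 1 && naggs <= MAX_AGGS);

        for (i = 0; i < aggs[0].nentries; i++) {
                const char *key = aggs[0].entries[i].key;

                complete = 1;
                for (j = 0; j < naggs; j++) {
                        bundle[j + 1] = find_key(&aggs[j], key);
                        if (bundle[j + 1] == NULL)
                                complete = 0;
                }
                if (!complete)
                        continue;

                bundle[0] = bundle[1];  /* representative entry for the key */
                if (func(bundle, naggs + 1, arg) != 0)
                        return (-1);
        }
        return (0);
}

/* Example callback: adds up every value in the bundle, skipping bundle[0]. */
static int
sum_cb(const entry_t **bundle, int nbundle, void *arg)
{
        int64_t *total = arg;
        int i;

        for (i = 1; i < nbundle; i++)
                *total += bundle[i]->value;
        return (0);
}
```

In this model, a caller passes an array of agg_t values together with sum_cb and a pointer to an int64_t accumulator. The callback skips element 0 because that element only identifies the key.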

The following example shows a walk_joined() callback that is passed to the dtrace_aggregate_walk_joined() function. The callback extracts the system call latency statistics from each bundle and then prints them.

In this example, the key for the bundle is read from the zeroth element of data, and the aggregated values start at data[1]. Because every element of a bundle shares the same key, data[1] contains the same key as data[0].

static int
walk_joined(const dtrace_aggdata_t **data, const int naggs, void *arg)
{
        dtrace_aggdesc_t *aggdesc;
        dtrace_recdesc_t *keyrec, *datarec;
        char *syscall;
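        int64_t stats[4] = { 0 }, *avgdata;
        int i;

        /* data[0] carries the key, described by record 1. */
        aggdesc = data[0]->dtada_desc;
        keyrec = &aggdesc->dtagd_rec[1];
        syscall = data[0]->dtada_data + keyrec->dtrd_offset;

        /* Values start at data[1]; stay within the bounds of stats[]. */
        for (i = 1; i < naggs && i < 4; i++) {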
                aggdesc = data[i]->dtada_desc;
                datarec = &aggdesc->dtagd_rec[2];

                switch (datarec->dtrd_action) {
                case DTRACEAGG_MIN:
                case DTRACEAGG_MAX:
                        stats[i] = *((int64_t *)(data[i]->dtada_data +
                            datarec->dtrd_offset));
                        break;
                case DTRACEAGG_AVG:
                        avgdata = (int64_t *)(data[i]->dtada_data +
                            datarec->dtrd_offset);
                        stats[i] = avgdata[0] ? avgdata[1] / avgdata[0] : 0;
                        break;
                default:
                        fatal("Incorrect record type in walk_joined()\n");
                        break;
                }
        }

        /* Cast for %lld because int64_t is not necessarily long long. */
        printf("%-20s min: %12lld max: %12lld avg: %12lld\n", syscall,
            (long long)stats[1], (long long)stats[2], (long long)stats[3]);

        return (DTRACE_AGGWALK_NEXT);
}
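The DTRACEAGG_AVG case in this example treats the aggregation buffer as a pair of 64-bit integers, a count followed by a total, and divides the total by the count. The following standalone sketch isolates that arithmetic and has no libdtrace dependency. The helpers avg_from_pair() and format_stats() are hypothetical names. The use of PRId64 from <inttypes.h> is one portable way to print int64_t values and is an assumption of this sketch, not part of the original example.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: compute an average from a two-element buffer laid
 * out as in the example, where pair[0] is the count and pair[1] is the
 * total. Returns 0 when the count is 0 to avoid dividing by zero.
 */
static int64_t
avg_from_pair(const int64_t *pair)
{
        assert(pair != NULL);
        return (pair[0] != 0 ? pair[1] / pair[0] : 0);
}

/*
 * Hypothetical helper: format one output line in the style of the
 * example. Returns the value returned by snprintf().
 */
static int
format_stats(char *buf, size_t len, const char *name,
    int64_t min, int64_t max, int64_t avg)
{
        return (snprintf(buf, len,
            "%-20s min: %12" PRId64 " max: %12" PRId64 " avg: %12" PRId64 "\n",
            name, min, max, avg));
}
```

With a count of 4 and a total of 100, avg_from_pair() returns 25. Because the division is integer division, any fractional part of the average is discarded, just as in the walk_joined() example.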

For the call to dtrace_aggregate_walk_joined(), you must know the IDs for the aggregations you wish to process. In the following example, the chewrec() function shows how to extract the aggregation IDs when processing the record for the printa() action.

static int
chewrec(const dtrace_probedata_t *data, const dtrace_recdesc_t *rec,
    void *arg)
{
       dtrace_actkind_t act;
       dtrace_eprobedesc_t *epd = data->dtpda_edesc;
       dtrace_aggvarid_t aggvars[3];
       const void *buf;
       int i, nagv;

       if (rec == NULL)
               return (DTRACE_CONSUME_NEXT);
       act = rec->dtrd_action;
       buf = data->dtpda_data - rec->dtrd_offset;
 
       if (act == DTRACEACT_EXIT)
               return (DTRACE_CONSUME_NEXT);

       if (act == DTRACEACT_PRINTA) {
               for (nagv = 0, i = 0; i < 3; i++) {
                       const dtrace_recdesc_t *nrec = &rec[i];

                       if (nrec->dtrd_uarg != rec->dtrd_uarg)
                               break;

                       aggvars[nagv++] = *((dtrace_aggvarid_t *)
                           ((caddr_t)buf + nrec->dtrd_offset));
               }
               if (nagv == 3) {
                       if (dtrace_aggregate_walk_joined(g_dtp, aggvars, nagv,
                           walk_joined, NULL) == -1)
                               fatal("dtrace_aggregate_walk_joined failed");
               }
       }
       return (DTRACE_CONSUME_NEXT);
}
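The loop in chewrec() relies on two properties that the example takes for granted. First, the records that belong to one printa() action share the same dtrd_uarg value. Second, each record's dtrd_offset locates an aggregation ID relative to the start of the record buffer. The following self-contained model mimics that logic without libdtrace. The types aggid_t and mock_rec_t and the function collect_aggids() are hypothetical stand-ins. Unlike the example, this sketch also takes the number of remaining records so that it does not read past the end of the record array.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef int64_t aggid_t;   /* hypothetical stand-in for dtrace_aggvarid_t */

/* Hypothetical stand-in for the two dtrace_recdesc_t fields used here. */
typedef struct mock_rec {
        uint64_t uarg;     /* models dtrd_uarg */
        uint32_t offset;   /* models dtrd_offset */
} mock_rec_t;

/*
 * Starting at rec[0], copy aggregation IDs from base into ids[] for as
 * long as successive records share rec[0]'s uarg. Stops after max IDs or
 * nrecs records, whichever comes first. Returns the number of IDs copied.
 */
static int
collect_aggids(const mock_rec_t *rec, int nrecs, const char *base,
    aggid_t *ids, int max)
{
        int i, n = 0;

        assert(rec != NULL && base != NULL && ids != NULL);

        for (i = 0; i < nrecs && n < max; i++) {
                if (rec[i].uarg != rec[0].uarg)
                        break;
                /* memcpy() avoids assuming that the ID is aligned. */
                memcpy(&ids[n++], base + rec[i].offset, sizeof (aggid_t));
        }
        return (n);
}
```

A caller in this model would compare the return value with the number of aggregations it expects, in the same way that chewrec() checks nagv before it calls dtrace_aggregate_walk_joined().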