Transform
This example demonstrates using Tremor as a proxy and aggregator for InfluxDB data. As such it covers three topics: ingesting and decoding Influx data, grouping this data, and aggregating over it.
The demo starts up a local Chronograf instance, which allows browsing the data stored in InfluxDB. When first connecting, you'll be asked to specify the database to use; change the **Connection URL** to `http://influxdb:8086`. For all other questions select `Skip`, as we do not need to configure those.
Once in Chronograf, look at the `tremor` database to see the metrics and rollups. Since rollups aggregate over time, you might have to wait a few minutes until aggregated data propagates.
Depending on the performance of the system the demo runs on, metrics may be shed by Tremor's overload protection.
Environment
In `example.trickle` we process the data in multiple steps. Since this is somewhat more complex than the prior examples, we discuss each step in the Business Logic section.
Business Logic
Grouping
```trickle
select {
  "measurement": event.measurement,
  "tags": event.tags,
  "field": group[2],
  "value": event.fields[group[2]],
  "timestamp": event.timestamp,
}
from in
group by set(event.measurement, event.tags, each(record::keys(event.fields)))
into aggregate
having type::is_number(event.value);
```
This step groups the data for aggregation. This is required because the Influx line protocol allows multiple field values within one message. The grouping step ensures that we do not aggregate `cpu_idle` and `cpu_user` into the same value even though they arrive in the same message.
In other words, we normalise an event like this:
```text
measurement,tag1=value1,tag2=value2 field1=42,field2="snot",field3=0.2 123587512345513
```
into the three distinct series it represents, namely:
```text
measurement,tag1=value1,tag2=value2 field1=42 123587512345513
measurement,tag1=value1,tag2=value2 field2="snot" 123587512345513
measurement,tag1=value1,tag2=value2 field3=0.2 123587512345513
```
The second part of this query removes non-numeric values from the series before aggregation, since they cannot be aggregated.
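To make the split concrete, here is a small Python model of what the grouping step does to one decoded event. It is not Tremor code: the dict layout (`measurement`, `tags`, `fields`, `timestamp`) follows the fields the query reads, and the helper names `split_fields`, `group_key` and `is_number` are hypothetical.

```python
from typing import Any

# Hypothetical shape of a decoded Influx event, mirroring what the query reads.
Event = dict[str, Any]


def is_number(value: Any) -> bool:
    """Numeric check; bool is excluded because Python treats it as an int."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def split_fields(event: Event) -> list[Event]:
    """Emit one event per field, dropping non-numeric values.

    Models the grouping select (`each(record::keys(event.fields))`) plus
    its `having type::is_number(event.value)` clause.
    """
    return [
        {
            "measurement": event["measurement"],
            "tags": event["tags"],
            "field": field,
            "value": value,
            "timestamp": event["timestamp"],
        }
        for field, value in event["fields"].items()
        if is_number(value)
    ]


def group_key(e: Event) -> tuple:
    """Hashable stand-in for `set(event.measurement, event.tags, field)`."""
    return (e["measurement"], tuple(sorted(e["tags"].items())), e["field"])


if __name__ == "__main__":
    line = {
        "measurement": "measurement",
        "tags": {"tag1": "value1", "tag2": "value2"},
        "fields": {"field1": 42.0, "field2": "snot", "field3": 0.2},
        "timestamp": 123587512345513,
    }
    for e in split_fields(line):
        print(group_key(e), e["value"])
```

Run on the example event above, it yields series for `field1` and `field3` only; `field2` is dropped because `"snot"` is not a number.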
Aggregation
```trickle
select
{
  "measurement": event.measurement,
  "tags": patch event.tags of insert "window" => window end,
  "stats": aggr::stats::hdr(event.value, [ "0.5", "0.9", "0.99", "0.999" ]),
  "field": event.field,
  "timestamp": aggr::win::first(event.timestamp), # we can't use min since it's a float
}
from aggregate[`10secs`, `1min`, ]
group by set(event.measurement, event.tags, event.field)
into normalize;
```
In this step we aggregate the different series we created in the previous step.
The most notable parts are the `aggr::stats::hdr` and `aggr::win::first` functions, which perform the aggregation. `aggr::stats::hdr` uses an optimized HDR Histogram algorithm to compute the statistics and the requested percentiles. `aggr::win::first` returns the timestamp of the first event in the window.
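The sketch below models, in Python, what the aggregation step computes for one series in one window. As a plainly swapped-in technique it uses exact nearest-rank percentiles over a sorted list rather than an HDR histogram (which approximates values to a configurable precision in bounded memory), so `aggr::stats::hdr` may report slightly different percentiles. The function names and the choice of population rather than sample standard deviation are assumptions.

```python
import math
from decimal import Decimal
from statistics import mean, pstdev, pvariance


def percentile(sorted_values: list[float], q: str) -> float:
    """Exact nearest-rank percentile: the value at rank ceil(q * n).

    `q` is a string such as "0.99", as in the query; Decimal keeps
    q * n exact so float rounding cannot shift the rank.
    """
    n = len(sorted_values)
    rank = max(1, math.ceil(Decimal(q) * n))
    return sorted_values[min(rank, n) - 1]


def window_stats(values: list[float],
                 quantiles: tuple[str, ...] = ("0.5", "0.9", "0.99", "0.999")) -> dict:
    """Statistics for one window of one series (needs at least one value)."""
    xs = sorted(values)
    return {
        "count": len(xs),
        "min": xs[0],
        "max": xs[-1],
        "mean": mean(xs),
        "stdev": pstdev(xs),   # population variant: an assumption
        "var": pvariance(xs),
        "percentiles": {q: percentile(xs, q) for q in quantiles},
    }


def aggregate_window(events: list[dict]) -> dict:
    """Stats over the values plus the first event's timestamp in arrival
    order, mirroring aggr::win::first rather than a minimum."""
    return {
        "stats": window_stats([e["value"] for e in events]),
        "timestamp": events[0]["timestamp"],
    }


if __name__ == "__main__":
    print(window_stats([float(v) for v in range(1, 101)])["percentiles"])
```

For the values 1 to 100 the nearest-rank percentiles are 50, 90, 99 and 100.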
Normalisation to Influx Line Protocol
```trickle
select {
  "measurement": event.measurement,
  "tags": event.tags,
  "fields": {
    "count_#{event.field}": event.stats.count,
    "min_#{event.field}": event.stats.min,
    "max_#{event.field}": event.stats.max,
    "mean_#{event.field}": event.stats.mean,
    "stdev_#{event.field}": event.stats.stdev,
    "var_#{event.field}": event.stats.var,
    "p50_#{event.field}": event.stats.percentiles["0.5"],
    "p90_#{event.field}": event.stats.percentiles["0.9"],
    "p99_#{event.field}": event.stats.percentiles["0.99"],
    "p99.9_#{event.field}": event.stats.percentiles["0.999"]
  },
  "timestamp": event.timestamp,
}
from normalize
into batch;
```
The last step normalises the data into a format that can be encoded as Influx line protocol and names the fields accordingly. It uses string interpolation for the record's field names and simple value access for their values.
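A rough Python model of this step follows: it flattens a stats record into `<stat>_<field>` keys like the interpolated names in the query, then renders the result as one line of Influx line protocol. `normalize` and `to_line` are hypothetical helpers; `to_line` does no escaping and writes every value as a plain number, whereas a real encoder must escape special characters and add an `i` suffix for integer fields.

```python
STATS = ("count", "min", "max", "mean", "stdev", "var")
PERCENTILES = (("0.5", "p50"), ("0.9", "p90"), ("0.99", "p99"), ("0.999", "p99.9"))


def normalize(measurement: str, tags: dict, field: str, stats: dict, timestamp: int) -> dict:
    """Flatten one series' stats into fields named `<stat>_<field>`."""
    fields = {f"{name}_{field}": stats[name] for name in STATS}
    for q, prefix in PERCENTILES:
        fields[f"{prefix}_{field}"] = stats["percentiles"][q]
    return {"measurement": measurement, "tags": tags, "fields": fields, "timestamp": timestamp}


def to_line(event: dict) -> str:
    """Render `measurement,tag=v,... field=v,... timestamp` (sketch: no escaping)."""
    tags = "".join(f",{k}={v}" for k, v in sorted(event["tags"].items()))
    fields = ",".join(f"{k}={v}" for k, v in event["fields"].items())
    return f"{event['measurement']}{tags} {fields} {event['timestamp']}"


if __name__ == "__main__":
    stats = {"count": 2, "min": 1.0, "max": 3.0, "mean": 2.0, "stdev": 1.0, "var": 1.0,
             "percentiles": {"0.5": 1.0, "0.9": 3.0, "0.99": 3.0, "0.999": 3.0}}
    print(to_line(normalize("cpu", {"host": "a", "window": "10secs"}, "usage", stats, 10)))
```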
Command line testing during logic development
```shell
$ docker-compose up
... lots of logs ...
```
Then open Chronograf and connect the database.
Visualization with Grafana
The docker-compose file also deploys a Grafana image, which can be used with an InfluxDB data source configured for a server connection to `http://influxdb:8086`; the data can then be explored via the logs or metrics explorer modes.
Discussion
It is noteworthy that in the aggregation step only `aggr::stats::hdr` and `aggr::win::first` are evaluated for each event; the resulting record and its associated logic are only ever evaluated on emit.
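A toy Python model of that split between per-event and on-emit work: each event only updates the aggregate state (here a count, a running sum and the first timestamp), while the `body` callable, standing in for the select's record, runs once per emitted window. The count-based tumbling window and the name `TumblingCountWindow` are hypothetical simplifications; this example's windows are named `10secs` and `1min`.

```python
from typing import Callable, Optional


class TumblingCountWindow:
    """Emits every `size` events; a stand-in for a Tremor window."""

    def __init__(self, size: int, body: Callable[[dict], dict]):
        self.size = size
        self.body = body          # evaluated only on emit
        self.body_calls = 0
        self._reset()

    def _reset(self) -> None:
        self.state = {"count": 0, "sum": 0.0, "first_ts": None}

    def push(self, value: float, ts: int) -> Optional[dict]:
        # Per-event work: only the aggregate state is updated.
        s = self.state
        s["count"] += 1
        s["sum"] += value
        if s["first_ts"] is None:
            s["first_ts"] = ts
        if s["count"] < self.size:
            return None
        # On emit: build the output record once, then start a new window.
        self.body_calls += 1
        record = self.body(s)
        self._reset()
        return record


if __name__ == "__main__":
    w = TumblingCountWindow(3, lambda s: {"mean": s["sum"] / s["count"], "timestamp": s["first_ts"]})
    outputs = [w.push(float(v), 100 + v) for v in range(6)]
    print(outputs, w.body_calls)  # the body ran twice for six events
```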
We use `having` in the grouping step; however, this could also be done with a `where` clause on the aggregation step. In this example we chose `having` over `where` because it is worth discarding events as early as possible. If the requirement were to handle non-numeric fields differently, for example by routing the output of the grouping step to two different select statements, we would have used `where` instead.
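The trade-off can be illustrated with a small Python model (not Tremor code; `pipeline` and `is_number` are hypothetical). Both placements aggregate the same values, but filtering in the grouping step means fewer events are handed to the aggregation step.

```python
from typing import Any, Iterable


def is_number(value: Any) -> bool:
    """Numeric check; bool is excluded because Python treats it as an int."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def pipeline(values: Iterable[Any], filter_in_grouping: bool) -> tuple[float, int]:
    """Return (sum of aggregated values, events received by the aggregation step)."""
    if filter_in_grouping:
        # `having` on the grouping step: non-numbers never leave it.
        forwarded = [v for v in values if is_number(v)]
    else:
        forwarded = list(values)
    received = len(forwarded)
    # `where` on the aggregation step (a no-op if already filtered).
    aggregated = [v for v in forwarded if is_number(v)]
    return sum(aggregated), received


if __name__ == "__main__":
    data = [1, "snot", 2.5, True, "x"]
    print(pipeline(data, True))   # (3.5, 2)
    print(pipeline(data, False))  # (3.5, 5)
```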
Tip
Using `aggr::win::first` over `aggr::stats::min` is a debatable choice, as we use the timestamp of the first event rather than the minimal timestamp. Tremor does not re-order events, so as long as events arrive in timestamp order the two produce the same result, with `aggr::win::first` being cheaper to run. In addition, stats functions are currently implemented to return floating point numbers, so `aggr::stats::min` could lead to incorrect timestamps, which we'd rather avoid.
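The floating point concern can be checked in Python, whose `float` is an IEEE 754 double. This assumes the stats functions return double-precision floats and that timestamps are integer nanoseconds (the Influx line protocol default); `survives_float` is a hypothetical helper.

```python
def survives_float(ts_ns: int) -> bool:
    """True if an integer timestamp round-trips through a 64-bit float unchanged."""
    return int(float(ts_ns)) == ts_ns


# Doubles have a 53-bit significand: every integer up to 2**53 is exact.
print(survives_float(2**53), survives_float(2**53 + 1))  # True False

# The small timestamp from the example event is below 2**53 and survives...
print(survives_float(123587512345513))  # True

# ...but a hypothetical nanosecond epoch timestamp (about 1.6e18) does not.
ts = 1_587_512_345_513_000_001
print(survives_float(ts), int(float(ts)) - ts)  # False, plus a nonzero error
```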