Transactions and Transitions
In the Pipe model, a stream of input events arrives from a command or another source, and these events sometimes need to be separated into groups.
Using exec batch
Consider a Pipe that monitors disk usage with df:
$> df -x tmpfs
Filesystem 1K-blocks Used Available Use% Mounted on
udev 8120300 0 8120300 0% /dev
/dev/sda2 122030736 102279252 13509644 89% /
/dev/sda1 523248 6228 517020 2% /boot/efi
/dev/sdb1 960380628 366946504 544579664 41% /home/steve/hd
The Pipe runs df periodically, and the aim is to extract the Mounted, Used and Available fields. Here exec batch marks the first and last lines of each run's output, using the fields named by begin-marker-field and end-marker-field:
name: batch
input:
  exec:
    command: df -x tmpfs
    interval: 1m
    batch:
      begin-marker-field: begin
      end-marker-field: end
output:
  write: console
# Output:
# {"_raw":"Filesystem 1K-blocks Used Available Use% Mounted on","begin":true}
# {"_raw":"udev 8120300 0 8120300 0% /dev"}
# {"_raw":"/dev/sda2 122030736 102287196 13501700 89% /"}
# {"_raw":"/dev/sda1 523248 6228 517020 2% /boot/efi"}
# {"_raw":"/dev/sdb1 960380628 367768744 543757424 41% /home/steve/hd","end":true}
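The following Python sketch illustrates what the batch markers do. It is not how Pipes implements them, and the function name mark_batch is hypothetical. Each line from one command run becomes a record with a _raw field, and the first and last records receive the begin and end marker fields.

```python
def mark_batch(lines, begin_field="begin", end_field="end"):
    """Turn one command run's output lines into records, flagging the first and last.

    Illustrative sketch only; mark_batch is a hypothetical name, not part of Pipes.
    """
    records = [{"_raw": line} for line in lines]
    if records:
        records[0][begin_field] = True   # first line of this run
        records[-1][end_field] = True    # last line of this run
    return records


df_output = [
    "Filesystem 1K-blocks Used Available Use% Mounted on",
    "udev 8120300 0 8120300 0% /dev",
    "/dev/sda1 523248 6228 517020 2% /boot/efi",
]
for rec in mark_batch(df_output):
    print(rec)
```

In this sketch, a run that produces a single line gets both markers on that one record.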
expand can split these lines into fields with delim: ' ', which matches any run of whitespace. Setting header: true takes the column names from the header line, but only the first time. Each later run of df emits another header line, which would then be treated as data. To skip the column-name line on every run, do the following:
name: df-cols
input:
  exec:
    command: df -x tmpfs
    batch:
      begin-marker-field: begin
    interval: 2s
actions:
- expand:
    input-field: _raw
    delim: ' '
    remove: true
    begin-marker-field: begin
    csv:
      header: true
      relaxed-schema: true
output:
  write: console
# Output:
# {"Filesystem":"udev","1K-blocks":8120300,"Used":0,"Available":8120300,"Use%":"0%","Mounted":"/dev"}
# {"Filesystem":"/dev/sda2","1K-blocks":122030736,"Used":102288448,"Available":13500448,"Use%":"89%","Mounted":"/"}
# {"Filesystem":"/dev/sda1","1K-blocks":523248,"Used":6228,"Available":517020,"Use%":"2%","Mounted":"/boot/efi"}
# {"Filesystem":"/dev/sdb1","1K-blocks":960380628,"Used":367768868,"Available":543757300,"Use%":"41%","Mounted":"/home/steve/hd"}
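begin-marker-field ensures that the column-name line of every run is passed over rather than treated as data. relaxed-schema avoids the problem caused by the extra column on: the header "Mounted on" splits into two words, so the header has one more column than the data rows.
Now we are able to drop or rename fields as needed.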
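The header handling can be sketched in the same way. This Python function reflects one reading of the behaviour described above, not the Pipes source, and expand_batches is a hypothetical name. A record carrying the begin marker supplies the column names for its batch and is not emitted. Every other line is split on whitespace and paired with those names. Because zip stops at the shorter list, the surplus header word "on" is ignored, which roughly mirrors what relaxed-schema permits. Numeric strings are converted to integers, as in the output above.

```python
def expand_batches(records, begin_field="begin"):
    """Split whitespace-separated lines into dicts, taking the header from each batch.

    Hypothetical sketch of expand with begin-marker-field and csv header: true.
    """
    header = None
    rows = []
    for rec in records:
        values = rec["_raw"].split()  # split on runs of whitespace
        if rec.get(begin_field):
            header = values           # column names for this batch; not emitted
            continue
        if header is None:
            continue                  # no header seen yet (assumed: skip the line)
        # zip() stops at the shorter list, so the extra header word "on" is dropped
        rows.append({name: int(v) if v.isdigit() else v for name, v in zip(header, values)})
    return rows


run = [
    {"_raw": "Filesystem 1K-blocks Used Available Use% Mounted on", "begin": True},
    {"_raw": "udev 8120300 0 8120300 0% /dev"},
    {"_raw": "/dev/sda1 523248 6228 517020 2% /boot/efi", "end": True},
]
# Two consecutive runs: the second header line is skipped as well.
for row in expand_batches(run + run):
    print(row)
```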
Detecting Stalls And Grouping by Field
Not all data arrives on a schedule, as it does with exec or http-poll. The tcp input accepts data whenever it arrives, so it is useful to know if, and when, the data stops flowing.
name: stalled1
input:
  tcp:
    address: localhost:2020
    plain: true
actions:
- stalled:
    timeout: 2s
    marker: [stalled]
output:
  write: console
If more than 2 seconds pass between events, stalled emits an extra event, which may look like {"_marker":"stalled","streaming":"no"}. When the data resumes, it emits {"_marker":"stalled","streaming":"yes"}.
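A live pipe needs a timer, because the stalled event must appear even when nothing else arrives. The offline Python sketch below replays timestamped events instead, which makes the rule easy to check. detect_stalls is a hypothetical name, and the sketch is not the Pipes implementation. Whenever the gap between two events exceeds the timeout, the sketch records a streaming "no" marker stamped at the moment the timeout expired, then a streaming "yes" marker as the data resumes.

```python
def detect_stalls(events, timeout):
    """Offline sketch of stall detection over (timestamp, record) pairs.

    A gap longer than `timeout` yields a streaming "no" marker stamped when the
    timeout expired, followed by a streaming "yes" marker when data resumes.
    """
    out = []
    last = None
    for ts, rec in events:
        if last is not None and ts - last > timeout:
            out.append((last + timeout, {"_marker": "stalled", "streaming": "no"}))
            out.append((ts, {"_marker": "stalled", "streaming": "yes"}))
        out.append((ts, rec))
        last = ts
    return out


events = [(0.0, {"a": 1}), (1.0, {"a": 2}), (4.5, {"a": 3})]
for ts, rec in detect_stalls(events, timeout=2.0):
    print(ts, rec)
```

Unlike a live pipe, this replay cannot report a stall after the last event, because no later timestamp exists to compare against.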
Now imagine that several sources connect and write events to TCP port 2020. Each source is identified by the value of a particular field, and the task is to detect when a particular source stops sending data. In this example the field is b, and there are two sources, one and two, which send data every 100ms:
# Output:
# {"a":1,"b":"one"}
# {"a":2,"b":"one"}
# {"a":3,"b":"one"}
# {"a":4,"b":"one"}
# {"a":5,"b":"two"}
# {"a":6,"b":"two"}
# {"a":7,"b":"two"}
# {"a":8,"b":"one"}
# {"a":9,"b":"one"}
# {"a":10,"b":"one"}
# {"a":11,"b":"one"}
Adding group-by: b to stalled, here with a 200ms timeout, tracks each source separately:
- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: b
# Output:
# {"_marker":"stalled","streaming":"yes","stalled":"one"}
# {"a":1,"b":"one"}
# {"a":2,"b":"one"}
# {"a":3,"b":"one"}
# {"a":4,"b":"one"}
# {"_marker":"stalled","streaming":"yes","stalled":"two"}
# {"a":5,"b":"two"}
# {"a":6,"b":"two"}
# {"_marker":"stalled","streaming":"no","stalled":"one"}
# {"a":7,"b":"two"}
# {"_marker":"stalled","streaming":"yes","stalled":"one"}
# {"a":8,"b":"one"}
# {"a":9,"b":"one"}
# {"a":10,"b":"one"}
# {"_marker":"stalled","streaming":"no","stalled":"two"}
# {"a":11,"b":"one"}
The two sources are tracked as separate groups, each with its own timeout. While two is sending, one is reported as stalled, and after one resumes, two is reported as stalled in turn.
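To extend the offline idea to group-by, keep a last-seen time and a streaming flag for each group value. This is a hypothetical illustration using integer millisecond timestamps, and detect_stalls_by_group is not a Pipes function. Before each record is handled, every group that has been silent for longer than the timeout is reported as stalled. The first record from a group, or the first after a stall, is preceded by a streaming "yes" marker.

```python
def detect_stalls_by_group(events, timeout_ms, group_field):
    """Offline sketch of stall detection tracked separately per group value.

    events: (timestamp_ms, record) pairs in arrival order.
    """
    last_seen = {}   # group value -> timestamp of its latest record
    streaming = {}   # group value -> True while the group is flowing
    out = []
    for ts, rec in events:
        # Report every group that has been silent for longer than the timeout.
        for group, last in last_seen.items():
            if streaming[group] and ts - last > timeout_ms:
                streaming[group] = False
                out.append({"_marker": "stalled", "streaming": "no", "stalled": group})
        group = rec[group_field]
        if not streaming.get(group, False):
            # First record from this group, or the first one after a stall.
            streaming[group] = True
            out.append({"_marker": "stalled", "streaming": "yes", "stalled": group})
        out.append(rec)
        last_seen[group] = ts
    return out


# Two sources sending every 100ms: one (a=1..4), two (a=5..7), then one again (a=8..11).
events = [(i * 100, {"a": i + 1, "b": "two" if 4 <= i <= 6 else "one"}) for i in range(11)]
for rec in detect_stalls_by_group(events, timeout_ms=200, group_field="b"):
    print(rec)
```

Replaying the example with records exactly 100ms apart gives the same sequence of markers as the output above. However, the stall for two is reported one record earlier, presumably because real arrival times are less regular than this simulated clock.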
Collecting Groups Together with Transaction
transaction collects related records together. Using the stalled events in the output above as its timeout, it gathers each group's sequence of events into an array:
- transaction:
    timeout: [stalled]
    marker: [TRANS]
    group-by: b
- remove: [duration,complete]
# Output:
# {"_marker":"TRANS","recs":[
# {"a":1,"b":"one"},
# {"a":2,"b":"one"},
# {"a":3,"b":"one"},
# {"a":4,"b":"one"}
# ]}
# {"_marker":"TRANS","recs":[
# {"a":5,"b":"two"},
# {"a":6,"b":"two"},
# {"a":7,"b":"two"}
# ]}
# {"_marker":"TRANS","recs":[
# {"a":8,"b":"one"},
# {"a":9,"b":"one"},
# {"a":10,"b":"one"},
# {"a":11,"b":"one"}
# ]}
Note that transaction consumes the stalled events.
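The following Python sketch illustrates the grouping by replaying the grouped stalled output shown earlier. It is a hypothetical illustration, and collect_transactions is not a Pipes API. Records accumulate per group value and stalled markers are consumed. A streaming "no" marker closes that group's array. Any group still open at the end of input is flushed, standing in for the stall a live pipe would report later.

```python
def collect_transactions(stream, group_field):
    """Gather each group's records into an array, closing it when the group stalls.

    Sketch only: stream mixes ordinary records with stalled markers.
    """
    open_txns = {}  # group value -> records collected so far
    out = []
    for rec in stream:
        if rec.get("_marker") == "stalled":
            # Markers are consumed; a "no" marker closes that group's transaction.
            group = rec["stalled"]
            if rec["streaming"] == "no" and group in open_txns:
                out.append({"_marker": "TRANS", "recs": open_txns.pop(group)})
            continue
        open_txns.setdefault(rec[group_field], []).append(rec)
    # End of input stands in for the timeout a live pipe would eventually hit.
    for recs in open_txns.values():
        out.append({"_marker": "TRANS", "recs": recs})
    return out


M = "stalled"
stream = [
    {"_marker": M, "streaming": "yes", "stalled": "one"},
    {"a": 1, "b": "one"}, {"a": 2, "b": "one"}, {"a": 3, "b": "one"}, {"a": 4, "b": "one"},
    {"_marker": M, "streaming": "yes", "stalled": "two"},
    {"a": 5, "b": "two"}, {"a": 6, "b": "two"},
    {"_marker": M, "streaming": "no", "stalled": "one"},
    {"a": 7, "b": "two"},
    {"_marker": M, "streaming": "yes", "stalled": "one"},
    {"a": 8, "b": "one"}, {"a": 9, "b": "one"}, {"a": 10, "b": "one"},
    {"_marker": M, "streaming": "no", "stalled": "two"},
    {"a": 11, "b": "one"},
]
for txn in collect_transactions(stream, "b"):
    print(txn)
```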
The transaction model is well suited to sessions, such as those in this log:
{"action":"LOGIN","user":"Bob"}
{"action":"SEND","user":"Bob"}
{"action":"LOGIN","user":"Alice"}
{"action":"RECV","user":"Bob"}
{"action":"SEND","user":"Alice"}
{"action":"SEND-AGAIN","user":"Alice"}
{"action":"SEND-MORE","user":"Alice"}
{"action":"LOGOFF","user":"Alice"}
A session happens as follows:
- Users log in.
- Users perform actions.
- Users log out.
A session starts when the action field matches LOGIN and ends when it matches LOGOFF. start-end defines these two conditions, each as a pair of a field name and a pattern to match.
A real-world complication is that users often forget to log out, so a timeout is needed to close sessions that never see a LOGOFF:
- stalled:
    timeout: 200ms
    marker: [stalled]
    group-by: user
- transaction:
    timeout: [stalled]
    marker: [TRANS]
    start-end:
      start: [action, LOGIN]
      end: [action, LOGOFF]
    group-by: user
- remove: [duration]
# Output:
# {"_marker":"TRANS","complete":false,"recs":[
# {"action":"LOGIN","user":"Bob"},
# {"action":"SEND","user":"Bob"},
# {"action":"RECV","user":"Bob"}
# ]}
# {"_marker":"TRANS","complete":true,"recs":[
# {"action":"LOGIN","user":"Alice"},
# {"action":"SEND","user":"Alice"},
# {"action":"SEND-AGAIN","user":"Alice"},
# {"action":"SEND-MORE","user":"Alice"},
# {"action":"LOGOFF","user":"Alice"}
# ]}
The output shows that Bob forgot to log out and timed out, so his session is marked "complete":false. Alice's session ended with LOGOFF and is marked "complete":true.
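The following Python sketch adds start and end conditions to the earlier transaction idea, with a completion flag. It is a hypothetical illustration (collect_sessions is not a Pipes API), and its handling of edge cases is assumed. A second start record for an open session closes the old one as incomplete, and records outside any session are dropped. The position of Bob's stalled marker in the replayed stream is also assumed.

```python
def collect_sessions(stream, group_field, start, end):
    """Sketch of start/end transactions with a timeout, per group value.

    start and end are (field, value) pairs, mirroring the start-end setting.
    """
    start_field, start_value = start
    end_field, end_value = end
    open_txns = {}  # group value -> records of the session in progress
    out = []
    for rec in stream:
        if rec.get("_marker") == "stalled":
            group = rec["stalled"]
            if rec["streaming"] == "no" and group in open_txns:
                # Timed out before the end record: incomplete session.
                out.append({"_marker": "TRANS", "complete": False, "recs": open_txns.pop(group)})
            continue
        group = rec[group_field]
        if rec.get(start_field) == start_value:
            if group in open_txns:
                # Assumption: a new start closes any unfinished session as incomplete.
                out.append({"_marker": "TRANS", "complete": False, "recs": open_txns.pop(group)})
            open_txns[group] = [rec]
        elif group in open_txns:
            open_txns[group].append(rec)
        else:
            continue  # assumption: records outside a session are dropped
        if rec.get(end_field) == end_value:
            out.append({"_marker": "TRANS", "complete": True, "recs": open_txns.pop(group)})
    return out


def stall(streaming, user):
    return {"_marker": "stalled", "streaming": streaming, "stalled": user}


def act(action, user):
    return {"action": action, "user": user}


stream = [
    stall("yes", "Bob"), act("LOGIN", "Bob"), act("SEND", "Bob"),
    stall("yes", "Alice"), act("LOGIN", "Alice"), act("RECV", "Bob"),
    act("SEND", "Alice"), act("SEND-AGAIN", "Alice"),
    stall("no", "Bob"),  # assumed timing: Bob's group times out here
    act("SEND-MORE", "Alice"), act("LOGOFF", "Alice"),
]
for txn in collect_sessions(stream, "user", ("action", "LOGIN"), ("action", "LOGOFF")):
    print(txn)
```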