Pipes
A Pipe is a data processing pipeline programmed using Pipe Language: a simple, standardized, and easily repeatable set of instructions. Pipe Language makes data processing more repeatable and accessible by standardizing how you define what a particular pipeline must do.
Pipes are defined in YAML format. Here is an example of a simple Pipe that makes an HTTP request using `http-poll`, parses the JSON response, applies `rename` to a field, and writes the output to a file:
```yaml
name: simple_http
input:
  http-poll:
    url: "http://httpbin.org/get"
    raw: true
    ignore-line-breaks: true
actions:
- rename:
  - origin: source_ip
output:
  file:
    path: "/tmp/output-http.log"
```
All Pipe definitions follow the same pattern:

- An `input` specifies where data comes from.
- Any number of optional `actions` define the sequential transformations and manipulations to apply to the `input` data. Most `actions` work with JSON.
- An `output` tells the Pipe where the processed data must be delivered. This data can then be indexed by Elasticsearch and queried using analytics tools such as Kibana or Grafana.
Based on this, a Pipe is:

```
input => action1 => action2 => ... => output
```
`actions` are optional; the only required top-level fields are `name`, `input`, and `output`.
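For example, a minimal sketch of a valid Pipe with no `actions` at all (the name `minimal_uptime` is arbitrary; the `exec` input and `console` output are the same ones used in the examples below) would be:

```yaml
# Only the required top-level fields: name, input, and output.
name: minimal_uptime
input:
  exec:
    command: uptime   # run a shell command...
    interval: 10s     # ...every 10 seconds
output:
  write: console      # print each record to the console
```

This simply forwards each raw `uptime` line, quoted as JSON, straight to the output.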
A Pipe definition is declarative: you specify what must happen, and the Pipe determines how it happens. In addition to data manipulation, transformation, parsing, and extraction capabilities, advanced features for scheduling, batching, and enrichment are also available, making data streaming simpler.
Pipe definitions are attached to one or more Agents — the component used to execute and manage one or more Pipes. Pipes are deployed to Agents using the web-based UI, CLI, or HTTP API.
Here is another example of a Pipe, executing the `uptime` command:
```yaml
name: uptime
input:
  exec:
    command: uptime
    interval: 10s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
output:
  write: console
```
In this example, `input` data comes from executing the `uptime` command every `10s`. The 1-minute, 5-minute, and 15-minute load averages are then extracted from the input line by the `extract` action under `actions`, using its regex `pattern`. Thereafter, the `output` is written to the `console`.
So, from a typical input:

```
$> uptime
09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31
```
We receive a JSON record that appears as `{"m1":"0.08","m5":"0.23","m15":"0.31"}`.
Time series events generally have an associated time, and it must be in an appropriate format to be processed and queried by data analytics systems. If our `actions` were:
```yaml
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
```
then the result will look like this:

```json
{"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}
```
Most `actions` work with JSON data. Although real-world data is normally raw text, the `exec` input will by default quote it as JSON: `{"_raw":"output line"}`. The various extraction steps work on this `_raw` field and generate sensible event fields.
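For instance, given the `uptime` line from earlier, the record entering the `actions` stage would look like this (whitespace abbreviated):

```json
{"_raw":"09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31"}
```

The `extract` action matches its `pattern` against this `_raw` field, and `remove: true` drops `_raw` from the record once the `output-fields` have been populated, which is why it no longer appears in the final result.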
Available inputs, outputs, and `actions` can be found here.
Adding Pipes
The CLI process is outlined below. Assuming you are logged in:
```
$> hotrod agents add Joburg
agent Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec
$> hotrod agents add Durban
agent Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4
$> hotrod agents list
 name   | id                                   | tags | pipes | last seen
--------+--------------------------------------+------+-------+-----------
 Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
 Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |
```
Unless you explicitly specify `--id` with `hotrod agents add`, new Agents will be assigned a unique identifier (UUID).

Agent names and IDs must be unique!
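As a sketch of supplying your own ID (the agent name `Cairo` and the UUID are made up for illustration, and the echoed confirmation assumes the same format as above):

```
$> hotrod agents add Cairo --id 123e4567-e89b-12d3-a456-426614174000
agent Cairo has id 123e4567-e89b-12d3-a456-426614174000
```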
A Pipe definition is stored in a YAML file named after the Pipe (`pipename.yml`), and the CLI will enforce that the name of the Pipe corresponds to its filename.

Pipe names must be unique!
A Pipe is loaded with the `hotrod pipes add` subcommand:
```
$> hotrod pipes add --file uptime.yml
$> hotrod pipes list
 name
--------
 uptime
$> hotrod pipes show uptime
name: uptime
input:
  exec:
    command: uptime
    interval: 2s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
output:
  write: console
```
Remember, the Pipe is not associated with any Agent yet. Therefore, we update a particular Agent as follows:
```
$> hotrod agents update Joburg --add-pipe uptime
$> hotrod agents list
 name   | id                                   | tags | pipes  | last seen
--------+--------------------------------------+------+--------+-----------
 Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
 Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |
```
The Pipe is now staged for the Joburg Agent and will be deployed by that Agent.
If you run `hotrod agents update Joburg --remove-pipe uptime`, the Pipe will be removed from the staging area and stopped by the Agent, as sketched below.
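A sketch of the removal (assuming the same listing format shown earlier):

```
$> hotrod agents update Joburg --remove-pipe uptime
$> hotrod agents list
 name   | id                                   | tags | pipes | last seen
--------+--------------------------------------+------+-------+-----------
 Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
 Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |
```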