
Pipes

A Pipe, a data processing pipeline, can be programmed using Pipe Language — a simple, standardized and easily repeatable set of instructions.

To make data processing more repeatable and accessible, Pipe Language provides a standardized approach to defining what a particular data processing pipeline must do.

Pipes are defined in YAML format. Here is an example of a simple Pipe that makes an HTTP request using http-poll, parses a JSON response, applies rename to a field, and writes the output to a file:

name: simple_http

input:
  http-poll:
    url: "http://httpbin.org/get"
    raw: true
    ignore-line-breaks: true

actions:
- rename:
  - origin: source_ip

output:
  file:
    path: "/tmp/output-http.log"

All Pipe definitions follow the same pattern.

  1. An input specifies where data comes from.

  2. Any number of optional actions define the sequential transformations and manipulations that you wish to apply to the input data. Most actions work with JSON.

  3. An output that tells the Pipe where the processed data must be delivered to. This data can then be indexed by Elasticsearch and queried using analytics tools such as Kibana or Grafana.

Based on this, a Pipe is:

input => action1 => action2 => ... => output

note

actions are optional, with the only required top-level fields being name, input, and output.
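The pattern above can be modelled in a few lines of Python. This is only an illustrative sketch, not how Agents are implemented: `missing_fields`, `apply_actions`, and `rename_origin` are hypothetical names, and the rename direction (origin becomes source_ip) is an assumption based on the simple_http example.

```python
from functools import reduce

# Required top-level fields, per the note above; "actions" is optional.
REQUIRED_FIELDS = ("name", "input", "output")


def missing_fields(pipe: dict) -> list:
    """Return the required top-level fields absent from a Pipe definition."""
    return [field for field in REQUIRED_FIELDS if field not in pipe]


def apply_actions(record: dict, actions: list) -> dict:
    """Pass a record through each action in order: action1, then action2, ..."""
    return reduce(lambda current, action: action(current), actions, record)


def rename_origin(record: dict) -> dict:
    """Hypothetical stand-in for a rename action (assumed: origin -> source_ip)."""
    renamed = dict(record)
    if "origin" in renamed:
        renamed["source_ip"] = renamed.pop("origin")
    return renamed


pipe = {"name": "simple_http", "input": {"http-poll": {}}, "output": {"file": {}}}
print(missing_fields(pipe))                                    # prints []
print(apply_actions({"origin": "10.0.0.1"}, [rename_origin]))  # prints {'source_ip': '10.0.0.1'}
```

An empty action list simply returns the record unchanged, which mirrors the fact that actions are optional.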

A Pipe definition is declarative: you specify what needs to happen, and how it happens is determined for you. In addition to data manipulation, transformation, parsing, and extraction capabilities, advanced features for scheduling, batching, and enrichment are also available, making data streaming simpler.

Pipe definitions are attached to one or more Agents — the component used to execute and manage one or more Pipes. Pipes are deployed to Agents using the web-based UI, CLI, or HTTP API.

Here is another example of a Pipe, executing the uptime command:

name: uptime

input:
  exec:
    command: uptime
    interval: 10s

actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]

output:
  write: console

In this example, the input data comes from executing the uptime command every 10s. The extract action then uses a regex pattern to pull the 1-minute, 5-minute, and 15-minute load averages out of the input line. Finally, the output is written to the console.

So, from a typical input:

$> uptime

09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31

We receive a JSON record that appears as {"m1":"0.08","m5":"0.23","m15":"0.31"}.
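To see why that pattern yields those three fields, the same regex can be tried in Python. This is a rough imitation of the extract step, not the Pipe implementation: the `extract` function is hypothetical, and leaving unmatched events untouched is an assumption.

```python
import json
import re

# Pattern and field names copied from the uptime Pipe definition.
PATTERN = r"load average: (\S+), (\S+), (\S+)"
OUTPUT_FIELDS = ["m1", "m5", "m15"]


def extract(event: dict) -> dict:
    """Imitate extract with input-field: _raw and remove: true."""
    match = re.search(PATTERN, event["_raw"])
    if match is None:
        return event  # assumption: events that do not match pass through unchanged
    # remove: true drops the input field once the captures are taken
    result = {key: value for key, value in event.items() if key != "_raw"}
    result.update(zip(OUTPUT_FIELDS, match.groups()))
    return result


line = "09:07:36 up 16 min, 1 user, load average: 0.08, 0.23, 0.31"
print(json.dumps(extract({"_raw": line}), separators=(",", ":")))
# prints {"m1":"0.08","m5":"0.23","m15":"0.31"}
```

Note that the captured values are still strings at this point, which is why they appear quoted in the record.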

Time series events generally have an associated time and must be in an appropriate format in order to be processed and queried by data analytics systems. If our actions were:

- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]

- convert:
  - m1: num
  - m5: num
  - m15: num

- time:
    output-field: '@timestamp'

Then the result will look like this:

{"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}
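The effect of the convert and time steps can be approximated in Python as follows. This is a sketch under assumptions: `convert_to_num` and `add_timestamp` are hypothetical helpers, num is treated as a float conversion, and the timestamp is passed in explicitly so that the example is reproducible.

```python
import json
from datetime import datetime, timezone


def convert_to_num(event: dict, fields: list) -> dict:
    """Imitate convert with type num (assumed here to mean float)."""
    converted = dict(event)
    for field in fields:
        converted[field] = float(converted[field])
    return converted


def add_timestamp(event: dict, moment: datetime, output_field: str = "@timestamp") -> dict:
    """Imitate the time action: store an ISO 8601 UTC timestamp with milliseconds."""
    stamped = dict(event)
    utc = moment.astimezone(timezone.utc)  # expects a timezone-aware datetime
    millis = utc.microsecond // 1000
    stamped[output_field] = utc.strftime("%Y-%m-%dT%H:%M:%S.") + f"{millis:03d}Z"
    return stamped


event = {"m1": "0.08", "m5": "0.23", "m15": "0.31"}
event = convert_to_num(event, ["m1", "m5", "m15"])
event = add_timestamp(event, datetime(2018, 8, 21, 9, 30, 36, tzinfo=timezone.utc))
print(json.dumps(event, separators=(",", ":")))
# prints {"m1":0.08,"m5":0.23,"m15":0.31,"@timestamp":"2018-08-21T09:30:36.000Z"}
```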

Most actions work on JSON data. Real-world data usually arrives as raw text, so the exec input wraps each line as JSON by default: {"_raw":"output line"}.

The various extraction steps work within this raw data format and generate sensible event fields.
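The wrapping of a raw line into a JSON event can be pictured with a short Python sketch. The `wrap_raw` helper is hypothetical and only illustrates the shape of the event. It is not the exec input's actual code.

```python
import json


def wrap_raw(line: str) -> str:
    """Quote a raw text line as a JSON event with a single _raw field."""
    return json.dumps({"_raw": line}, separators=(",", ":"))


print(wrap_raw("output line"))  # prints {"_raw":"output line"}
```

Because the line is JSON-encoded, characters such as double quotes inside the raw text are escaped and survive a round trip through a JSON parser.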

Available inputs, outputs, and actions can be found here.

Adding Pipes

The CLI process is outlined below. It assumes that you are already logged in.

First, add the Agents and list them:

$> hotrod agents add Joburg

agent Joburg has id e94ccdca-f379-447b-8c90-6976e77652ec

$> hotrod agents add Durban

agent Durban has id 8a1a0a29-d8f8-4098-a016-9d08f841f9a4

$> hotrod agents list

name   | id                                   | tags | pipes | last seen
-------+--------------------------------------+------+-------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      |       |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |       |

Unless you explicitly specify --id with hotrod agents add, new Agents will be assigned a unique identifier (UUID).

note

Agent names and IDs must be unique!

A Pipe definition is stored in a file named after the Pipe (pipename.yml). Pipe names must be unique, and the CLI enforces that the name of the Pipe corresponds to its filename.

note

Pipe names must be unique!
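The name-to-filename rule can be checked locally before uploading a definition. The following Python sketch is an assumption about what such a check might look like, not the CLI's own logic. `declared_name` and `name_matches_file` are hypothetical, and the scan for `name:` is deliberately naive (a real tool would use a YAML parser and handle quoted names).

```python
import re
from pathlib import Path


def declared_name(yaml_text: str):
    """Naively find a top-level `name:` key (unindented, unquoted value)."""
    match = re.search(r"^name:[ \t]*(\S+)[ \t]*$", yaml_text, flags=re.MULTILINE)
    return match.group(1) if match else None


def name_matches_file(path: str, yaml_text: str) -> bool:
    """True when the declared Pipe name equals the filename without its extension."""
    return declared_name(yaml_text) == Path(path).stem


definition = "name: uptime\ninput:\n  exec:\n    command: uptime\n"
print(name_matches_file("uptime.yml", definition))  # prints True
print(name_matches_file("load.yml", definition))    # prints False
```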

A Pipe is loaded with the hotrod pipes add subcommand:

$> hotrod pipes add --file uptime.yml

$> hotrod pipes list

name
------
uptime

$> hotrod pipes show uptime

name: uptime
input:
  exec:
    command: uptime
    interval: 2s
actions:
- extract:
    input-field: _raw
    remove: true
    pattern: 'load average: (\S+), (\S+), (\S+)'
    output-fields: [m1, m5, m15]
- convert:
  - m1: num
  - m5: num
  - m15: num
- time:
    output-field: '@timestamp'
output:
  write: console

Remember, the Pipe is not associated with any Agent yet. Therefore, we update a particular Agent as follows:

$> hotrod agents update Joburg --add-pipe uptime

$> hotrod agents list

name   | id                                   | tags | pipes  | last seen
-------+--------------------------------------+------+--------+-----------
Joburg | e94ccdca-f379-447b-8c90-6976e77652ec |      | uptime |
Durban | 8a1a0a29-d8f8-4098-a016-9d08f841f9a4 |      |        |

The Pipe is now staged for the Joburg Agent and will be deployed using the configured Agent.

If you run hotrod agents update Joburg --remove-pipe uptime, the Pipe is removed from the staging area and stopped by the Agent.