
Testing Pipes

It's useful to test Pipes, in their entirety or in sections, before actually deploying them.

Consider this basic Pipe, which uses context expansion:

name: echo_msg

context:
  msg: hello

input:
  exec:
    command: 'echo {{msg}}'

output:
  write: console

Run the Pipe:

$> hotrod pipes run --file test.yml

{"_raw":"hello"}

To experiment with overriding context, create echo_msg_context.yml, which sets a different value for msg:

context:
  msg: goodbye

Then, override the Pipe context:

$> hotrod pipes run --file test.yml --context-files echo_msg_context.yml

{"_raw":"goodbye"}
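The override behaviour can be modelled in a few lines of Python. This is only a sketch of the idea, not how hotrod implements it: merge_context and expand are hypothetical helper names, and the sketch assumes that values from the override file simply replace same-named keys in the Pipe's own context.

```python
import re

def merge_context(base, override):
    """Return a new context in which override values replace base values."""
    merged = dict(base)
    merged.update(override)
    return merged

def expand(template, context):
    """Replace each {{name}} placeholder with the matching context value."""
    return re.sub(r"\{\{(\w+)\}\}", lambda m: str(context[m.group(1)]), template)

pipe_context = {"msg": "hello"}    # from the Pipe definition
override = {"msg": "goodbye"}      # from echo_msg_context.yml

default_command = expand("echo {{msg}}", pipe_context)
overridden_command = expand("echo {{msg}}", merge_context(pipe_context, override))

print(default_command)     # echo hello
print(overridden_command)  # echo goodbye
```

The Pipe's own context is left untouched by the merge, which mirrors the fact that the override file does not modify test.yml.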

Experimenting with Pipes in this way builds confidence. For more complicated Pipes, it is usually easier to troubleshoot problems locally before deploying.

The next example builds a Pipe that consumes a file, times.txt, containing times in Unix time (epoch) format, and detects any gaps in this time record:

1592928057
1592928058
1592928059
1592928060
1592928061
1592928062
1592928063
1592928064
1592928065
1592928066

Each time arrives as a string in the default field _raw. We rename the field to epoch and convert its value to a number:

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

output:
  write: console

Run the Pipe:

$> hotrod pipes run -f epoch_stream.yml

{"epoch":1592928057}
{"epoch":1592928058}
{"epoch":1592928059}
{"epoch":1592928060}
{"epoch":1592928061}
{"epoch":1592928062}
{"epoch":1592928063}
{"epoch":1592928064}
{"epoch":1592928065}
{"epoch":1592928066}
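As a rough mental model of these two actions, the following Python sketch applies the same transformation to a single event. The rename and convert_num functions are hypothetical stand-ins, and the sketch assumes that num means an integer here, which matches the output above but is not confirmed for other inputs.

```python
import json

def rename(event, old, new):
    """Return a copy of the event with field `old` renamed to `new`."""
    out = dict(event)
    out[new] = out.pop(old)
    return out

def convert_num(event, field):
    """Return a copy of the event with `field` parsed as an integer (assumption)."""
    out = dict(event)
    out[field] = int(out[field])
    return out

event = {"_raw": "1592928057"}
renamed = rename(event, "_raw", "epoch")
converted = convert_num(renamed, "epoch")

# Compact separators match the console output format shown above.
print(json.dumps(converted, separators=(",", ":")))
```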

Use the stream action to find the difference between times (just after convert):

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

  - stream:
      operation: delta
      watch: epoch

output:
  write: console

The resulting output:

{"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928058,"delta":1,"elapsed":0}
{"epoch":1592928059,"delta":1,"elapsed":0}
{"epoch":1592928060,"delta":1,"elapsed":0}
{"epoch":1592928061,"delta":1,"elapsed":0}
{"epoch":1592928062,"delta":1,"elapsed":0}
{"epoch":1592928063,"delta":1,"elapsed":0}
{"epoch":1592928064,"delta":1,"elapsed":0}
{"epoch":1592928065,"delta":1,"elapsed":0}
{"epoch":1592928066,"delta":1,"elapsed":0}

Since 1592928057 has no preceding time, its delta is 1592928057. Every subsequent line has a delta of 1, because the time difference between consecutive lines is 1 second.
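The delta values can be reproduced with a short Python sketch. It assumes, based only on the output above, that a missing previous value is treated as 0; add_delta is a hypothetical name, and the elapsed field is not modelled.

```python
def add_delta(events, watch="epoch"):
    """Attach the difference between each watched value and the previous one."""
    previous = 0  # assumption: with no previous event, the delta is the value itself
    out = []
    for event in events:
        value = event[watch]
        out.append({**event, "delta": value - previous})
        previous = value
    return out

events = [{"epoch": 1592928057}, {"epoch": 1592928058}, {"epoch": 1592928059}]
with_delta = add_delta(events)
for event in with_delta:
    print(event)
```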

To detect gaps, use the filter action to pass through only events where delta is not equal to 1, excluding the first event:

name: epoch_stream

input:
  exec:
    command: cat times.txt

actions:
  - rename:
      - _raw: epoch

  - convert:
      - epoch: num

  - stream:
      operation: delta
      watch: epoch

  - filter:
      condition: delta != 1 and count() > 1

output:
  write: console

With the original times.txt, the Pipe outputs nothing because there are no gaps. We can provoke output by introducing a gap into times.txt (here 1592928061 and 1592928062 have been removed):

1592928057
1592928058
1592928059
1592928060
1592928063
1592928064
1592928065
1592928066

The resulting output:

{"epoch":1592928063,"delta":3,"elapsed":0}
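The same gap check can be sketched in Python to make the filter condition concrete. This is an illustration of the logic only: find_gaps is a hypothetical function, the counter stands in for count(), and the elapsed field is left out.

```python
def find_gaps(epochs, expected_step=1):
    """Return events whose delta differs from expected_step, skipping the first."""
    gaps = []
    previous = None
    for count, epoch in enumerate(epochs, start=1):
        # The first event has no predecessor, so its delta is the value itself.
        delta = epoch if previous is None else epoch - previous
        if delta != expected_step and count > 1:
            gaps.append({"epoch": epoch, "delta": delta})
        previous = epoch
    return gaps

complete = list(range(1592928057, 1592928067))
gapped = [t for t in complete if t not in (1592928061, 1592928062)]

print(find_gaps(complete))  # no gaps in the full record
print(find_gaps(gapped))    # one gap, reported at the first time after it
```

Without the count > 1 part of the condition, the first event would always be reported, since its delta is the timestamp itself.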

This is an example of a Pipe monitoring data and detecting anomalies. As it shows, the best way to build non-trivial Pipes is step by step, checking the output after each added action.

It’s possible to override the input of a Pipe. This is useful for testing Pipes that get their input from other sources, e.g. HTTP requests or TCP. For instance, the version of epoch_stream.yml that ends with the stream action can be fed a single timestamp:

$> hotrod pipes run -f epoch_stream.yml --input 1592928057

The resulting output:

{"epoch":1592928057,"delta":1592928057,"elapsed":0}

Using --input @times.txt gives us the same result as before. If the argument starts with @, it is treated as the name of a file containing the data (the same convention as curl; see man curl).
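The @ convention is easy to picture with a small Python sketch. The resolve_input function is hypothetical, and the sketch assumes that a file is read as one event per line, which is consistent with the results above but is not a description of hotrod's internals.

```python
from pathlib import Path

def resolve_input(argument):
    """Return the input lines for a literal value or an @file reference."""
    if argument.startswith("@"):
        # Everything after the @ is treated as a file name.
        return Path(argument[1:]).read_text().splitlines()
    # Otherwise the argument itself is the single input line.
    return [argument]

print(resolve_input("1592928057"))
```

Calling resolve_input("@times.txt") would return the lines of times.txt, provided the file exists in the current directory.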

Use --output <filename> to save the output to a file.

Tracing a Pipe shows how each step transforms the data. In the Pipe below, the comments number the steps as they appear in the trace output:

name: epoch_stream

input:
  # Step 0.
  exec:
    command: cat times.txt

actions:
  # Step 1.
  - rename:
      - _raw: epoch

  # Step 2.
  - convert:
      - epoch: num

  # Step 3.
  - stream:
      operation: delta
      watch: epoch

output:
  # Step 4.
  write: console

Run the last example with tracing enabled:

$> hotrod pipes run -f epoch_stream.yml --input 1592928057 --trace

[TRACE] action-rename step 1
LINE: {"_raw":"1592928057"} -> [{"epoch":"1592928057"}]
[TRACE] action-convert step 2
LINE: {"epoch":"1592928057"} -> [{"epoch":1592928057}]
[TRACE] action-stream step 3
LINE: {"epoch":1592928057} -> [{"epoch":1592928057,"delta":1592928057,"elapsed":0}]
[TRACE] output-exec step 4
LINE: {"epoch":1592928057,"delta":1592928057,"elapsed":0}
{"epoch":1592928057,"delta":1592928057,"elapsed":0}
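The idea behind tracing, recording each step's input and output as the event moves through the Pipe, can be sketched in Python. This is a simplified model under stated assumptions: run_with_trace and the two lambda steps are hypothetical, and the trace format is not the one hotrod prints.

```python
import json

def run_with_trace(event, steps):
    """Apply each (name, function) step in order, recording before and after."""
    trace = []
    for number, (name, step) in enumerate(steps, start=1):
        result = step(event)
        trace.append(f"step {number} {name}: {json.dumps(event)} -> {json.dumps(result)}")
        event = result
    return event, trace

steps = [
    ("rename", lambda e: {"epoch": e["_raw"]}),
    ("convert", lambda e: {"epoch": int(e["epoch"])}),
]

final, trace = run_with_trace({"_raw": "1592928057"}, steps)
print("\n".join(trace))
print(json.dumps(final))
```

Reading the trace top to bottom shows exactly where a value changes type or a field changes name, which is usually where a misbehaving Pipe goes wrong.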