Scheduling Inputs

Most inputs, such as exec and http-poll, are scheduled by time. Those that are not, such as tcp, are driven by external data instead.

Scheduling with Intervals

interval produces equal gaps between command invocations, with execution starting immediately. For example, if we start at 11:54:59 with interval: 1m, the next invocation will be at 11:55:59, and so forth.

name: interval

input:
  exec:
    command: date
    interval: 1m

output:
  write: console

# Output:
# {"_raw":"Fri 31 Mar 2023 11:54:59 SAST"}
# {"_raw":"Fri 31 Mar 2023 11:55:59 SAST"}
# {"_raw":"Fri 31 Mar 2023 11:56:59 SAST"}

cron enables complete control for precise scheduling.

note

The cron expression uses a 6-token format. The first token allows for scheduling seconds. Using a standard 5-token cron expression (see man 5 crontab) will result in a parse error.

name: cron

input:
  exec:
    command: date
    cron: '13 * * * * *'

output:
  write: console

# Output:
# {"_raw":"Fri 31 Mar 2023 12:09:13 SAST"}
# {"_raw":"Fri 31 Mar 2023 12:10:13 SAST"}
# {"_raw":"Fri 31 Mar 2023 12:11:13 SAST"}

The command runs at second 13 of every minute.

note

There are pre-defined values such as @hourly, @daily, @weekly... see man 5 crontab.

As cron commands occur at precise clock times, it is often necessary to introduce a random-offset. This ensures that a thousand Agents do not overwhelm an endpoint by polling it at the same time:

input:
  exec:
    command: date
    # Each minute: 16:00:00, 16:01:00, ...
    interval: 1m
    # Run at: 16:00:05, 16:01:05, ...
    random-offset: 5s

Note that cron scheduling also allows immediate: true, which triggers an immediate first run regardless of the schedule.
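For instance, a minimal sketch reusing the cron example above (the schedule and command are illustrative only):

input:
  exec:
    command: date
    cron: '13 * * * * *'
    # first run happens immediately, subsequent runs at second 13 of every minute
    immediate: true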

Windowing

Windowing is a useful feature for querying a resource that needs a specified time window.

input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    cron: '*/5 * * * * *'
    #interval: 5s
    window:
      size: 60s

The end_time is "now", expressed as a Unix timestamp, and the start_time is "now minus 60s". For example, at 12:05:00 the window spans 12:04:00 to 12:05:00.

window also has an offset that can be used when you need to wait for the data to settle, such as when Elasticsearch is ingesting real-time data. It shifts the start_time and end_time back by the offset (same seconds unit as interval and size), so both fall before "now". For example, with size: 60s and offset: 30s, at 12:05:00 the window spans 12:03:30 to 12:04:30.

If interval or cron is set together with a window that has any options, then size must be set explicitly. If the window is specified with no options, size is set automatically.

In addition to windowing intervals, the scheduler (either cron or interval) supports start-time. This allows the windowing to start at a specified time, which must be in the following format: 2019-07-10 18:45:00.000 +02:00. start-time requires highwatermark-file, which tracks the last successful window time so that the Pipe can resume from that point.

This means that if a Pipe using highwatermark-file is stopped and restarted, the scheduler will "catch up" and run backfills for any intervals that were missed. offset is ignored during these backfills; once the backfill reaches the current time, offset is applied once again.
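A minimal sketch of how these options might fit together; the exact field placement and the highwatermark file path are assumptions, not taken from a verified configuration:

input:
  exec:
    command: 'echo ${start_time_secs} ${end_time_secs}'
    interval: 1m
    window:
      size: 60s
    # assumed placement: begin windowing from this point in the past (exact format required)
    start-time: '2019-07-10 18:45:00.000 +02:00'
    # assumed path: records the last successful window so a restarted Pipe backfills from here
    highwatermark-file: /var/lib/hotrod/hwm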

warn

start-time:

  • in the future results in an error
  • and highwatermark-file are mutually inclusive, else error
  • results in offset being ignored with backfills
  • is ignored if highwatermark-file contains a timestamp
note

interval and cron are mutually exclusive; both require size when window is set with any options.

Scheduling Variables

Special variables are available, including those that support custom time formats; e.g., ${start_time_fmt %F %T} gives a local date-time such as 2019-09-28 15:36:02. The start_* and end_* variables refer to the current window. A minimal sketch follows the list below.

  • now_time_secs: current time in seconds since epoch
  • now_time_msecs: current time in milliseconds since epoch
  • now_time_iso: current time in "%Y-%m-%dT%H:%M:%SZ" format
  • start_time_secs: start time in seconds since epoch
  • start_time_msecs: start time in milliseconds since epoch
  • start_time_iso: start time in "%Y-%m-%dT%H:%M:%SZ" format
  • end_time_secs: end time in seconds since epoch
  • end_time_msecs: end time in milliseconds since epoch
  • end_time_iso: end time in "%Y-%m-%dT%H:%M:%SZ" format
  • now_time_fmt <fmt>: e.g., ${now_time_fmt %F %T}
  • start_time_fmt <fmt>: e.g., ${start_time_fmt %Y-%m-%dT%H:%M:%SZ}
  • end_time_fmt <fmt>: e.g., ${end_time_fmt %s}
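As a minimal sketch before the fuller example below, this simply echoes a few of the variables each minute (the command is illustrative only):

input:
  exec:
    # print the current time and the window boundaries once a minute
    command: 'echo now=${now_time_iso} start=${start_time_secs} end=${end_time_secs}'
    interval: 1m
    window:
      size: 60s

output:
  write: console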

This example shows these special fields in use. We need start_time and end_time in a particular format for a complex VoIP statistics endpoint:

context:
  address: http://XXXX:4444
  parms:
    - filter.application=rtp
    - filter.custom_filter=ip=X.X.X.X
    - info.aggregate_level=precise
    - pager.nb_elements=10000
    - sorter.order_column=capture_begin
    - sorter.order_dir=sort_asc
    - filter.capture_begin='${start_time_fmt %F%%20%T}'
    - filter.capture_end='${end_time_fmt %F%%20%T}'
  path: /nevrax/network/conv_voip_details.html/voip_table_descriptor/table.csv
  password: XXX:XXX
  filter: "sed -e 's/-;/0;/g'|sed -e 's/;user/ and user/g'|sed -e 's/\"//g'"

input:
  exec:
    command: "curl -k -u {{password}} '{{address}}{{path}}?{{parms '&'}}' | {{filter}}"
    interval: 10s
    window:
      size: 10s
      offset: 2s
...

This illustrates how Context fields can make complicated commands clearer. Also note how the query parameters are constructed by joining the parms array with a '&'.

It is recommended to use http-poll wherever possible; the principles, however, remain the same.

Marking Batches

Scheduled commands introduce the concept of a batch of events that occur together at intervals. A uuid field is added to each batch. If the data was JSON (i.e., json: true), then the field is merged into the incoming event.
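As a sketch, assuming a hypothetical command that emits one JSON object per line (the command name and the placement of json: true under exec are assumptions), the batch field is merged into each parsed event:

input:
  exec:
    # hypothetical JSON-emitting command
    command: my_stats_tool --json
    interval: 30s
    # parse each output line as JSON so the batch field merges into the event
    json: true
    batch:
      # merged into every event of the batch
      uuid-field: batch_id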

For example, Netter Corp has a secure Linux host, secure.netter.lan with the following restrictions:

  • console (tty) login only (no remote access)
  • auditd log in /var/log/audit/audit.log
  • host is rebooted every 24 hours to enforce security policy

The main issue is that Linux PAM (Pluggable Authentication Modules) session IDs begin at 1 after each reboot (ses=1 in audit.log), so session IDs repeat across reboots. We therefore need to assign a uuid to distinguish sessions with the same IDs. The log is also rotated at each reboot.

Assume for now that we obtain the session ID via an external mechanism (see man pam_exec) and pass it to the Pipe as a context variable (e.g., session=9). pam_exec obtains the session ID upon user logout, at session close time, and runs hotrod context tag ausearch session=11. The Pipe with an ausearch tag is then updated with the fresh context and restarted by the Agent after it polls the Server for updates. Strictly speaking, this is not a scheduled input like the examples above, as the input interval is determined outside the Pipe.

If the session ID is not found (possible log lag or other delays), the ausearch output is Exited(1): <no matches>. The Pipe will retry up to a count of 100 with a pause of 10s between retries. There is no need for a command count or interval, as only one successful command run is required. The ausearch argument --start $(date '+%x') (see man ausearch) is today's date (locale date representation; see man date).

The Pipe definition:

name: ausearch_batch

context:
  session: <int>

input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid

output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log

Run the Pipe:

$> hotrod pipes run --file ausearch_batch.yml session=39

2023-05-02T11:42:08.846Z INFO pipe > shutting down aggregator

The resulting output in /tmp/ausearch_batch-1683028631.log (by default, sudo events are not logged):

{"_raw":"At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}
{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","uuid":"cf54cb34-f216-4fc4-9333-be7b66c4e4e5"}

The available batch fields (all are optional, but at least one must be specified); a sketch combining several of them follows the list:

  • uuid-field: field containing unique ID for each batch
  • invocation-time-field: field containing the start time of the batch
  • completion-time-field: field containing the end time of the batch
  • begin-marker-field: field set to true in first event
  • end-marker-field: field set to true in last event
  • line-count-field: each event gets the size of the batch
  • line-num-field: each event gets the line number within the batch
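A sketch combining several of these fields; the command and the field value names are illustrative only, while the batch option names come from the list above:

input:
  exec:
    # any multi-line command works here; ps is just an example
    command: ps -e
    interval: 30s
    batch:
      uuid-field: uuid
      invocation-time-field: started_at
      completion-time-field: finished_at
      line-count-field: total_lines
      line-num-field: line_no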

The markers are particularly useful in cases where you only want to see the last line from each run (e.g., the session end time):

name: ausearch_batch

context:
  session: <int>

input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      end-marker-field: last_line

actions:
- filter:
    condition: last_line

output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log

The resulting output:

{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"fc074085-5975-448d-932f-7ef5d7007167"}

The filter condition is a Lua expression.
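For instance, a condition can combine fields; a sketch, assuming event fields are exposed as Lua variables as in the example above:

actions:
- filter:
    # keep only events where both fields are present (any Lua boolean expression works)
    condition: last_line and uuid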

note

There are work-arounds for certain restrictions imposed by the use of Lua here. For example, we cannot say last-line or end, because last-line isn't a valid Lua identifier and end is a Lua keyword.

There is always another approach — we can filter by schema, which is not restricted by Lua rules:

name: ausearch_batch

context:
  session: <int>

input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      end-marker-field: last_line

actions:
- filter:
    schema:
      - _raw
      - last_line

output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log

The resulting output is the same as using condition:

{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"fc074085-5975-448d-932f-7ef5d7007167"}

Using both markers allows us to collect all the lines generated into an array, using transaction:

name: ausearch_batch

context:
  session: <int>

input:
  exec:
    command: sudo ausearch -m USER_START,USER_END,USER_CMD,USER_LOGIN,USER_LOGOUT,LOGIN --format text --session {{session}} --start $(date '+%x')
    retry:
      count: 100
      pause: 10s
    ignore-line-breaks: true
    no-strip-linefeeds: true
    batch:
      uuid-field: uuid
      begin-marker-field: first_line
      end-marker-field: last_line

actions:
- transaction:
    start-end:
      start: [first_line]
      end: [last_line]
    marker: [session]

output:
  file:
    path: /tmp/ausearch_batch-${now_time_fmt %s}.log

The resulting output:

{"_marker":"session","complete":true,"duration":0,"recs":[{"_raw":"At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ","first_line":true,"uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login","uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login","uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"},{"_raw":"At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ","last_line":true,"uuid":"55819eec-4628-4684-abe9-0df4d97ff38e"}]}

More readable output:

$> jq . </tmp/ausearch_batch-1683031997.log

{
  "_marker": "session",
  "complete": true,
  "duration": 0,
  "recs": [
    {
      "_raw": "At 13:56:28 02/05/2023 system, acting as root, successfully changed-login-id-to robertg ",
      "first_line": true,
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:28 02/05/2023 robertg, acting as root, successfully started-session /dev/tty1 using /usr/bin/login",
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:28 02/05/2023 robertg successfully logged-in tty1 using /usr/bin/login",
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    },
    {
      "_raw": "At 13:56:34 02/05/2023 robertg, acting as root, successfully ended-session /dev/tty1 ",
      "last_line": true,
      "uuid": "55819eec-4628-4684-abe9-0df4d97ff38e"
    }
  ]
}

This returns a single generated event {"_marker":"session","recs":[...]} with the recs array containing the entire batch for the user login, session events, and logout.

Extending the example, Pipe and Agent tags could be used to update the context pushed from secure.netter.lan via the Server, and another Agent on syslog.netter.lan could poll for the updated context (session=<int>). That Pipe would then run to enrich the session events, write them to file, and possibly have them ingested for further analytics. For secure environments, off-host logging can be enabled and processed on the remote host by an Agent that receives a session ID upon logoff via PAM.