
Input: s3

Stream data from an S3 object.

Field Summary

| Field Name | Type | Description | Default |
|---|---|---|---|
| when | message_filter | Fire this input when a specific internal message occurs | - |
| interval | duration | How often to run the command | - |
| cron | cron | How often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion | - |
| immediate | bool | Run as soon as invoked, instead of waiting for the specified cron interval | false |
| random-offset | duration | Sets a random offset to the schedule, then sticks to it | 0s |
| window | Window | For resources that need a time window to be specified | - |
| block | bool | Block further input schedules from triggering if the pipe output is retrying | false |
| bucket-name | string | The S3 bucket (storage service container) that holds the objects | - |
| object-names | array of strings | The names of the objects (blobs) to read | - |
| object-name-field | field | The field in which to store the name of an object returned by an operation | - |
| creation-time-field | field | The field that the blob creation time should be stored in | - |
| last-modified-field | field | The field that the blob last modified time should be stored in | - |
| content-length-field | field | The field that the blob content length information should be stored in | - |
| content-type-field | field | The field that the blob content type information should be stored in | - |
| etag-field | field | The field that the object ETag should be stored in | - |
| data-field | field | A field that the blob data should be nested in | - |
| region | string | The AWS region of the bucket | - |
| endpoint | string | The S3 endpoint to connect to | - |
| access-key | string | Access key ID | - |
| secret-key | string | Secret access key | - |
| security-token | string | Security token | - |
| session-token | string | Session token | - |
| timestamp-mode | S3ObjectTimestampMode | Derive a timestamp for this blob for filtering purposes based on the selected strategy | - |
| maximum-age | MaxAgeSpecifier | Remove any object older than this age from the candidate list | - |
| mode | S3BlockInputMode | The operating mode for this input | - |
| fingerprinting | bool | Enable object fingerprinting, so that each object is downloaded only once | false |
| fingerprinting-db-path | path | Specify a custom path for the fingerprinting database | - |
| maximum-fingerprint-age | MaxAgeSpecifier | Remove any object fingerprints older than this from the tracker | 30 days |
| preprocessors | PreProcessor | Preprocessors that process downloaded data before it is made available to the pipeline. They run in the order in which they are specified | - |

Fields

when

Type: message_filter

Fire this input when a specific internal message occurs

This field overrides time-based scheduling with a scheduler that fires when a matching message is received.

Example

Pipe Language Snippet:

input:
  http-poll:
    when:
      message-received:
        filter-type:
          - pipe-idle
    url: "http://localhost:8888"
    raw: true
    ignore-line-breaks: true

interval

Type: duration

How often to run the command

By default, interval is 0s, which means the command runs only once. Note that scheduled inputs set document markers. See full discussion

Example

Pipe Language Snippet:

exec:
  command: echo 'once a day'
  interval: 1d

cron

Type: cron

How often to run the command. Note that unlike standard Cron, Pipes use a Cron syntax that includes a column for seconds. See full discussion

Example: Once a day

Pipe Language Snippet:

exec:
  command: echo 'once a day'
  cron: '0 0 0 * * *'

Example: Once a day, using a convenient shortcut

Pipe Language Snippet:

exec:
  command: echo 'once a day'
  cron: '@daily'

immediate

Type: bool

Default: false

Run as soon as invoked, instead of waiting for the specified cron interval

Example: Run immediately on invocation, and thereafter at 10:00 every morning

Pipe Language Snippet:

exec:
  command: echo 'hello'
  immediate: true
  cron: '0 0 10 * * *'

random-offset

Type: duration

Default: 0s

Sets a random offset to the schedule, then sticks to it

This can help avoid the thundering-herd problem, for example when you do not want many pipes to hit the same service at exactly 00:00:00.

Example: Fire at a fixed random point up to one minute after the start of every hour

Pipe Language Snippet:

exec:
  command: echo 'hello'
  random-offset: 1m
  cron: '0 0 * * * *'

window

Type: Window

For resources that need a time window to be specified

| Field Name | Type | Description | Default |
|---|---|---|---|
| size | duration | Window size | - |
| offset | duration | Window offset | 0s |
| start-time | time | Allows the windowing to start at a specified time | - |
| highwatermark-file | path | A file in which the timestamp is stored, so that the pipe can resume where it left off after a restart | - |

  size

Type: duration

Window size

Example

Pipe Language Snippet:

exec:
  command: echo 'one two'
  window:
    size: 1m

  offset

Type: duration

Default: 0s

Window offset

Example

Pipe Language Snippet:

exec:
  command: echo 'one two'
  window:
    size: 1m
    offset: 10s

  start-time

Type: time

Allows the windowing to start at a specified time

It must be in the following format: 2019-07-10 18:45:00.000 +0200

Example

Pipe Language Snippet:

exec:
  command: echo 'one two'
  window:
    size: 1m
    start-time: 2019-07-10 18:45:00.000 +0200

  highwatermark-file

Type: path

Specifies a file in which the timestamp is stored, so that the pipe can resume where it left off after a restart.

Example

Pipe Language Snippet:

exec:
  command: echo 'one two'
  window:
    size: 1m
    highwatermark-file: /tmp/mark.txt

block

Type: bool

Default: false

Block further input schedules from triggering if the pipe output is retrying
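A minimal sketch of block on this input. It assumes the input:/s3: nesting used by other Pipes inputs, and the bucket name is a placeholder. With block: true, further scheduled runs should not trigger while the output is still retrying.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    interval: 5m                     # poll every five minutes
    block: true                      # do not fire again while the output is retrying
```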

bucket-name

Type: string

The S3 bucket (storage service container) that holds the objects

object-names

Type: array of strings

The names of the objects (blobs) to read
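A sketch that fetches two specific objects. It assumes that download-objects mode reads the keys listed in object-names. The bucket and object names are placeholders.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: download-objects           # fetch only the objects named below
    object-names:
      - logs/2024-01-01/app.json     # placeholder object keys
      - logs/2024-01-02/app.json
```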

object-name-field

Type: field

The field in which to store the name of an object returned by an operation

creation-time-field

Type: field

The field that the blob creation time should be stored in

last-modified-field

Type: field

The field that the blob last modified time should be stored in

content-length-field

Type: field

The field that the blob content length information should be stored in

content-type-field

Type: field

The field that the blob content type information should be stored in

etag-field

Type: field

The field that the object ETag should be stored in

data-field

Type: field

A field that the blob data should be nested in
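The metadata fields above can be combined so that each event records which object its data came from. This sketch assumes a field is given as a plain field name. The names name, modified, size, content_type, etag and data are hypothetical choices.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    object-name-field: name          # hypothetical output field names
    last-modified-field: modified
    content-length-field: size
    content-type-field: content_type
    etag-field: etag
    data-field: data                 # object contents are nested under this field
```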

region

Type: string

The AWS region of the bucket, for example us-east-1

endpoint

Type: string

The S3 endpoint to connect to

access-key

Type: string

The access key ID used to authenticate

secret-key

Type: string

The secret access key that pairs with the access key ID

security-token

Type: string

Security Token

session-token

Type: string

Session Token
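A sketch of explicit connection settings for an S3-compatible service. It assumes endpoint takes a full URL. The endpoint (a local MinIO instance on its default port 9000), bucket and key values are placeholders. When temporary credentials are used, security-token or session-token would presumably be added alongside the keys in the same way.

```yaml
input:
  s3:
    bucket-name: my-example-bucket     # placeholder bucket name
    region: us-east-1
    endpoint: http://localhost:9000    # assumed S3-compatible service, e.g. a local MinIO
    access-key: EXAMPLE_ACCESS_KEY_ID  # placeholder; do not commit real credentials
    secret-key: EXAMPLE_SECRET_KEY     # placeholder
    mode: list-objects
```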

timestamp-mode

Type: S3ObjectTimestampMode

Derive a timestamp for this blob for filtering purposes based on the selected strategy.

| Field Name | Type | Description | Default |
|---|---|---|---|
| none | | The default mode: do not filter objects based on timestamps | - |
| last-modified | | Filter objects on the last-modified timestamp reported by the service | - |
| blob-name-pattern | string | Filter objects on a timestamp derived from the object name, using a regular expression whose named groups Y, m and d capture the year, month and day, for example: `=(?P<Y>[\\d]{4,4})-(?P<m>[\\d]{2,2})-(?P<d>[\\d]{2,2})/` | - |

  none

The default mode: do not filter objects based on timestamps

  last-modified

Filter objects on the last-modified timestamp reported by the service

  blob-name-pattern

Type: string

Filter objects on a timestamp derived from the object name, using a regular expression whose named groups Y, m and d capture the year, month and day. For example: `=(?P<Y>[\\d]{4,4})-(?P<m>[\\d]{2,2})-(?P<d>[\\d]{2,2})/`
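A sketch for buckets partitioned by date in the object key. It makes two assumptions: the pattern is the value of a blob-name-pattern key under timestamp-mode, and the derived timestamp is what maximum-age compares against. Single quotes stop YAML from treating the backslashes as escapes.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    timestamp-mode:
      # Y, m and d capture year, month and day from keys such as "logs/dt=2024-01-31/part-0.json"
      blob-name-pattern: '=(?P<Y>\d{4})-(?P<m>\d{2})-(?P<d>\d{2})/'
    maximum-age:
      duration: 7 days               # assumed human-readable duration form
```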

maximum-age

Type: MaxAgeSpecifier

Remove any object older than this age from the candidate list

| Field Name | Type | Description | Default |
|---|---|---|---|
| seconds | integer | Specify the maximum age in number of seconds | - |
| duration | string | Specify the maximum age as a human-readable duration (example: 1 hour) | - |

  seconds

Type: integer

Specify the maximum age in number of seconds

  duration

Type: string

Specify the maximum age as a human readable duration (example: 1 hour)
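A sketch that only considers recently modified objects. It assumes that maximum-age applies to the timestamp chosen by timestamp-mode, and that exactly one of seconds or duration is given.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    timestamp-mode: last-modified    # compare against the service's last-modified time
    maximum-age:
      seconds: 3600                  # one hour; presumably equivalent to `duration: 1 hour`
```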

mode

Type: S3BlockInputMode

The operating mode for this input

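| Field Name | Type | Description | Default |
|---|---|---|---|
| list-objects | | List the objects in the bucket | - |
| download-objects | | Download the objects given in object-names | - |
| list-and-download-objects | | List the objects in the bucket and download them | - |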

  list-objects

List the objects in the bucket

  download-objects

Download the objects given in object-names

  list-and-download-objects

List the objects in the bucket and download them
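A sketch of list-only operation. It assumes list-objects emits one event per object carrying the requested metadata fields and does not fetch object contents. The field names are hypothetical.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-objects               # enumerate objects only
    object-name-field: name          # hypothetical output field names
    content-length-field: size
    interval: 1h                     # re-list every hour
```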

fingerprinting

Type: bool

Default: false

Enable object fingerprinting, so that each object is downloaded only once

fingerprinting-db-path

Type: path

Specify a custom path for the fingerprinting database
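Fingerprinting matters most when the same bucket is listed repeatedly on a schedule. In this sketch the database path is hypothetical. It presumably needs to live somewhere that survives restarts if already-seen objects should stay skipped.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    interval: 10m
    fingerprinting: true             # download each object only once
    fingerprinting-db-path: /var/lib/pipes/s3-fingerprints.db   # hypothetical path
```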

maximum-fingerprint-age

Type: MaxAgeSpecifier

Default: 30 days

Remove any object fingerprints older than this from the tracker

| Field Name | Type | Description | Default |
|---|---|---|---|
| seconds | integer | Specify the maximum age in number of seconds | - |
| duration | string | Specify the maximum age as a human-readable duration (example: 1 hour) | - |

  seconds

Type: integer

Specify the maximum age in number of seconds

  duration

Type: string

Specify the maximum age as a human readable duration (example: 1 hour)
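A sketch that shortens fingerprint retention. It assumes the same seconds/duration form as maximum-age. An expired fingerprint presumably no longer suppresses a download, so an object still in the bucket could be fetched again. Keeping maximum-age no larger than the fingerprint age is one way to avoid that.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    fingerprinting: true
    maximum-age:
      duration: 7 days               # never consider objects older than the fingerprints
    maximum-fingerprint-age:
      duration: 7 days               # forget fingerprints after a week (default is 30 days)
```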

preprocessors

Type: PreProcessor

Preprocessors that process downloaded data before it is made available to the pipeline. They run in the order in which they are specified.

| Field Name | Type | Description | Default |
|---|---|---|---|
| extension | | Preprocess the object based on the extension of its name (.gz, .parquet) | - |
| gzip | | Decompress (gunzip) the received data | - |
| parquet | | Extract the received data as JSON rows from a Parquet file | - |

  extension

Preprocess the object based on the extension of its name (.gz, .parquet)

  gzip

Decompress (gunzip) the received data

  parquet

Extract the received data as JSON rows from a parquet file
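A sketch that assumes preprocessors takes a list of the variant names above. With extension, the preprocessor is chosen per object from its name. This suits a bucket that mixes .gz and .parquet objects. A bucket holding only gzipped files could list gzip instead.

```yaml
input:
  s3:
    bucket-name: my-example-bucket   # placeholder bucket name
    region: us-east-1
    mode: list-and-download-objects
    preprocessors:
      - extension                    # pick gzip or parquet handling from the object name
```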