SysFlow Processor (sf-processor repo)

The SysFlow processor is a lightweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two built-in drivers: one that reads SysFlow from a file, and another that reads streaming SysFlow over a domain socket. Plugins and drivers are configured using a JSON file.

A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).

Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.

The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports several export plugins for different targets.

Pre-requisites

The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.

  • Golang version 1.17+ and make (if building from sources)

  • Docker, docker-compose (if building with docker)

Build

Clone the processor repository

git clone https://github.com/sysflow-telemetry/sf-processor.git

Build locally, from sources

cd sf-processor
make build

Build with docker

cd sf-processor
make docker-build

Usage

For usage information, type:

cd driver/
./sfprocessor -help

This should yield the following usage statement:

Usage: sfprocessor [[-version]|[-driver <value>] [-log <value>] [-driverdir <value>] [-plugdir <value>] path]
Positional arguments:
  path string
        Input path
Arguments:
  -config string
        Path to pipeline configuration file (default "pipeline.json")
  -cpuprofile file
        Write cpu profile to file
  -driver string
        Driver name {file|socket|<custom>} (default "file")
  -driverdir string
        Dynamic driver directory (default "../resources/drivers")
  -log string
        Log level {trace|info|warn|error} (default "info")
  -memprofile file
        Write memory profile to file
  -plugdir string
        Dynamic plugins directory (default "../resources/plugins")
  -test
        Test pipeline configuration
  -traceprofile file
        Write trace profile to file
  -version
        Output version information

The four most important flags are config, driverdir, plugdir, and driver. The config flag points to a pipeline configuration file, which describes the entire pipeline and the settings for the individual plugins. The driverdir and plugdir flags specify the directories from which the processor loads dynamic driver and plugin shared libraries at runtime. The driver flag accepts the label of a pre-configured driver (either built-in or custom) that will be used as the data source of the pipeline. Currently, the pipeline only supports one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers:

  • file: loads a sysflow file reading driver that reads from path.

  • socket: the processor loads a sysflow streaming driver. The driver creates a domain socket named path and acts as a server waiting for a SysFlow collector to attach and send sysflow data.

Configuration

The pipeline configuration below shows how to configure a pipeline that will read a sysflow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a yaml file. An example pipeline with this configuration looks as follows:

{
  "pipeline":[
    {
     "processor": "sysflowreader",
     "handler": "flattener",
     "in": "sysflow sysflowchan",
     "out": "flat flattenerchan"
    },
    {
     "processor": "policyengine",
     "in": "flat flattenerchan",
     "out": "evt eventchan",
     "policies": "../resources/policies/runtimeintegrity"
    },
    {
     "processor": "exporter",
     "in": "evt eventchan",
     "export": "syslog",
     "proto": "tcp",
     "tag": "sysflow",
     "host": "localhost",
     "port": "514"
    }
  ]
}

NOTE: This configuration can be found in: sf-processor/resources/pipelines/pipeline.syslog.json

This pipeline specifies three built-in plugins:

  • sysflowreader: is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.

  • policyengine: is the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs records, which represent alerts, or filtered sysflow records depending on the policy engine’s mode (more on this later).

  • exporter: takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.

Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are specific to that plugin. For more details on the specific attributes in this example, see the pipeline configuration template.

The general attributes are as follows:

  • processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the GetName() function as defined in the processor object.

  • handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.

  • in (required): the input channel (i.e. golang channel) of objects that are passed to the plugin.

  • out (optional): the output channel (i.e. golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.

Channels are modelled as channel objects that have an In attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is [channel name] [channel type], where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match the name and type of the input channel of the next plugin in the pipeline sequence.
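As a rough illustration of the channel syntax, the following Go sketch splits a channel entry into its name and type and checks whether an output entry matches an input entry. The ChannelSpec type and the parseChannelSpec and connects functions are hypothetical names used only for this example; they are not part of the sf-processor code base.

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelSpec is a hypothetical holder for the two parts of a channel entry.
type ChannelSpec struct {
	Name string // unique identifier of the channel instance
	Type string // label given to the channel type at plugin registration
}

// parseChannelSpec splits an entry of the form "<name> <type>" on whitespace.
func parseChannelSpec(s string) (ChannelSpec, error) {
	parts := strings.Fields(s)
	if len(parts) != 2 {
		return ChannelSpec{}, fmt.Errorf("expected \"<name> <type>\", got %q", s)
	}
	return ChannelSpec{Name: parts[0], Type: parts[1]}, nil
}

// connects reports whether an output entry can feed an input entry,
// i.e. whether both name and type are identical.
func connects(out, in ChannelSpec) bool {
	return out == in
}

func main() {
	out, _ := parseChannelSpec("flat flattenerchan") // "out" of sysflowreader
	in, _ := parseChannelSpec("flat flattenerchan")  // "in" of policyengine
	fmt.Println(out.Name, out.Type, connects(out, in))
}
```

The entries used in main are taken from the example pipeline above, where the out value of the sysflowreader plugin and the in value of the policyengine plugin are the same string.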

NOTE: A plugin has exactly one input channel but it may specify more than one output channel. This allows pipeline definitions that fan out data to more than one receiver plugin, similar to a Unix tee command. While there must always be one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather than a linear structure.
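The tee-like fan-out can be pictured with plain Go channels. The sketch below is only an illustration of the idea under simplifying assumptions: records are strings, and the fanOut, collect, and runTee functions are hypothetical helpers rather than processor APIs.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every record from in to each channel in outs and closes
// the outputs once the input is drained.
func fanOut(in <-chan string, outs []chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for rec := range in {
		for _, out := range outs {
			out <- rec
		}
	}
	for _, out := range outs {
		close(out)
	}
}

// collect drains a channel into a slice; it stands in for a receiver plugin.
func collect(in <-chan string, dst *[]string, wg *sync.WaitGroup) {
	defer wg.Done()
	for rec := range in {
		*dst = append(*dst, rec)
	}
}

// runTee pushes records through one fan-out stage with two receivers.
func runTee(records []string) (a, b []string) {
	in := make(chan string)
	outs := []chan string{make(chan string, 1), make(chan string, 1)}
	var wg sync.WaitGroup
	wg.Add(3)
	go fanOut(in, outs, &wg)
	go collect(outs[0], &a, &wg)
	go collect(outs[1], &b, &wg)
	for _, r := range records {
		in <- r
	}
	close(in)
	wg.Wait() // all goroutines are done, so a and b are safe to read
	return a, b
}

func main() {
	a, b := runTee([]string{"r1", "r2"})
	fmt.Println(a, b)
}
```

Each receiver sees every record in the original order, which is what makes the pipeline a tree: one producer, several independent consumers.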

Policy engine configuration

The policy engine ("processor": "policyengine") plugin is driven by a set of rules. These rules are specified in a YAML file which adopts the same syntax as the rules of the Falco project. A policy engine plugin specification may have the following attributes:

  • policies (required for alert mode): The path to the YAML rules specification file. More information on rules can be found in the Policies section.

  • mode (optional): The mode of the policy engine. Allowed values are:

    • alert (default): the policy engine generates rule-based alerts; alert is a blocking mode that drops all records that do not match any given rule. If no mode is specified, the policy engine runs in alert mode by default.

    • enrich: the policy engine enriches records with additional context from the rule. In contrast to alert, this is a non-blocking mode which applies tagging and action enrichments to matching records as defined in the policy file. Non-matching records are passed on “as is”.

  • monitor (optional): Specifies if changes to the policy file(s) should be monitored and updated in the policy engine.

    • none (default): no monitor is used.

    • local: the processor will monitor for changes in the policies path and update its rule set if changes are detected.

  • monitor.interval (optional): The interval in seconds for updating policies, if a monitor is used. (default: 30 seconds).

  • concurrency (optional): The number of concurrent threads for record processing. (default: 5).

  • actiondir (optional): The path of the directory containing the shared object files for user-defined action plugins. See the section on User-defined Actions for more information.
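The difference between the blocking alert mode and the non-blocking enrich mode can be sketched in a few lines of Go. This is a simplified model and not the policy engine's implementation: the Record and Rule types and the apply function are hypothetical, and a rule is reduced to a predicate plus a list of tags.

```go
package main

import (
	"fmt"
	"strings"
)

// Record is a hypothetical, heavily simplified stand-in for a flattened record.
type Record struct {
	Exe  string
	Tags []string
}

// Rule is a hypothetical rule: a predicate plus the tags to append on match.
type Rule struct {
	Matches func(Record) bool
	Tags    []string
}

// isYum is an illustrative condition used by the demo rule in main.
func isYum(r Record) bool {
	return strings.HasSuffix(r.Exe, "/yum")
}

// apply mimics the documented mode semantics on a slice of records.
func apply(mode string, rule Rule, in []Record) []Record {
	var out []Record
	for _, r := range in {
		if rule.Matches(r) {
			r.Tags = append(r.Tags, rule.Tags...)
			out = append(out, r)
		} else if mode == "enrich" {
			out = append(out, r) // non-matching records pass through unchanged
		}
		// In alert mode, non-matching records are dropped.
	}
	return out
}

func main() {
	rule := Rule{Matches: isYum, Tags: []string{"suspicious-process"}}
	in := []Record{{Exe: "/usr/bin/yum"}, {Exe: "/bin/ls"}}
	fmt.Println(len(apply("alert", rule, in)), len(apply("enrich", rule, in)))
}
```

With the two sample records, alert mode yields only the matching record, while enrich mode yields both and tags only the matching one.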

NOTE: Prior to release 0.4.0, the mode attribute accepted different values with different semantics. To preserve the behavior of older releases:

  • For old alert behavior, use enrich mode.

  • For old filter behavior, use enrich mode and a policy file with filter rules only.

  • For old bypass behavior, use enrich and drop the policies key from the configuration.

Exporter configuration

An exporter ("processor": "exporter") plugin consists of two modules, an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders are target specific, i.e., for a particular export target only a particular set of encoders may be used. In the exporter configuration the transport module is specified via the export parameter (required). The encoder is selected via the format parameter (optional). The default format is json.

The following table lists the currently supported exporter modules and the corresponding encoders. Additional encoders and transport modules can be implemented if need arises. If you plan to contribute or want to get involved in the discussion please join the SysFlow community.

Some of these combinations require additional configuration as described in the following sections. null is used for debugging the processor and doesn’t export any data.

File

If export is set to file, an additional parameter file.path allows the specification of the target file.

Syslog

If the export parameter is set to syslog, output to syslog is enabled and the following additional parameters are used:

  • syslog.proto (optional): The protocol used for communicating with the syslog server. Allowed values are tcp, tls and udp. Default is tcp.

  • syslog.tag (optional): The tag used for each SysFlow record in syslog. Default is SysFlow.

  • syslog.source (optional): If set adds a hostname to the syslog header.

  • syslog.host (optional): The hostname of the syslog server. Default is localhost.

  • syslog.port (optional): The port of the syslog server. Default is 514.

ElasticSearch

Export to ElasticSearch is enabled by setting the config parameter export to es. The only supported format for export to ElasticSearch is ecs.

Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters which are read when the es export target is selected. Required parameters specify the ES target, index and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their values for optimal performance in your environment.

  • es.addresses (required): A comma-separated list of ES endpoints.

  • es.index (required): The name of the ES index to ingest into.

  • es.username (required): The ES username.

  • es.password (required): The password for the specified ES user.

  • buffer (optional): The bulk size as the number of records to be ingested at once. Default is 0, which indicates record-by-record ingestion and may be highly inefficient.

  • es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is 0 which means that the exporter uses as many workers as there are cores in the machine.

  • es.bulk.flushBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is 5e+6.

  • es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is 30s.
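Two of these values have a string format worth a quick example: es.addresses is a comma-separated list, and es.bulk.flushTimeout is a golang duration string. The Go sketch below shows how such values can be parsed with the standard library. The splitAddresses and parseFlushTimeout helpers are hypothetical, the endpoint names es1 and es2 are placeholders, and the exporter's own parsing may differ.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// splitAddresses turns a comma-separated endpoint list into a slice,
// trimming whitespace and skipping empty entries.
func splitAddresses(s string) []string {
	var out []string
	for _, a := range strings.Split(s, ",") {
		if a = strings.TrimSpace(a); a != "" {
			out = append(out, a)
		}
	}
	return out
}

// parseFlushTimeout parses a Go duration string such as "30s" or "1m30s".
// An empty value falls back to the documented default of 30s.
func parseFlushTimeout(s string) (time.Duration, error) {
	if s == "" {
		return 30 * time.Second, nil
	}
	return time.ParseDuration(s)
}

func main() {
	// es1 and es2 are placeholder host names.
	fmt.Println(splitAddresses("https://es1:9200, https://es2:9200"))
	d, err := parseFlushTimeout("1m30s")
	fmt.Println(d, err)
}
```

Note that a bare number such as "30" is not a valid duration string for time.ParseDuration; a unit suffix like s, m, or h is required.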

The Elastic exporter does not make any assumption on the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convenience we provide an explicit mapping for creating a new tailored index in Elastic. For more information refer to the Elastic Mapping reference.

Environment variables

It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes, and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. The environment variables must follow the following structure:

  • Environment variables must follow the naming schema <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>

  • The plugin name inside the pipeline configuration file must be all lower case.

For example, to set the alert mode inside the policy engine, the following environment variable is set:

export POLICYENGINE_MODE=alert

To set the syslog values for the exporter:

export EXPORTER_TYPE=telemetry
export EXPORTER_SOURCE=${HOSTNAME}
export EXPORTER_EXPORT=syslog
export EXPORTER_HOST=192.168.2.10
export EXPORTER_PORT=514

If running as a docker container, environment variables can be passed with the docker run command:

docker run \
-e EXPORTER_TYPE=telemetry \
-e EXPORTER_SOURCE=${HOSTNAME} \
-e EXPORTER_EXPORT=syslog \
-e EXPORTER_HOST=192.168.2.10 \
-e EXPORTER_PORT=514 \
...
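The naming schema can be made concrete with a small Go sketch that applies <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> overrides to a configuration map. This is an illustration only and not the processor's implementation: applyEnvOverrides is a hypothetical helper, it only overrides attributes already present in the map, and it does not cover attribute names containing dots, whose mapping is not described here.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// applyEnvOverrides returns a copy of conf in which every attribute that has
// a matching <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> variable is replaced by the
// variable's value. The lookup function is injected so that the logic can be
// exercised without touching the real environment.
func applyEnvOverrides(plugin string, conf map[string]string,
	lookup func(string) (string, bool)) map[string]string {
	out := make(map[string]string, len(conf))
	for key, val := range conf {
		envName := strings.ToUpper(plugin + "_" + key)
		if v, ok := lookup(envName); ok {
			val = v // the environment wins over the config file
		}
		out[key] = val
	}
	return out
}

func main() {
	os.Setenv("POLICYENGINE_MODE", "enrich")
	conf := map[string]string{
		"mode":     "alert",
		"policies": "../resources/policies/runtimeintegrity",
	}
	fmt.Println(applyEnvOverrides("policyengine", conf, os.LookupEnv)["mode"])
}
```

In main, the mode value from the configuration map is replaced by the value of POLICYENGINE_MODE, while the policies attribute keeps its configured value.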

Rate limiter configuration (experimental)

The flattener handler has a built-in time decay filter that can be enabled to reduce event rates in the processor. The filter uses a time-decay bloom filter based on a semantic hashing of records. This means that the filter should only forward one record matching a semantic hash per time decay period. The semantic hash takes into consideration process, flow and event attributes. To enable rate limiting, modify the sysflowreader processor as follows:

{
     "processor": "sysflowreader",
     "handler": "flattener",
     "in": "sysflow sysflowchan",
     "out": "flat flattenerchan",
     "filter.enabled": "on|off (default: off)",
     "filter.maxage": "time decay in minutes (default: 24H)"
}
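To make the idea of forwarding one record per semantic hash and decay period more tangible, here is a minimal Go sketch. It is not the flattener's implementation: a plain map stands in for the time-decay bloom filter, the attributes fed into the hash are illustrative, and decayFilter, semanticHash, and at are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// decayFilter remembers when each semantic hash was last forwarded.
// NOTE: a plain map is used here instead of a time-decay bloom filter.
type decayFilter struct {
	maxAge time.Duration
	seen   map[uint64]time.Time
}

// newDecayFilter creates a filter with a decay period given in minutes.
func newDecayFilter(maxAgeMinutes int) *decayFilter {
	return &decayFilter{
		maxAge: time.Duration(maxAgeMinutes) * time.Minute,
		seen:   make(map[uint64]time.Time),
	}
}

// semanticHash hashes the attributes that define record equivalence.
func semanticHash(attrs ...string) uint64 {
	h := fnv.New64a()
	for _, a := range attrs {
		h.Write([]byte(a))
		h.Write([]byte{0}) // separator between attributes
	}
	return h.Sum64()
}

// allow reports whether a record with this hash should be forwarded at time now.
func (f *decayFilter) allow(hash uint64, now time.Time) bool {
	if last, ok := f.seen[hash]; ok && now.Sub(last) < f.maxAge {
		return false // same semantic hash seen within the decay period
	}
	f.seen[hash] = now
	return true
}

// at builds a timestamp a given number of minutes after a fixed origin.
func at(minutes int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(minutes) * time.Minute)
}

func main() {
	f := newDecayFilter(60)
	h := semanticHash("/usr/bin/yum", "EXEC")
	fmt.Println(f.allow(h, at(0)), f.allow(h, at(30)), f.allow(h, at(90)))
}
```

A real bloom filter trades exactness for bounded memory, so unlike this map-based sketch it may occasionally suppress a record whose hash collides with an earlier one.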

Policy Language

The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rule, macro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the policies attribute of the policy engine plugin.

Rules contain the following fields:

  • rule: the name of the rule

  • description: a textual description of the rule

  • condition: a set of logical operations that can reference lists and macros, which when evaluating to true, can trigger record enrichment or alert creation (depending on the policy engine mode)

  • action: a comma-separated list of actions to take place when the rule evaluates to true. For a particular rule, actions are evaluated in the order they are specified, i.e., an action can make use of the results provided by earlier actions. An action is just the name of an action function without any parameters. The current version only supports pluggable user-defined actions. See here for a detailed description of the plugin interface and a sample action plugin.

  • priority: label representing the severity of the alert can be: (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.

  • tags (optional): set of labels appended to alert (default: empty).

  • prefilter (optional): list of record types (sf.type) to whitelist before applying rule condition (default: empty).

  • enabled (optional): indicates whether the rule is enabled (default: true).

NOTE: The syntax of the policy language changed slightly with the switch to release 0.4.0. To migrate policy files used with prior releases to release 0.4.0 or higher, simply remove all action: [tag] lines. As of release 0.4.0, tagging is done automatically. If a rule triggers, all tags specified via the tags key will be appended to the record. The action key is reserved for specifying user-defined action plugins.

Macros are named conditions and contain the following fields:

  • macro: the name of the macro

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

Lists are named collections and contain the following fields:

  • list: the name of the list

  • items: a collection of values or lists

Drop rules block records matching a condition and can be used for reducing the amount of records processed by the policy engine:

  • drop: the name of the filter

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

For example, the rule below specifies that matching records are process events (sf.type = PE), denoting EXEC operations (sf.opflags = EXEC) for which the process matches macro package_installers. Additionally, the global filter containers preemptively removes from the processing stream any records for processes not running in a container environment.

# lists
- list: rpm_binaries
  items: [dnf, rpm, rpmkey, yum, '"75-system-updat"', rhsmcertd-worke, subscription-ma,
          repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
          abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]

- list: deb_binaries
  items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
    frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
    apt-listchanges, unattended-upgr, apt-add-reposit]

- list: package_mgmt_binaries
  items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, sane-utils.post, alternatives, chef-client]

# macros
- macro: package_installers
  condition: sf.proc.name pmatch (package_mgmt_binaries)

# global filters (blacklisting)
- filter: containers
  condition: sf.container.type = host

# rule definitions
- rule: Package installer detected
  desc: Use of package installer detected
  condition:  sf.opflags = EXEC and package_installers
  priority: medium
  tags: [actionable-offense, suspicious-process]
  prefilter: [PE] # record types for which this rule should be applied (whitelisting)
  enabled: true

Attribute names

The following table shows a detailed list of attribute names supported by the policy engine, as well as their type, and comparative Falco attribute name. Our policy engine supports both SysFlow and Falco attribute naming convention to enable reuse of policies across the two frameworks.

| Attribute | Description | Values | Falco Attribute |
|---|---|---|---|
| sf.type | Record type | PE,PF,NF,FF,FE,KE | N/A |
| sf.opflags | Operation flags | Operation Flags List: remove OP_ prefix | evt.type (remapped as falco event types) |
| sf.ret | Return code | int | evt.res |
| sf.ts | Start timestamp (ns) | int64 | evt.time |
| sf.endts | End timestamp (ns) | int64 | N/A |
| sf.proc.pid | Process PID | int64 | proc.pid |
| sf.proc.tid | Thread PID | int64 | thread.tid |
| sf.proc.uid | Process user ID | int | user.uid |
| sf.proc.user | Process user name | string | user.name |
| sf.proc.gid | Process group ID | int | group.gid |
| sf.proc.group | Process group name | string | group.name |
| sf.proc.apid | Process ancestor PIDs (qo) | int64 | proc.apid |
| sf.proc.aname | Process ancestor names (qo) (exclude path) | string | proc.aname |
| sf.proc.exe | Process command/filename (with path) | string | proc.exe |
| sf.proc.args | Process command arguments | string | proc.args |
| sf.proc.name | Process name (qo) (exclude path) | string | proc.name |
| sf.proc.cmdline | Process command line (qo) | string | proc.cmdline |
| sf.proc.tty | Process TTY status | bool | proc.tty |
| sf.proc.entry | Process container entrypoint | bool | proc.vpid == 1 |
| sf.proc.createts | Process creation timestamp (ns) | int64 | N/A |
| sf.pproc.pid | Parent process ID | int64 | proc.ppid |
| sf.pproc.gid | Parent process group ID | int64 | N/A |
| sf.pproc.uid | Parent process user ID | int64 | N/A |
| sf.pproc.group | Parent process group name | string | N/A |
| sf.pproc.tty | Parent process TTY status | bool | N/A |
| sf.pproc.entry | Parent process container entry | bool | N/A |
| sf.pproc.user | Parent process user name | string | N/A |
| sf.pproc.exe | Parent process command/filename | string | N/A |
| sf.pproc.args | Parent process command arguments | string | N/A |
| sf.pproc.name | Parent process name (qo) (no path) | string | proc.pname |
| sf.pproc.cmdline | Parent process command line (qo) | string | proc.pcmdline |
| sf.pproc.createts | Parent process creation timestamp | int64 | N/A |
| sf.file.fd | File descriptor number | int | fd.num |
| sf.file.path | File path | string | fd.name |
| sf.file.newpath | New file path (used in some FileEvents) | string | N/A |
| sf.file.name | File name (qo) | string | fd.filename |
| sf.file.directory | File directory (qo) | string | fd.directory |
| sf.file.type | File type | char ‘f’: file, 4: IPv4, 6: IPv6, ‘u’: unix socket, ‘p’: pipe, ‘e’: eventfd, ‘s’: signalfd, ‘l’: eventpoll, ‘i’: inotify, ‘o’: unknown. | fd.typechar |
| sf.file.is_open_write | File open with write flag (qo) | bool | evt.is_open_write |
| sf.file.is_open_read | File open with read flag (qo) | bool | evt.is_open_read |
| sf.file.openflags | File open flags | int | evt.args |
| sf.net.proto | Network protocol | int | fd.l4proto |
| sf.net.sport | Source port | int | fd.sport |
| sf.net.dport | Destination port | int | fd.dport |
| sf.net.port | Src or Dst port (qo) | int | fd.port |
| sf.net.sip | Source IP | int | fd.sip |
| sf.net.dip | Destination IP | int | fd.dip |
| sf.net.ip | Src or dst IP (qo) | int | fd.ip |
| sf.res | File or network resource | string | fd.name |
| sf.flow.rbytes | Flow bytes read/received | int64 | evt.res |
| sf.flow.rops | Flow operations read/received | int64 | N/A |
| sf.flow.wbytes | Flow bytes written/sent | int64 | evt.res |
| sf.flow.wops | Flow operations written/sent | int64 | N/A |
| sf.container.id | Container ID | string | container.id |
| sf.container.name | Container name | string | container.name |
| sf.container.image.id | Container image ID | string | container.image.id |
| sf.container.image | Container image name | string | container.image |
| sf.container.type | Container type | CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM, CT_CRI, CT_CONTAINERD, CT_CRIO, CT_BPM | container.type |
| sf.container.privileged | Container privilege status | bool | container.privileged |
| sf.pod.ts | Pod creation timestamp | int | N/A |
| sf.pod.id | Pod id | string | N/A |
| sf.pod.name | Pod name | string | N/A |
| sf.pod.nodename | Pod node name | string | N/A |
| sf.pod.namespace | Pod namespace | string | N/A |
| sf.pod.restartcnt | Pod restart count | int | N/A |
| sf.pod.hostip | Pod host IP addresses | json | N/A |
| sf.pod.internalip | Pod internal IP addresses | json | N/A |
| sf.pod.services | Pod services | json | N/A |
| sf.ke.action | Kubernetes event action | K8S_COMPONENT_ADDED, K8S_COMPONENT_MODIFIED, K8S_COMPONENT_DELETED, K8S_COMPONENT_ERROR, K8S_COMPONENTNONEXISTENT, K8S_COMPONENT_UNKNOWN | N/A |
| sf.ke.kind | Kubernetes event resource type | K8S_NODES, K8S_NAMESPACES, K8S_PODS, K8S_REPLICATIONCONTROLLERS, K8S_SERVICES, K8S_EVENTS, K8S_REPLICASETS, K8S_DAEMONSETS, K8S_DEPLOYMENT, K8S_UNKNOWN | N/A |
| sf.ke.message | Kubernetes event json message | json | N/A |
| sf.node.id | Node identifier | string | N/A |
| sf.node.ip | Node IP address | string | N/A |
| sf.schema.version | SysFlow schema version | string | N/A |
| sf.version | SysFlow JSON schema version | int | N/A |

Jsonpath Expressions

Unlike attributes of the scalar types bool, int(64), and string, attributes of type json contain structured information in form of stringified json records. The policy language allows access to subfields inside such json records via GJSON jsonpath expressions. The jsonpath expression must be specified as a suffix to the attribute enclosed in square brackets. Examples of such terms are:

sf.pod.services[0.clusterip.0]   - the first cluster IP address of the first service associated with  a pod
sf.ke.message[items.0.namespace] - the namespace of the first item in a KE message attribute

See the GJSON path syntax for more details. The result of applying a jsonpath expression to a json attribute is always of type string.

Operations

The policy language supports the following operations:

| Operation | Description | Example |
|---|---|---|
| A and B | Returns true if both statements are true | sf.pproc.name=bash and sf.pproc.cmdline contains echo |
| A or B | Returns true if one of the statements is true | sf.file.path = "/etc/passwd" or sf.file.path = "/etc/shadow" |
| not A | Returns true if the statement isn't true | not sf.pproc.exe = /usr/local/sbin/runc |
| A = B | Returns true if A exactly matches B. Note, if B is a list, A only has to exactly match one element of the list. If B is a list, it must be explicit. It cannot be a variable. If B is a variable, use in instead. | sf.file.path = ["/etc/passwd", "/etc/shadow"] |
| A != B | Returns true if A is not equal to B. Note, if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit. It cannot be a variable. | sf.file.path != "/etc/passwd" |
| A < B | Returns true if A is less than B. Note, if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops < 1000 |
| A <= B | Returns true if A is less than or equal to B. Note, if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops <= 1000 |
| A > B | Returns true if A is greater than B. Note, if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops > 1000 |
| A >= B | Returns true if A is greater than or equal to B. Note, if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops >= 1000 |
| A in B | Returns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.exe in (bin_binaries, usr_bin_binaries) |
| A startswith B | Returns true if string A starts with string B | sf.file.path startswith '/home' |
| A endswith B | Returns true if string A ends with string B | sf.file.path endswith '.json' |
| A contains B | Returns true if string A contains string B | sf.pproc.name=java and sf.pproc.cmdline contains org.apache.hadoop |
| A icontains B | Returns true if string A contains string B ignoring capitalization | sf.pproc.name=java and sf.pproc.cmdline icontains org.apache.hadooP |
| A pmatch B | Returns true if string A partially matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.name pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries) |
| exists A | Checks if A is not a zero value (i.e. 0 for int, "" for string) | exists sf.file.path |

See the resources policies directory in github for examples. Feel free to contribute new and interesting rules through a github pull request.
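For readers more familiar with Go than with the rules syntax, the string and list operators described above can be read as the following standard library calls. This is only a sketch of the documented semantics and not the policy engine's evaluator; the function names are hypothetical, and pmatch is left out because its exact matching behavior is not spelled out here.

```go
package main

import (
	"fmt"
	"strings"
)

// inList reports whether a exactly matches one element of list ("A in B").
func inList(a string, list []string) bool {
	for _, e := range list {
		if a == e {
			return true
		}
	}
	return false
}

// startsWith mirrors "A startswith B".
func startsWith(a, b string) bool { return strings.HasPrefix(a, b) }

// endsWith mirrors "A endswith B".
func endsWith(a, b string) bool { return strings.HasSuffix(a, b) }

// contains mirrors "A contains B" (case sensitive).
func contains(a, b string) bool { return strings.Contains(a, b) }

// icontains mirrors "A icontains B" (capitalization is ignored).
func icontains(a, b string) bool {
	return strings.Contains(strings.ToLower(a), strings.ToLower(b))
}

// exists mirrors "exists A" for a string attribute: true for non-empty values.
func exists(a string) bool { return a != "" }

func main() {
	path := "/home/user/config.json"
	fmt.Println(
		startsWith(path, "/home"),
		endsWith(path, ".json"),
		inList(path, []string{"/etc/passwd", "/etc/shadow"}),
		icontains("java -cp org.apache.hadoop.Main", "org.apache.hadooP"),
	)
}
```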

User-defined Actions

User-defined actions are implemented via the golang plugin mechanism. Check the documentation on Action Plugins for a custom action plugin example.

Plugins

In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enable easy extension of the processor and the creation of custom pipelines tailored to specific use cases.

The processor supports four types of plugins:

  • drivers: enable the ingestion of different telemetry sources into the processor pipeline.

  • processors: enable the creation of custom data processing and analytic plugins to extend sf-processor pipelines.

  • handlers: enable the creation of custom SysFlow record handling plugins.

  • actions: enable the creation of custom action plugins to extend sf-processor’s policy engine.

Pre-requisites

  • Go 1.17 (if building locally, without the plugin builder)

Processor Plugins

User-defined plugins can be plugged into the sf-processor pipeline to extend it. These are the most generic type of plugins, from which all built-in processor plugins are built. Check the core package for examples. We have built-in processor plugins for flattening the telemetry stream, implementing a policy engine, and creating event exporters.

Interface

Processor plugins (or just plugins) are implemented via the golang plugin mechanism. A plugin must implement the following interface, defined in the github.com/sysflow-telemetry/sf-apis/go/plugins package.

// SFProcessor defines the SysFlow processor interface.
type SFProcessor interface {
  Register(pc SFPluginCache)
  Init(conf map[string]interface{}) error
  Process(ch interface{}, wg *sync.WaitGroup)
  GetName() string
  SetOutChan(ch []interface{})
  Cleanup()
}

The Process function is the main function of the plugin. It’s where the “main loop” of the plugin should be implemented. It receives the input channel configured in the custom plugin’s block in the pipeline configuration. It also receives the pipeline thread WaitGroup. Custom processing code should be implemented using this function. Init is called once, when the pipeline is loaded. Cleanup is called when the pipeline is terminated. SetOutChan receives a slice with the output channels configured in the plugin’s block in the pipeline configuration.

When loading a pipeline, sf-processor performs a series of health checks before the pipeline is enabled. If these health checks fail, the processor terminates. To enable health checks on custom plugins, implement the Test function defined in the interface below. For an example, check core/exporter/exporter.go.

// SFTestableProcessor defines a testable SysFlow processor interface.
type SFTestableProcessor interface {
  SFProcessor
  Test() (bool, error)
}
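Because Test lives in a separate interface that embeds SFProcessor, a loader can discover it with a type assertion and skip plugins that do not provide it. The sketch below illustrates that pattern with reduced, locally defined interfaces; namedProcessor, testableProcessor, and healthCheck are hypothetical names, and how sf-processor itself performs the check is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
)

// namedProcessor is a reduced, hypothetical stand-in for SFProcessor.
type namedProcessor interface {
	GetName() string
}

// testableProcessor adds the Test method, mirroring SFTestableProcessor.
type testableProcessor interface {
	namedProcessor
	Test() (bool, error)
}

// plain implements only the base interface.
type plain struct{}

func (plain) GetName() string { return "plain" }

// checked also implements Test and fails when it is not healthy.
type checked struct{ healthy bool }

func (checked) GetName() string { return "checked" }

func (c checked) Test() (bool, error) {
	if !c.healthy {
		return false, errors.New("export target not reachable")
	}
	return true, nil
}

// healthCheck runs Test only on plugins that implement it.
func healthCheck(p namedProcessor) error {
	t, ok := p.(testableProcessor)
	if !ok {
		return nil // plugin does not opt in to health checks
	}
	if passed, err := t.Test(); err != nil || !passed {
		return fmt.Errorf("plugin %s failed health check: %v", p.GetName(), err)
	}
	return nil
}

func main() {
	for _, p := range []namedProcessor{plain{}, checked{healthy: true}, checked{healthy: false}} {
		fmt.Println(p.GetName(), healthCheck(p))
	}
}
```

Keeping Test in its own interface means existing plugins that only implement SFProcessor continue to load unchanged.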

Example

A dynamic plugin example is provided in github. The core of the plugin is building an object that implements an SFProcessor interface. Such an implementation looks as follows:

package main

import (
    "sync"

    "github.com/sysflow-telemetry/sf-apis/go/logger"
    "github.com/sysflow-telemetry/sf-apis/go/plugins"
    "github.com/sysflow-telemetry/sf-apis/go/sfgo"
    "github.com/sysflow-telemetry/sf-processor/core/flattener"
)

const (
    pluginName string = "example"
)

// Plugin exports a symbol for this plugin.
var Plugin Example

// Example defines an example plugin.
type Example struct{}

// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
    return new(Example)
}

// GetName returns the plugin name.
func (s *Example) GetName() string {
    return pluginName
}

// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]interface{}) error {
    return nil
}

// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
    pc.AddProcessor(pluginName, NewExample)
}

// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg *sync.WaitGroup) {
    cha := ch.(*flattener.FlatChannel)
    record := cha.In
    logger.Trace.Println("Example channel capacity:", cap(record))
    defer wg.Done()
    logger.Trace.Println("Starting Example")
    for {
        fc, ok := <-record
        if !ok {
            logger.Trace.Println("Channel closed. Shutting down.")
            break
        }
        if fc.Ints[sfgo.SYSFLOW_IDX][sfgo.SF_REC_TYPE] == sfgo.PROC_EVT {
            logger.Info.Printf("Process Event: %s, %d", fc.Strs[sfgo.SYSFLOW_IDX][sfgo.PROC_EXE_STR], fc.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_PROC_TID_INT])
        }
    }
    logger.Trace.Println("Exiting Example")
}

// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch []interface{}) {}

// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}

// This function is not run when module is used as a plugin.
func main() {}

The custom plugin must implement the following interface:

  • GetName() - returns a lowercase string representing the plugin’s label. This label is important, because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.

  • Init(conf map[string]interface{}) error - used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin’s definition inside pipeline.json (more on this later).

  • Register(pc plugins.SFPluginCache) - this registers the plugin with the plugin cache of the processor.

    • pc.AddProcessor(pluginName, <plugin constructor function>) (required) - registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName> which is used to instantiate the plugin, and returns it as an SFProcessor interface - see NewExample for an example.

    • pc.AddChannel(channelName, <output channel constructor function>) (optional) - if your plugin uses a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next in the pipeline), the channel should be registered here.

      • The channelName should be a lowercase unique label defining the channel type.

      • The constructor function should return a golang interface{} representing an object that has an In attribute of type chan <ObjectToBePassed>. We will call this object a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here

      • For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as the Register function. The FlatChannel is defined here

  • Process(ch interface{}, wg *sync.WaitGroup) - this function is launched by the processor as a goroutine and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel that is configured as the input channel to the plugin in the pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in the code). The plugin must loop on the input channel and do its analysis on each input record. In this case, the example plugin reads flat records and prints process events to the screen.

  • SetOutChan(ch []interface{}) - sets the wrapped channels that will serve as the output channels for the plugin. The output channels are instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, then this function can be left empty. See the SetOutChan function in the flattener to see how an output channel is implemented.

  • Cleanup() - used to clean up any resources. This function is called by the processor after the plugin’s Process function exits. One of the key items to close in the Cleanup function is the output channel, using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.

  • main(){} - this main method is not used by the plugin or processor. It’s required by golang in order to be able to compile as a shared object.
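
The interplay between wrapped channel objects, SetOutChan, Process, and Cleanup described in the list above can be sketched with a self-contained toy plugin. This is a minimal sketch under stated assumptions, not the flattener's implementation: TextChannel, NewTextChannel, the Upper plugin, and the runOnce helper are hypothetical names, and the plain string records stand in for SysFlow objects.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// TextChannel is a hypothetical wrapped channel object: a struct whose
// In attribute is the Go channel that carries the records.
type TextChannel struct {
	In chan string
}

// NewTextChannel plays the role of an output channel constructor.
func NewTextChannel(size int) interface{} {
	return &TextChannel{In: make(chan string, size)}
}

// Upper is a hypothetical plugin that upper-cases each record it receives.
type Upper struct {
	outs []chan string
}

// SetOutChan unwraps the output channels handed over by the pipeline.
func (u *Upper) SetOutChan(chs []interface{}) {
	for _, c := range chs {
		if tc, ok := c.(*TextChannel); ok {
			u.outs = append(u.outs, tc.In)
		}
	}
}

// Process loops on the input channel until it is closed.
func (u *Upper) Process(ch interface{}, wg *sync.WaitGroup) {
	defer wg.Done()
	in := ch.(*TextChannel).In
	for rec := range in {
		for _, out := range u.outs {
			out <- strings.ToUpper(rec)
		}
	}
}

// Cleanup closes the output channels so the next stage can shut down too.
func (u *Upper) Cleanup() {
	for _, out := range u.outs {
		close(out)
	}
}

// runOnce wires one Upper plugin between two wrapped channels and
// returns everything that arrives on the output side.
func runOnce(records []string) []string {
	in := NewTextChannel(len(records))
	out := NewTextChannel(len(records))
	p := &Upper{}
	p.SetOutChan([]interface{}{out})

	var wg sync.WaitGroup
	wg.Add(1)
	go p.Process(in, &wg)

	inCh := in.(*TextChannel).In
	for _, r := range records {
		inCh <- r
	}
	close(inCh) // upstream is done
	wg.Wait()   // Process has returned
	p.Cleanup() // closes the output channel

	var got []string
	for r := range out.(*TextChannel).In {
		got = append(got, r)
	}
	return got
}

func main() {
	fmt.Println(runOnce([]string{"open", "exec"}))
}
```

The ordering in runOnce follows the teardown sequence described above: the input channel is closed first, Process returns, and only then does Cleanup close the output channel, so a downstream reader sees every record before its own input closes.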

To compile the example plugin, use the provided Makefile:

make -C plugins/processors/example

This will build the plugin and copy it into resources/plugins/.

To use the new plugin, use the configuration provided in github, which defines the following pipeline:

{
   "pipeline":[
     {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
     },
     {
      "processor": "example",
      "in": "flat flattenerchan"
     }
   ]
}

This pipeline contains two plugins:

  • The builtin sysflowreader plugin with the flattener handler, which takes raw sysflow objects and flattens them into arrays of integers and strings for easier processing in certain plugins like the policy engine.

  • The example plugin, which takes the flattened output from the sysflowreader plugin, and prints it to the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins will be properly stitched together.
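
The matching rule can be checked mechanically. The sketch below parses a pipeline definition with the fields used in the example above and reports any plugin whose in value was not produced as an out value by an earlier plugin. It is only an illustration of the rule, not the processor's own loader: the stage and pipeline types and the checkStitching function are hypothetical, and skipping the first plugin (which is fed by the driver) is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stage holds the fields used by the example pipeline above.
type stage struct {
	Processor string `json:"processor"`
	Handler   string `json:"handler"`
	In        string `json:"in"`
	Out       string `json:"out"`
}

// pipeline is the top-level JSON document.
type pipeline struct {
	Stages []stage `json:"pipeline"`
}

// checkStitching returns an error if a stage reads from a channel that no
// earlier stage writes to. The first stage is assumed to be fed by the
// driver and is therefore not checked.
func checkStitching(raw []byte) error {
	var p pipeline
	if err := json.Unmarshal(raw, &p); err != nil {
		return err
	}
	produced := map[string]bool{}
	for i, s := range p.Stages {
		if i > 0 && !produced[s.In] {
			return fmt.Errorf("stage %q: no earlier stage outputs %q", s.Processor, s.In)
		}
		if s.Out != "" {
			produced[s.Out] = true
		}
	}
	return nil
}

// examplePipeline repeats the configuration shown above.
const examplePipeline = `{
  "pipeline": [
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "example",
      "in": "flat flattenerchan"
    }
  ]
}`

func main() {
	if err := checkStitching([]byte(examplePipeline)); err != nil {
		fmt.Println("pipeline is not stitched correctly:", err)
		return
	}
	fmt.Println("pipeline is stitched correctly")
}
```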

Build

The example plugin is a custom plugin that illustrates how to implement a minimal plugin that reads the records from the input channel and logs them to the standard output.

To run this example, in the root of sf-processor, build the processor and the example plugin. Note, this plugin’s shared object is generated in resources/plugins/example.so.

make build && make -C plugins/processors/example

Then, run:

cd driver && ./sfprocessor -log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
    -v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/processors/example

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf

The output on the above pre-recorded trace should look like this:

[Health] 2022/02/21 12:55:19 pipeline.go:246: Health checks: passed
[Info] 2022/02/21 12:55:19 main.go:147: Successfully loaded pipeline configuration
[Info] 2022/02/21 12:55:19 pipeline.go:170: Starting the processing pipeline
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823

Handler Plugins

User-defined handler modules can be plugged into the built-in SysFlow processor plugin to implement custom data processing and analytic pipelines.

Interface

Handlers are implemented via the golang plugin mechanism. A handler must implement the following interface, defined in the github.com/sysflow-telemetry/sf-apis/go/plugins package.

// SFHandler defines the SysFlow handler interface.
type SFHandler interface {
  RegisterChannel(pc SFPluginCache)
  RegisterHandler(hc SFHandlerCache)
  Init(conf map[string]interface{}) error
  IsEntityEnabled() bool
  HandleHeader(sf *CtxSysFlow, hdr *sfgo.SFHeader) error
  HandleContainer(sf *CtxSysFlow, cont *sfgo.Container) error
  HandleProcess(sf *CtxSysFlow, proc *sfgo.Process) error
  HandleFile(sf *CtxSysFlow, file *sfgo.File) error
  HandleNetFlow(sf *CtxSysFlow, nf *sfgo.NetworkFlow) error
  HandleNetEvt(sf *CtxSysFlow, ne *sfgo.NetworkEvent) error
  HandleFileFlow(sf *CtxSysFlow, ff *sfgo.FileFlow) error
  HandleFileEvt(sf *CtxSysFlow, fe *sfgo.FileEvent) error
  HandleProcFlow(sf *CtxSysFlow, pf *sfgo.ProcessFlow) error
  HandleProcEvt(sf *CtxSysFlow, pe *sfgo.ProcessEvent) error
  SetOutChan(ch []interface{})
  Cleanup()
}

Each Handle* function receives the current SysFlow record being processed along with its corresponding parsed record type. Custom processing code should be implemented using these functions.
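
Because the interface has many Handle* methods, a handler that cares about only a few record types may want to keep the rest as no-ops. One way to do that in Go is struct embedding, sketched below. This is an illustration under stated assumptions rather than a pattern taken from the sf-processor sources: the types are reduced local stand-ins for the sf-apis types (with hypothetical fields), recordHandler covers only two of the methods, and NopHandler and ProcCounter are invented names.

```go
package main

import "fmt"

// Stand-ins for the sf-apis types, reduced to what this sketch needs.
// The fields are hypothetical.
type CtxSysFlow struct{}

type ProcessEvent struct {
	Exe string
}

type NetworkFlow struct {
	Dport int64
}

// recordHandler is a reduced stand-in for SFHandler with two Handle* methods.
type recordHandler interface {
	HandleProcEvt(sf *CtxSysFlow, pe *ProcessEvent) error
	HandleNetFlow(sf *CtxSysFlow, nf *NetworkFlow) error
}

// NopHandler implements every method as a no-op.
type NopHandler struct{}

func (NopHandler) HandleProcEvt(*CtxSysFlow, *ProcessEvent) error { return nil }
func (NopHandler) HandleNetFlow(*CtxSysFlow, *NetworkFlow) error  { return nil }

// ProcCounter embeds NopHandler and overrides only HandleProcEvt.
type ProcCounter struct {
	NopHandler
	Seen map[string]int
}

// HandleProcEvt counts process events per executable.
func (p *ProcCounter) HandleProcEvt(_ *CtxSysFlow, pe *ProcessEvent) error {
	p.Seen[pe.Exe]++
	return nil
}

func main() {
	pc := &ProcCounter{Seen: map[string]int{}}
	var h recordHandler = pc

	_ = h.HandleProcEvt(&CtxSysFlow{}, &ProcessEvent{Exe: "./server"})
	_ = h.HandleProcEvt(&CtxSysFlow{}, &ProcessEvent{Exe: "./server"})
	_ = h.HandleNetFlow(&CtxSysFlow{}, &NetworkFlow{Dport: 8080}) // handled by the embedded no-op

	fmt.Println(pc.Seen["./server"])
}
```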

Build

The printer handler is a pluggable handler that logs select SysFlow records to the standard output. This plugin doesn’t define any output channels, so it acts as a plugin sink (last plugin in a pipeline).

To run this example, in the root of sf-processor, build the processor and the handler plugin. Note, this plugin’s shared object is generated in resources/handlers/printer.so.

make build && make -C plugins/handlers/printer

Then, run:

cd driver && ./sfprocessor -log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
    -v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/handlers/printer

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf

The output on the above pre-recorded trace should look like this:

[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./client, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./server, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823

Action Plugins

User-defined actions can be plugged into SysFlow’s Policy Engine rule declarations to perform additional processing on matched records.

Interface

Actions are implemented via the golang plugin mechanism. An action must implement the following interface, defined in the github.com/sysflow-telemetry/sf-processor/core/policyengine/engine package.

// Prototype of an action function
type ActionFunc func(r *Record) error

// Action interface for user-defined actions
type Action interface {
        GetName() string
        GetFunc() ActionFunc
}

Actions have a name and an action function. Within a single policy engine instance, action names must be unique. User-defined actions cannot re-declare built-in actions. Reusing names of user-defined actions overwrites previously registered actions.

The action function receives the current record as an argument and thus has access to all record attributes. The action result can be stored in the record context via the context modifier methods.
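
To make the interface and the naming rules concrete, here is a minimal, self-contained sketch of an action that tags a record with the current time, together with a toy registry that applies the rules stated above. Everything outside the two interface declarations is an assumption of this sketch: Record is a local stand-in whose Tags field and AddTag method are hypothetical (the real context modifier methods are not shown in this section), and the registry type and the "some_builtin" name are placeholders.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Record is a stand-in for the policy engine record type.
// The Tags field and AddTag method are hypothetical.
type Record struct {
	Tags []string
}

// AddTag appends a tag to the record.
func (r *Record) AddTag(t string) { r.Tags = append(r.Tags, t) }

// ActionFunc and Action repeat the declarations shown above.
type ActionFunc func(r *Record) error

type Action interface {
	GetName() string
	GetFunc() ActionFunc
}

// NowAction tags a record with the current time in nanoseconds.
type NowAction struct{}

func (NowAction) GetName() string { return "now" }

func (NowAction) GetFunc() ActionFunc {
	return func(r *Record) error {
		r.AddTag("now_in_nanos:" + strconv.FormatInt(time.Now().UnixNano(), 10))
		return nil
	}
}

// registry is a hypothetical action table that follows the naming rules above.
type registry struct {
	builtin map[string]bool
	user    map[string]ActionFunc
}

// register rejects built-in names and lets a repeated user-defined name
// overwrite the earlier entry.
func (g *registry) register(a Action) error {
	if g.builtin[a.GetName()] {
		return fmt.Errorf("action %q is built in and cannot be re-declared", a.GetName())
	}
	g.user[a.GetName()] = a.GetFunc()
	return nil
}

func main() {
	g := &registry{
		builtin: map[string]bool{"some_builtin": true}, // placeholder name
		user:    map[string]ActionFunc{},
	}
	if err := g.register(NowAction{}); err != nil {
		fmt.Println(err)
		return
	}
	r := &Record{}
	if err := g.user["now"](r); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(r.Tags) // one tag of the form now_in_nanos:<timestamp>
}
```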

Build

The now action is a pluggable action that creates a tag containing the current time with nanosecond precision.

First, in the root of sf-processor, build the processor and the action plugin. Note, this plugin’s shared object is generated in resources/actions/now.so.

make build && make -C plugins/actions/example

Then, run:

cd driver && ./sfprocessor -log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
    -v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/actions/example

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf

In the output, observe that all records matching the policy specified in pipeline.actions.json are tagged by the now action with the tag now_in_nanos. For example:

{
  "version": 4,
  "endts": 0,
  "opflags": [
    "EXEC"
  ],
  ...
  "policies": [
    {
      "id": "Action example",
      "desc": "user-defined action example",
      "priority": 0
    }
  ],
  "tags": [
    "now_in_nanos:1645409122055957900"
  ]
}

Docker usage

Documentation and scripts for deploying the SysFlow Processor with docker compose can be found here.

Processor environment

As mentioned in a previous section, all custom plugin attributes can be set through environment variables named in the format <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>. Note that the docker compose file sets several such attributes, including EXPORTER_TYPE, EXPORTER_HOST, and EXPORTER_PORT.
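
The naming scheme can be illustrated with a short sketch that builds a variable name from a plugin name and an attribute, and overlays matching variables onto a plugin's configuration map. This is an assumption-laden illustration, not the processor's own override logic: the envKey and applyOverrides functions are hypothetical, and the simple upper-casing and lower-casing of names is inferred only from the EXPORTER_* examples above.

```go
package main

import (
	"fmt"
	"strings"
)

// envKey builds the variable name for a plugin attribute,
// e.g. ("exporter", "host") gives "EXPORTER_HOST".
func envKey(plugin, attr string) string {
	return strings.ToUpper(plugin) + "_" + strings.ToUpper(attr)
}

// applyOverrides overlays matching variables onto a plugin configuration map.
// environ uses the "KEY=value" form returned by os.Environ.
func applyOverrides(plugin string, conf map[string]interface{}, environ []string) {
	prefix := strings.ToUpper(plugin) + "_"
	for _, kv := range environ {
		parts := strings.SplitN(kv, "=", 2)
		if len(parts) != 2 || !strings.HasPrefix(parts[0], prefix) {
			continue
		}
		// Assumption: attribute names are the lower-cased remainder of the key.
		attr := strings.ToLower(strings.TrimPrefix(parts[0], prefix))
		if attr == "" {
			continue
		}
		conf[attr] = parts[1]
	}
}

func main() {
	conf := map[string]interface{}{"type": "file", "host": "localhost"}
	// A fixed slice keeps the example deterministic; os.Environ() would be
	// the natural source in a real program.
	env := []string{"EXPORTER_HOST=collector", "EXPORTER_PORT=514", "PATH=/usr/bin"}
	applyOverrides("exporter", conf, env)
	fmt.Println(envKey("exporter", "type"), conf["type"], conf["host"], conf["port"])
}
```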

The following are the default locations of the pipeline configuration and the plugin directories:

  • pipeline.json: /usr/local/sysflow/conf/pipeline.json

  • drivers dir: /usr/local/sysflow/resources/drivers

  • plugins dir: /usr/local/sysflow/resources/plugins

  • handler dir: /usr/local/sysflow/resources/handlers

  • actions dir: /usr/local/sysflow/resources/actions

The default configuration can be changed by mounting host directories or files into the container using the volumes section of the sf-processor service in docker-compose.yaml.

sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      ...
      - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json

The policy location can be overridden by setting the POLICYENGINE_POLICIES environment variable, which can point to a policy file or to a directory containing policy files (which must have the yaml extension).

The docker container uses a default filter.yaml policy that outputs SysFlow records in JSON. You can use your own policy files from the host by mounting your policy directory into the container as follows; the custom pipeline then points to the mounted policies.

sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      ...
      - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json
      - ./path/to/policies/:/usr/local/sysflow/resources/policies/