SysFlow Processor (sf-processor repo)¶
The SysFlow processor is a lightweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two built-in drivers: one that reads SysFlow from a file, and another that reads streaming SysFlow over a domain socket. Plugins and drivers are configured using a JSON file.
A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).
Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.
The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports various types of export plugins for a variety of different targets.
Pre-requisites¶
The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.
- Golang version 1.17+ and make (if building from sources)
- Docker, docker-compose (if building with docker)
Build¶
Clone the processor repository
git clone https://github.com/sysflow-telemetry/sf-processor.git
Build locally, from sources
cd sf-processor
make build
Build with docker
cd sf-processor
make docker-build
Usage¶
For usage information, type:
cd driver/
./sfprocessor -help
This should yield the following usage statement:
Usage: sfprocessor [[-version]|[-driver <value>] [-log <value>] [-driverdir <value>] [-plugdir <value>] path]
Positional arguments:
path string
Input path
Arguments:
-config string
Path to pipeline configuration file (default "pipeline.json")
-cpuprofile file
Write cpu profile to file
-driver string
Driver name {file|socket|<custom>} (default "file")
-driverdir string
Dynamic driver directory (default "../resources/drivers")
-log string
Log level {trace|info|warn|error} (default "info")
-memprofile file
Write memory profile to file
-plugdir string
Dynamic plugins directory (default "../resources/plugins")
-test
Test pipeline configuration
-traceprofile file
Write trace profile to file
-version
Output version information
The four most important flags are `config`, `driverdir`, `plugdir`, and `driver`. The `config` flag points to a pipeline configuration file, which describes the entire pipeline and the settings for each individual plugin. The `driverdir` and `plugdir` flags specify where any dynamic driver and plugin shared libraries reside that should be loaded by the processor at runtime. The `driver` flag accepts a label for a pre-configured driver (either built-in or custom) that will be used as the data source to the pipeline. Currently, the pipeline only supports one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers:
- file: loads a SysFlow file reading driver that reads from `path`.
- socket: loads a SysFlow streaming driver. The driver creates a domain socket named `path` and acts as a server waiting for a SysFlow collector to attach and send SysFlow data.
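For example, assuming a locally built processor, the two built-in drivers could be invoked as follows (a sketch; the input paths are illustrative, not from the source):

```shell
cd driver/
# read a sysflow trace file
./sfprocessor -driver file /path/to/trace.sf
# listen on a domain socket for streaming sysflow
./sfprocessor -driver socket /var/run/sysflow.sock
```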
Configuration¶
The pipeline configuration below shows how to configure a pipeline that will read a SysFlow stream and push records to the policy engine, which will trigger alerts using a set of runtime policies stored in a yaml file. An example pipeline with this configuration looks as follows:
{
  "pipeline":[
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "policyengine",
      "in": "flat flattenerchan",
      "out": "evt eventchan",
      "policies": "../resources/policies/runtimeintegrity"
    },
    {
      "processor": "exporter",
      "in": "evt eventchan",
      "export": "syslog",
      "proto": "tcp",
      "tag": "sysflow",
      "host": "localhost",
      "port": "514"
    }
  ]
}
NOTE: This configuration can be found in:
sf-collector/resources/pipelines/pipeline.syslog.json
This pipeline specifies three built-in plugins:
- sysflowreader: a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.
- policyengine: the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs records representing alerts or filtered sysflow records, depending on the policy engine's mode (more on this later).
- exporter: takes records from the policy engine and exports them to ElasticSearch, syslog, file, or terminal, in JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.
Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are custom to the specific plugins. For more details on the specific attributes in this example, see the pipeline configuration template.
The general attributes are as follows:
- processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the `GetName()` function as defined in the processor object.
- handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.
- in (required): the input channel (i.e., golang channel) of objects that are passed to the plugin.
- out (optional): the output channel (i.e., golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.
Channels are modelled as channel objects that have an `In` attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is `[channel name] [channel type]`, where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match the name and type of the input channel of the next plugin in the pipeline sequence.
NOTE: A plugin has exactly one input channel but it may specify more than one output channel. This allows pipeline definitions that fan out data to more than one receiver plugin, similar to a Unix `tee` command. While there must always be one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters, or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather than a linear structure.
Policy engine configuration¶
The policy engine (`"processor": "policyengine"`) plugin is driven by a set of rules. These rules are specified in a YAML file which adopts the same syntax as the rules of the Falco project. A policy engine plugin specification may have the following attributes:

- policies (required for `alert` mode): The path to the YAML rules specification file. More information on rules can be found in the Policies section.
- mode (optional): The mode of the policy engine. Allowed values are:
  - `alert` (default): the policy engine generates rule-based alerts. `alert` is a blocking mode that drops all records that do not match any given rule. If no mode is specified, the policy engine runs in `alert` mode by default.
  - `enrich`: for enriching records with additional context from the rule. In contrast to `alert`, this is a non-blocking mode which applies tagging and action enrichments to matching records as defined in the policy file. Non-matching records are passed on "as is".
- monitor (optional): Specifies if changes to the policy file(s) should be monitored and updated in the policy engine.
  - `none` (default): no monitor is used.
  - `local`: the processor will monitor for changes in the policies path and update its rule set if changes are detected.
- monitor.interval (optional): The interval in seconds for updating policies, if a monitor is used (default: 30 seconds).
- concurrency (optional): The number of concurrent threads for record processing (default: 5).
- actiondir (optional): The path of the directory containing the shared object files for user-defined action plugins. See the section on User-defined Actions for more information.
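Putting these attributes together, a policy engine block running in `enrich` mode with local policy monitoring might look as follows (a sketch; the values shown are illustrative, not from the source):

```json
{
  "processor": "policyengine",
  "in": "flat flattenerchan",
  "out": "evt eventchan",
  "policies": "../resources/policies/runtimeintegrity",
  "mode": "enrich",
  "monitor": "local",
  "monitor.interval": "30",
  "concurrency": "5"
}
```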
NOTE: Prior to release 0.4.0, the mode attribute accepted different values with different semantics. To preserve the behavior of older releases:

- For old `alert` behavior, use `enrich` mode.
- For old `filter` behavior, use `enrich` mode and a policy file with filter rules only.
- For old `bypass` behavior, use `enrich` and drop the policies key from the configuration.
Exporter configuration¶
An exporter (`"processor": "exporter"`) plugin consists of two modules: an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders are target-specific, i.e., for a particular export target a particular set of encoders may be used. In the exporter configuration, the transport module is specified via the export parameter (required). The encoder is selected via the format parameter (optional). The default format is `json`.

The following table lists the currently supported exporter modules and the corresponding encoders. Additional encoders and transport modules can be implemented if the need arises. If you plan to contribute or want to get involved in the discussion, please join the SysFlow community.

Some of these combinations require additional configuration, as described in the following sections. `null` is used for debugging the processor and doesn't export any data.
File¶
If export is set to `file`, an additional parameter file.path allows the specification of the target file.
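For example, a minimal exporter block writing JSON records to a file might look as follows (a sketch; the target path is illustrative):

```json
{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "file",
  "format": "json",
  "file.path": "/tmp/sysflow-alerts.json"
}
```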
Syslog¶
If the export parameter is set to `syslog`, output to syslog is enabled and the following additional parameters are used:

- syslog.proto (optional): The protocol used for communicating with the syslog server. Allowed values are `tcp`, `tls` and `udp`. Default is `tcp`.
- syslog.tag (optional): The tag used for each SysFlow record in syslog. Default is `SysFlow`.
- syslog.source (optional): If set, adds a hostname to the syslog header.
- syslog.host (optional): The hostname of the syslog server. Default is `localhost`.
- syslog.port (optional): The port of the syslog server. Default is `514`.
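Using the prefixed parameter names from this section, a syslog exporter block might look as follows (a sketch; host and port are illustrative):

```json
{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "syslog",
  "syslog.proto": "tcp",
  "syslog.tag": "sysflow",
  "syslog.host": "localhost",
  "syslog.port": "514"
}
```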
ElasticSearch¶
Export to ElasticSearch is enabled by setting the config parameter export to `es`. The only supported format for export to ElasticSearch is `ecs`.

Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters which are read when the `es` export target is selected. Required parameters specify the ES target, index and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their values for optimal performance in your environment.

- es.addresses (required): A comma-separated list of ES endpoints.
- es.index (required): The name of the ES index to ingest into.
- es.username (required): The ES username.
- es.password (required): The password for the specified ES user.
- buffer (optional): The bulk size as the number of records to be ingested at once. Default is `0`, but a value of `0` indicates record-by-record ingestion, which may be highly inefficient.
- es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is `0`, which means that the exporter uses as many workers as there are cores in the machine.
- es.bulk.flashBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is `5e+6`.
- es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is `30s`.
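Combining these parameters, an ElasticSearch exporter block might look as follows (a sketch; the endpoint and credentials are placeholders, not real values):

```json
{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "es",
  "format": "ecs",
  "es.addresses": "https://localhost:9200",
  "es.index": "sysflow",
  "es.username": "elastic",
  "es.password": "changeme",
  "buffer": "1000",
  "es.bulk.flushTimeout": "30s"
}
```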
The Elastic exporter does not make any assumptions about the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convenience, we provide an explicit mapping for creating a new tailored index in Elastic. For more information, refer to the Elastic Mapping reference.
Environment variables¶
It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. Environment variables must follow the naming schema `<PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>`, where the plugin name inside the pipeline configuration file must be all lower case.
For example, to set the alert mode inside the policy engine, the following environment variable is set:
export POLICYENGINE_MODE=alert
To set the syslog values for the exporter:
export EXPORTER_TYPE=telemetry
export EXPORTER_SOURCE=${HOSTNAME}
export EXPORTER_EXPORT=syslog
export EXPORTER_HOST=192.168.2.10
export EXPORTER_PORT=514
If running as a docker container, environment variables can be passed with the docker run command:
docker run \
  -e EXPORTER_TYPE=telemetry \
  -e EXPORTER_SOURCE=${HOSTNAME} \
  -e EXPORTER_EXPORT=syslog \
  -e EXPORTER_HOST=192.168.2.10 \
  -e EXPORTER_PORT=514 \
  ...
Rate limiter configuration (experimental)¶
The `flattener` handler has a built-in time decay filter that can be enabled to reduce event rates in the processor. The filter uses a time-decay bloom filter based on a semantic hashing of records. This means that the filter should only forward one record matching a semantic hash per time decay period. The semantic hash takes into consideration process, flow and event attributes. To enable rate limiting, modify the `sysflowreader` processor as follows:
{
  "processor": "sysflowreader",
  "handler": "flattener",
  "in": "sysflow sysflowchan",
  "out": "flat flattenerchan",
  "filter.enabled": "on|off (default: off)",
  "filter.maxage": "time decay in minutes (default: 24H)"
}
Policy Language¶
The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rule, macro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the `policies` attribute of the policy engine plugin.
Rules contain the following fields:
- rule: the name of the rule
- description: a textual description of the rule
- condition: a set of logical operations that can reference lists and macros, which, when evaluating to true, can trigger record enrichment or alert creation (depending on the policy engine mode)
- action: a comma-separated list of actions to take place when the rule evaluates to true. For a particular rule, actions are evaluated in the order they are specified, i.e., an action can make use of the results provided by earlier actions. An action is just the name of an action function without any parameters. The current version only supports pluggable user-defined actions. See here for a detailed description of the plugin interface and a sample action plugin.
- priority: label representing the severity of the alert; can be: (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.
- tags (optional): set of labels appended to alert (default: empty).
- prefilter (optional): list of record types (`sf.type`) to whitelist before applying rule condition (default: empty).
- enabled (optional): indicates whether the rule is enabled (default: true).
NOTE: The syntax of the policy language changed slightly with the switch to release 0.4.0. For migrating policy files used with prior releases to release 0.4.0 or higher, simply remove all `action: [tag]` lines. As of release 0.4.0, tagging is done automatically. If a rule triggers, all tags specified via the tags key will be appended to the record. The action key is reserved for specifying user-defined action plugins.
Macros are named conditions and contain the following fields:

- macro: the name of the macro
- condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

Lists are named collections and contain the following fields:

- list: the name of the list
- items: a collection of values or lists

Drop rules block records matching a condition and can be used for reducing the amount of records processed by the policy engine:

- drop: the name of the filter
- condition: a set of logical operations that can reference lists and macros, which evaluate to true or false
For example, the rule below specifies that matching records are process events (`sf.type = PE`), denoting EXEC operations (`sf.opflags = EXEC`) for which the process matches macro `package_installers`. Additionally, the global filter `containers` preemptively removes from the processing stream any records for processes not running in a container environment.
# lists
- list: rpm_binaries
items: [dnf, rpm, rpmkey, yum, '"75-system-updat"', rhsmcertd-worke, subscription-ma,
repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]
- list: deb_binaries
items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
apt-listchanges, unattended-upgr, apt-add-reposit]
- list: package_mgmt_binaries
items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, sane-utils.post, alternatives, chef-client]
# macros
- macro: package_installers
condition: sf.proc.name pmatch (package_mgmt_binaries)
# global filters (blacklisting)
- filter: containers
condition: sf.container.type = host
# rule definitions
- rule: Package installer detected
desc: Use of package installer detected
condition: sf.opflags = EXEC and package_installers
priority: medium
tags: [actionable-offense, suspicious-process]
prefilter: [PE] # record types for which this rule should be applied (whitelisting)
enabled: true
Attribute names¶
The following table shows a detailed list of attribute names supported by the policy engine, as well as their type, and comparative Falco attribute name. Our policy engine supports both SysFlow and Falco attribute naming convention to enable reuse of policies across the two frameworks.
| Attributes | Description | Values | Falco Attribute |
|---|---|---|---|
| sf.type | Record type | PE,PF,NF,FF,FE,KE | N/A |
| sf.opflags | Operation flags | Operation Flags List: remove | evt.type (remapped as falco event types) |
| sf.ret | Return code | int | evt.res |
| sf.ts | start timestamp(ns) | int64 | evt.time |
| sf.endts | end timestamp(ns) | int64 | N/A |
| sf.proc.pid | Process PID | int64 | proc.pid |
| sf.proc.tid | Thread PID | int64 | thread.tid |
| sf.proc.uid | Process user ID | int | user.uid |
| sf.proc.user | Process user name | string | user.name |
| sf.proc.gid | Process group ID | int | group.gid |
| sf.proc.group | Process group name | string | group.name |
| sf.proc.apid | Proc ancestors PIDs (qo) | int64 | proc.apid |
| sf.proc.aname | Proc anctrs names (qo) (exclude path) | string | proc.aname |
| sf.proc.exe | Process command/filename (with path) | string | proc.exe |
| sf.proc.args | Process command arguments | string | proc.args |
| sf.proc.name | Process name (qo) (exclude path) | string | proc.name |
| sf.proc.cmdline | Process command line (qo) | string | proc.cmdline |
| sf.proc.tty | Process TTY status | boolean | proc.tty |
| sf.proc.entry | Process container entrypoint | bool | proc.vpid == 1 |
| sf.proc.createts | Process creation timestamp (ns) | int64 | N/A |
| sf.pproc.pid | Parent process ID | int64 | proc.ppid |
| sf.pproc.gid | Parent process group ID | int64 | N/A |
| sf.pproc.uid | Parent process user ID | int64 | N/A |
| sf.pproc.group | Parent process group name | string | N/A |
| sf.pproc.tty | Parent process TTY status | bool | N/A |
| sf.pproc.entry | Parent process container entry | bool | N/A |
| sf.pproc.user | Parent process user name | string | N/A |
| sf.pproc.exe | Parent process command/filename | string | N/A |
| sf.pproc.args | Parent process command arguments | string | N/A |
| sf.pproc.name | Parent process name (qo) (no path) | string | proc.pname |
| sf.pproc.cmdline | Parent process command line (qo) | string | proc.pcmdline |
| sf.pproc.createts | Parent process creation timestamp | int64 | N/A |
| sf.file.fd | File descriptor number | int | fd.num |
| sf.file.path | File path | string | fd.name |
| sf.file.newpath | New file path (used in some FileEvents) | string | N/A |
| sf.file.name | File name (qo) | string | fd.filename |
| sf.file.directory | File directory (qo) | string | fd.directory |
| sf.file.type | File type | char 'f': file, 4: IPv4, 6: IPv6, 'u': unix socket, 'p': pipe, 'e': eventfd, 's': signalfd, 'l': eventpoll, 'i': inotify, 'o': unknown | fd.typechar |
| sf.file.is_open_write | File open with write flag (qo) | bool | evt.is_open_write |
| sf.file.is_open_read | File open with read flag (qo) | bool | evt.is_open_read |
| sf.file.openflags | File open flags | int | evt.args |
| sf.net.proto | Network protocol | int | fd.l4proto |
| sf.net.sport | Source port | int | fd.sport |
| sf.net.dport | Destination port | int | fd.dport |
| sf.net.port | Src or Dst port (qo) | int | fd.port |
| sf.net.sip | Source IP | int | fd.sip |
| sf.net.dip | Destination IP | int | fd.dip |
| sf.net.ip | Src or dst IP (qo) | int | fd.ip |
| sf.res | File or network resource | string | fd.name |
| sf.flow.rbytes | Flow bytes read/received | int64 | evt.res |
| sf.flow.rops | Flow operations read/received | int64 | N/A |
| sf.flow.wbytes | Flow bytes written/sent | int64 | evt.res |
| sf.flow.wops | Flow operations written/sent | int64 | N/A |
| sf.container.id | Container ID | string | container.id |
| sf.container.name | Container name | string | container.name |
| sf.container.image.id | Container image ID | string | container.image.id |
| sf.container.image | Container image name | string | container.image |
| sf.container.type | Container type | CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM, CT_CRI, CT_CONTAINERD, CT_CRIO, CT_BPM | container.type |
| sf.container.privileged | Container privilege status | bool | container.privileged |
| sf.pod.ts | Pod creation timestamp | int | N/A |
| sf.pod.id | Pod id | string | N/A |
| sf.pod.name | Pod name | string | N/A |
| sf.pod.nodename | Pod node name | string | N/A |
| sf.pod.namespace | Pod namespace | string | N/A |
| sf.pod.restartcnt | Pod restart count | int | N/A |
| sf.pod.hostip | Pod host IP addresses | json | N/A |
| sf.pod.internalip | Pod internal IP addresses | json | N/A |
| sf.pod.services | Pod services | json | N/A |
| sf.ke.action | Kubernetes event action | K8S_COMPONENT_ADDED, K8S_COMPONENT_MODIFIED, K8S_COMPONENT_DELETED, K8S_COMPONENT_ERROR, K8S_COMPONENTNONEXISTENT, K8S_COMPONENT_UNKNOWN | N/A |
| sf.ke.kind | Kubernetes event resource type | K8S_NODES, K8S_NAMESPACES, K8S_PODS, K8S_REPLICATIONCONTROLLERS, K8S_SERVICES, K8S_EVENTS, K8S_REPLICASETS, K8S_DAEMONSETS, K8S_DEPLOYMENT, K8S_UNKNOWN | N/A |
| sf.ke.message | Kubernetes event json message | json | N/A |
| sf.node.id | Node identifier | string | N/A |
| sf.node.ip | Node IP address | string | N/A |
| sf.schema.version | SysFlow schema version | string | N/A |
| sf.version | SysFlow JSON schema version | int | N/A |
Jsonpath Expressions¶
Unlike attributes of the scalar types bool, int(64), and string, attributes of type `json` contain structured information in the form of stringified json records. The policy language allows access to subfields inside such json records via GJSON jsonpath expressions. The jsonpath expression must be specified as a suffix to the attribute, enclosed in square brackets. Examples of such terms are:

- sf.pod.services[0.clusterip.0]: the first cluster IP address of the first service associated with a pod
- sf.ke.message[items.0.namespace]: the namespace of the first item in a KE message attribute

See the GJSON path syntax for more details. The result of applying a jsonpath expression to a json attribute is always of type string.
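To make the path semantics concrete, the sketch below resolves a dotted path like `items.0.namespace` against parsed JSON using only the standard library. This is a hypothetical helper for illustration only; the policy engine itself uses the GJSON library:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

// lookup resolves a dotted GJSON-style path such as "items.0.namespace"
// against a stringified json record, returning the result as a string
// (illustrative helper, not the policy engine's implementation).
func lookup(doc string, path string) string {
	var v interface{}
	if err := json.Unmarshal([]byte(doc), &v); err != nil {
		return ""
	}
	for _, key := range strings.Split(path, ".") {
		switch node := v.(type) {
		case map[string]interface{}:
			v = node[key] // descend into an object field
		case []interface{}:
			i, err := strconv.Atoi(key) // numeric segments index arrays
			if err != nil || i < 0 || i >= len(node) {
				return ""
			}
			v = node[i]
		default:
			return ""
		}
	}
	s, _ := v.(string)
	return s
}

func main() {
	msg := `{"items":[{"namespace":"kube-system"},{"namespace":"default"}]}`
	fmt.Println(lookup(msg, "items.0.namespace")) // prints "kube-system"
}
```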
Operations¶
The policy language supports the following operations:
| Operation | Description | Example |
|---|---|---|
| A and B | Returns true if both statements are true | sf.pproc.name=bash and sf.pproc.cmdline contains echo |
| A or B | Returns true if one of the statements is true | sf.file.path = "/etc/passwd" or sf.file.path = "/etc/shadow" |
| not A | Returns true if the statement isn't true | not sf.pproc.exe = /usr/local/sbin/runc |
| A = B | Returns true if A exactly matches B. Note, if B is a list, A only has to exactly match one element of the list. If B is a list, it must be explicit. It cannot be a variable. If B is a variable, use `in` | sf.file.path = ["/etc/passwd", "/etc/shadow"] |
| A != B | Returns true if A is not equal to B. Note, if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit. It cannot be a variable. | sf.file.path != "/etc/passwd" |
| A < B | Returns true if A is less than B. Note, if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops < 1000 |
| A <= B | Returns true if A is less than or equal to B. Note, if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops <= 1000 |
| A > B | Returns true if A is greater than B. Note, if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops > 1000 |
| A >= B | Returns true if A is greater than or equal to B. Note, if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops >= 1000 |
| A in B | Returns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.exe in (bin_binaries, usr_bin_binaries) |
| A startswith B | Returns true if string A starts with string B | sf.file.path startswith '/home' |
| A endswith B | Returns true if string A ends with string B | sf.file.path endswith '.json' |
| A contains B | Returns true if string A contains string B | sf.pproc.name=java and sf.pproc.cmdline contains org.apache.hadoop |
| A icontains B | Returns true if string A contains string B ignoring capitalization | sf.pproc.name=java and sf.pproc.cmdline icontains org.apache.hadooP |
| A pmatch B | Returns true if string A partially matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.name pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries) |
| exists A | Checks if A is not a zero value (i.e. 0 for int, "" for string) | exists sf.file.path |
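The list-matching operators can be illustrated with a sketch of `pmatch` semantics. This is one plausible reading (substring match against each list element), written for illustration and not taken from the engine's source:

```go
package main

import (
	"fmt"
	"strings"
)

// pmatch reports whether s partially matches (contains) any element of
// list (an illustrative sketch of the operator's semantics, not the
// policy engine's implementation).
func pmatch(s string, list []string) bool {
	for _, item := range list {
		if strings.Contains(s, item) {
			return true
		}
	}
	return false
}

func main() {
	// hypothetical stand-in for a merged package_mgmt_binaries list
	packageMgmtBinaries := []string{"dpkg", "apt", "yum", "rpm"}
	fmt.Println(pmatch("apt-get", packageMgmtBinaries)) // true ("apt-get" contains "apt")
	fmt.Println(pmatch("bash", packageMgmtBinaries))    // false (no element matches)
}
```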
See the resources policies directory in github for examples. Feel free to contribute new and interesting rules through a github pull request.
User-defined Actions¶
User-defined actions are implemented via the golang plugin mechanism. Check the documentation on Action Plugins for a custom action plugin example.
Plugins¶
In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enable easy extension of the processor and the creation of custom pipelines tailored to specific use cases.
The processor supports four types of plugins:
- drivers: enable the ingestion of different telemetry sources into the processor pipeline.
- processors: enable the creation of custom data processing and analytic plugins to extend sf-processor pipelines.
- handlers: enable the creation of custom SysFlow record handling plugins.
- actions: enable the creation of custom action plugins to extend sf-processor's policy engine.
Pre-requisites¶
Go 1.17 (if building locally, without the plugin builder)
Processor Plugins¶
User-defined plugins can be plugged in to extend the sf-processor pipeline. These are the most generic type of plugin, from which all built-in processor plugins are built. Check the `core` package for examples. We have built-in processor plugins for flattening the telemetry stream, implementing a policy engine, and creating event exporters.
Interface¶
Processor plugins (or just plugins) are implemented via the golang plugin mechanism. A plugin must implement the following interface, defined in the `github.com/sysflow-telemetry/sf-apis/go/plugins` package.
// SFProcessor defines the SysFlow processor interface.
type SFProcessor interface {
	Register(pc SFPluginCache)
	Init(conf map[string]interface{}) error
	Process(ch interface{}, wg *sync.WaitGroup)
	GetName() string
	SetOutChan(ch []interface{})
	Cleanup()
}
The `Process` function is the main function of the plugin. It's where the "main loop" of the plugin should be implemented. It receives the input channel configured in the custom plugin's block in the pipeline configuration, as well as the pipeline thread WaitGroup. Custom processing code should be implemented using this function. `Init` is called once, when the pipeline is loaded. `Cleanup` is called when the pipeline is terminated. `SetOutChan` receives a slice with the output channels configured in the plugin's block in the pipeline configuration.
When loading a pipeline, sf-processor performs a series of health checks before the pipeline is enabled. If these health checks fail, the processor terminates. To enable health checks on custom plugins, implement the `Test` function defined in the interface below. For an example, check `core/exporter/exporter.go`.
// SFTestableProcessor defines a testable SysFlow processor interface.
type SFTestableProcessor interface {
	SFProcessor
	Test() (bool, error)
}
Example¶
A dynamic plugin example is provided on github. The core of the plugin is building an object that implements the SFProcessor interface. Such an implementation looks as follows:
package main

import (
	"sync"

	"github.com/sysflow-telemetry/sf-apis/go/logger"
	"github.com/sysflow-telemetry/sf-apis/go/plugins"
	"github.com/sysflow-telemetry/sf-apis/go/sfgo"
	"github.com/sysflow-telemetry/sf-processor/core/flattener"
)

const (
	pluginName string = "example"
)

// Plugin exports a symbol for this plugin.
var Plugin Example

// Example defines an example plugin.
type Example struct{}

// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
	return new(Example)
}

// GetName returns the plugin name.
func (s *Example) GetName() string {
	return pluginName
}

// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]interface{}) error {
	return nil
}

// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
	pc.AddProcessor(pluginName, NewExample)
}

// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg *sync.WaitGroup) {
	cha := ch.(*flattener.FlatChannel)
	record := cha.In
	logger.Trace.Println("Example channel capacity:", cap(record))
	defer wg.Done()
	logger.Trace.Println("Starting Example")
	for {
		fc, ok := <-record
		if !ok {
			logger.Trace.Println("Channel closed. Shutting down.")
			break
		}
		if fc.Ints[sfgo.SYSFLOW_IDX][sfgo.SF_REC_TYPE] == sfgo.PROC_EVT {
			logger.Info.Printf("Process Event: %s, %d", fc.Strs[sfgo.SYSFLOW_IDX][sfgo.PROC_EXE_STR], fc.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_PROC_TID_INT])
		}
	}
	logger.Trace.Println("Exiting Example")
}

// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch []interface{}) {}

// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}

// This function is not run when module is used as a plugin.
func main() {}
The custom plugin must implement the following interface:

- GetName() - returns a lowercase string representing the plugin's label. This label is important because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.
- Init(conf map[string]interface{}) error - used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin's definition inside pipeline.json (more on this later).
- Register(pc plugins.SFPluginCache) - registers the plugin with the plugin cache of the processor.
  - pc.AddProcessor(pluginName, <plugin constructor function>) (required) - registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName>, which is used to instantiate the plugin and returns it as an SFProcessor interface - see NewExample for an example.
  - pc.AddChannel(channelName, <output channel constructor function>) (optional) - if your plugin uses a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next in the pipeline), it should be included in this plugin.
    - The channelName should be a lowercase unique label defining the channel type.
    - The constructor function should return a golang interface{} representing an object that has an In attribute of type chan <ObjectToBePassed>. We will call this object a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here. For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as the Register function. The FlatChannel is defined here.
- Process(ch interface{}, wg *sync.WaitGroup) - this function is launched by the processor as a go thread and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel configured as the input channel to the plugin in pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in the code). The plugin must loop on the input channel and do its analysis on each input record. In this case, the example plugin reads flat records and prints them to the screen.
- SetOutChan(ch []interface{}) - sets the wrapped channels that will serve as the output channels for the plugin. The output channels are instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, this function can be left empty. See the SetOutputChan function in the flattener to see how an output channel is implemented.
- Cleanup() - used to clean up any resources. This function is called by the processor after the plugin's Process function exits. One of the key items to close in the Cleanup function is the output channel, using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.
- main() {} - this main method is not used by the plugin or the processor. It is required by golang in order to compile the plugin as a shared object.
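To make the wrapped-channel convention concrete, here is a minimal, self-contained sketch of a custom channel type with the required In attribute and its constructor. The names MyRecord, MyChannel, and NewMyChannel are illustrative, not part of sf-apis:

```go
package main

// MyRecord is a hypothetical payload type passed between plugins.
type MyRecord struct {
	Msg string
}

// MyChannel wraps a Go channel in a struct with an In attribute,
// following the wrapped-channel convention the processor expects.
type MyChannel struct {
	In chan *MyRecord
}

// NewMyChannel is the constructor a plugin would register via
// pc.AddChannel("mychan", NewMyChannel); it returns interface{} so the
// processor can stitch channels together generically.
func NewMyChannel(size int) interface{} {
	return &MyChannel{In: make(chan *MyRecord, size)}
}

func main() {
	// Unwrap the channel with a type assertion, as plugins do in Process.
	ch := NewMyChannel(8).(*MyChannel)
	ch.In <- &MyRecord{Msg: "hello"}
	r := <-ch.In
	println(r.Msg, cap(ch.In))
}
```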
To compile the example plugin, use the provided Makefile:
make -C plugins/processors/example
This will build the plugin and copy it into resources/plugins/.
To use the new plugin, use the configuration provided on GitHub, which defines the following pipeline:
{
  "pipeline":[
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "example",
      "in": "flat flattenerchan"
    }
  ]
}
This pipeline contains two plugins:

- The builtin sysflowreader plugin with the flattener handler, which takes raw sysflow objects and flattens them into arrays of integers and strings for easier processing in certain plugins like the policy engine.
- The example plugin, which takes the flattened output from the sysflowreader plugin and prints it to the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins will be properly stitched together.
Build¶
The example plugin is a custom plugin that illustrates how to implement a minimal plugin that reads records from the input channel and logs them to standard output.
To run this example, in the root of sf-processor, build the processor and the example plugin. Note that this plugin's shared object is generated in resources/plugins/example.so.
make build && make -C plugins/processors/example
Then, run:
cd driver && ./sfprocessor -log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf
Plugin builder¶
To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder
Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.
Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.
First, build the plugin:
docker run --rm \
-v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
-v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
sysflowtelemetry/plugin-builder:$TAG \
make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/processors/example
To test it, run the pre-built processor with the example configuration and trace.
docker run --rm \
-v $(pwd)/plugins:/usr/local/sysflow/plugins \
-v $(pwd)/resources:/usr/local/sysflow/resources \
-w /usr/local/sysflow/bin \
--entrypoint=/usr/local/sysflow/bin/sfprocessor \
sysflowtelemetry/sf-processor:$TAG \
-log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf
The output on the above pre-recorded trace should look like this:
[Health] 2022/02/21 12:55:19 pipeline.go:246: Health checks: passed
[Info] 2022/02/21 12:55:19 main.go:147: Successfully loaded pipeline configuration
[Info] 2022/02/21 12:55:19 pipeline.go:170: Starting the processing pipeline
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823
Handler Plugins¶
User-defined handler modules can be plugged into the built-in SysFlow processor plugin to implement custom data processing and analytic pipelines.
Interface¶
Handlers are implemented via the golang plugin mechanism. A handler must implement the following interface, defined in the github.com/sysflow-telemetry/sf-apis/go/plugins package.
// SFHandler defines the SysFlow handler interface.
type SFHandler interface {
	RegisterChannel(pc SFPluginCache)
	RegisterHandler(hc SFHandlerCache)
	Init(conf map[string]interface{}) error
	IsEntityEnabled() bool
	HandleHeader(sf *CtxSysFlow, hdr *sfgo.SFHeader) error
	HandleContainer(sf *CtxSysFlow, cont *sfgo.Container) error
	HandleProcess(sf *CtxSysFlow, proc *sfgo.Process) error
	HandleFile(sf *CtxSysFlow, file *sfgo.File) error
	HandleNetFlow(sf *CtxSysFlow, nf *sfgo.NetworkFlow) error
	HandleNetEvt(sf *CtxSysFlow, ne *sfgo.NetworkEvent) error
	HandleFileFlow(sf *CtxSysFlow, ff *sfgo.FileFlow) error
	HandleFileEvt(sf *CtxSysFlow, fe *sfgo.FileEvent) error
	HandleProcFlow(sf *CtxSysFlow, pf *sfgo.ProcessFlow) error
	HandleProcEvt(sf *CtxSysFlow, pe *sfgo.ProcessEvent) error
	SetOutChan(ch []interface{})
	Cleanup()
}
Each Handle* function receives the current SysFlow record being processed along with its corresponding parsed record type. Custom processing code should be implemented using these functions.
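For instance, a printer-style handler might implement HandleNetFlow to log flow endpoints. The sketch below uses simplified stand-in definitions for CtxSysFlow and NetworkFlow (the real types live in sf-apis) so that it stays self-contained; the field names Dip and Dport are illustrative:

```go
package main

import "fmt"

// Stand-in types: the real CtxSysFlow and NetworkFlow are defined in
// sf-apis; these minimal versions keep the sketch self-contained.
type CtxSysFlow struct{}
type NetworkFlow struct {
	Dip   string
	Dport int
}

// PrinterHandler sketches a handler that only acts on network flows.
type PrinterHandler struct{}

// HandleNetFlow logs the destination endpoint and returns nil so the
// pipeline continues; returning an error surfaces it to the processor.
func (h *PrinterHandler) HandleNetFlow(sf *CtxSysFlow, nf *NetworkFlow) error {
	fmt.Printf("NetworkFlow %s:%d\n", nf.Dip, nf.Dport)
	return nil
}

func main() {
	h := &PrinterHandler{}
	h.HandleNetFlow(&CtxSysFlow{}, &NetworkFlow{Dip: "10.0.0.1", Dport: 8080})
}
```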
Build¶
The printer handler is a pluggable handler that logs select SysFlow records to standard output. This plugin doesn't define any output channels, so it acts as a plugin sink (the last plugin in a pipeline).
To run this example, in the root of sf-processor, build the processor and the handler plugin. Note that this plugin's shared object is generated in resources/handlers/printer.so.
make build && make -C plugins/handlers/printer
Then, run:
cd driver && ./sfprocessor -log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf
Plugin builder¶
To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder
Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.
Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.
First, build the plugin:
docker run --rm \
-v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
-v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
sysflowtelemetry/plugin-builder:$TAG \
make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/handlers/printer
To test it, run the pre-built processor with the example configuration and trace.
docker run --rm \
-v $(pwd)/plugins:/usr/local/sysflow/plugins \
-v $(pwd)/resources:/usr/local/sysflow/resources \
-w /usr/local/sysflow/bin \
--entrypoint=/usr/local/sysflow/bin/sfprocessor \
sysflowtelemetry/sf-processor:$TAG \
-log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf
The output on the above pre-recorded trace should look like this:
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./client, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./server, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823
Action Plugins¶
User-defined actions can be plugged into SysFlow's Policy Engine rule declarations to perform additional processing on matched records.
Interface¶
Actions are implemented via the golang plugin mechanism. An action must implement the following interface, defined in the github.com/sysflow-telemetry/sf-processor/core/policyengine/engine package.
// Prototype of an action function
type ActionFunc func(r *Record) error

// Action interface for user-defined actions
type Action interface {
	GetName() string
	GetFunc() ActionFunc
}
Actions have a name and an action function. Within a single policy engine instance, action names must be unique. User-defined actions cannot re-declare built-in actions. Reusing names of user-defined actions overwrites previously registered actions.
The action function receives the current record as an argument and thus has access to all record attributes. The action result can be stored in the record context via the context modifier methods.
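A minimal user-defined action following this interface might look as follows. The Record type below is a simplified stand-in for the policy engine's record (real actions receive the engine's record with its context modifier methods), and the action name and tag value are illustrative:

```go
package main

// Record is a stand-in for the policy engine's record type; it exposes
// a Tags slice in place of the real context modifier methods.
type Record struct {
	Tags []string
}

// ActionFunc mirrors the policy engine's action function prototype.
type ActionFunc func(r *Record) error

// TagAction implements the Action interface: a unique name plus a
// function applied to each matched record.
type TagAction struct{}

// GetName returns the unique action name used in rule declarations.
func (a *TagAction) GetName() string { return "mytag" }

// GetFunc returns the action function; here it just appends a tag to
// the matched record's context.
func (a *TagAction) GetFunc() ActionFunc {
	return func(r *Record) error {
		r.Tags = append(r.Tags, "mytag:matched")
		return nil
	}
}

func main() {
	a := &TagAction{}
	r := &Record{}
	a.GetFunc()(r)
	println(r.Tags[0])
}
```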
Build¶
The now action is a pluggable action that creates a tag containing the current time in nanosecond precision.
First, in the root of sf-processor, build the processor and the action plugin. Note that this plugin's shared object is generated in resources/actions/now.so.
make build && make -C plugins/actions/example
Then, run:
cd driver && ./sfprocessor -log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf
Plugin builder¶
To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder
Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.
Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.
First, build the plugin:
docker run --rm \
-v $(pwd)/plugins:/go/src/github.com/sysflow-telemetry/sf-processor/plugins \
-v $(pwd)/resources:/go/src/github.com/sysflow-telemetry/sf-processor/resources \
sysflowtelemetry/plugin-builder:$TAG \
make -C /go/src/github.com/sysflow-telemetry/sf-processor/plugins/actions/example
To test it, run the pre-built processor with the example configuration and trace.
docker run --rm \
-v $(pwd)/plugins:/usr/local/sysflow/plugins \
-v $(pwd)/resources:/usr/local/sysflow/resources \
-w /usr/local/sysflow/bin \
--entrypoint=/usr/local/sysflow/bin/sfprocessor \
sysflowtelemetry/sf-processor:$TAG \
-log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf
In the output, observe that all records matching the policy specified in pipeline.actions.json are tagged by the now action with the tag now_in_nanos. For example:
{
  "version": 4,
  "endts": 0,
  "opflags": [
    "EXEC"
  ],
  ...
  "policies": [
    {
      "id": "Action example",
      "desc": "user-defined action example",
      "priority": 0
    }
  ],
  "tags": [
    "now_in_nanos:1645409122055957900"
  ]
}
Docker usage¶
Documentation and scripts for deploying the SysFlow Processor with docker compose can be found here.
Processor environment¶
As mentioned in a previous section, all custom plugin attributes can be set using environment variables in the <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> format. Note that the docker compose file sets several attributes, including EXPORTER_TYPE, EXPORTER_HOST, and EXPORTER_PORT.
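As an illustration of this naming convention, an exporter plugin's attributes could be set from the compose file's environment section; the values below are placeholders, not recommended settings:

```yaml
sf-processor:
  environment:
    # <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> maps to the plugin's config map
    EXPORTER_TYPE: telemetry
    EXPORTER_HOST: localhost
    EXPORTER_PORT: "514"
```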
The following are the default locations of the pipeline configuration and plugin directories:

- pipeline.json: /usr/local/sysflow/conf/pipeline.json
- drivers dir: /usr/local/sysflow/resources/drivers
- plugins dir: /usr/local/sysflow/resources/plugins
- handlers dir: /usr/local/sysflow/resources/handlers
- actions dir: /usr/local/sysflow/resources/actions
The default configuration can be changed by setting up virtual mounts that map host directories or files into the container, using the volumes section of the sf-processor service in docker-compose.yaml.
sf-processor:
  container_name: sf-processor
  image: sysflowtelemetry/sf-processor:latest
  privileged: true
  volumes:
    ...
    - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json
The policy location can be overridden by setting the POLICYENGINE_POLICIES environment variable, which can point to a policy file or a directory containing policy files (these must have a yaml extension).
The docker container uses a default filter.yaml policy that outputs SysFlow records in JSON. You can use your own policy files from the host by mounting your policy directory into the container as follows, where the custom pipeline points to the mounted policies.
sf-processor:
  container_name: sf-processor
  image: sysflowtelemetry/sf-processor:latest
  privileged: true
  volumes:
    ...
    - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json
    - ./path/to/policies/:/usr/local/sysflow/resources/policies/