SysFlow Processor (sf-processor repo)

The SysFlow processor is a lightweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two built-in drivers: one that reads sysflow from a file, and another that reads streaming sysflow over a domain socket. Plugins and drivers are configured using a JSON file.

A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).

Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.

The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports various types of export plugins for a variety of different targets.

Prerequisites

The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.

  • Golang version 1.14+ and make (if building from sources)

  • Docker, docker-compose (if building with docker)

Build

Clone the processor repository

git clone https://github.com/sysflow-telemetry/sf-processor.git

Build locally, from sources

cd sf-processor
make build

Build with docker

cd sf-processor
make docker-build

Usage

For usage information, type:

cd driver/
./sfprocessor -help

This should yield the following usage statement:

Usage: sfprocessor [[-version]|[-driver <value>] [-log <value>] [-driverdir <value>] [-plugdir <value>] path]
Positional arguments:
  path string
        Input path
Arguments:
  -config string
        Path to pipeline configuration file (default "pipeline.json")
  -cpuprofile file
        Write cpu profile to file
  -driver string
        Driver name {file|socket|<custom>} (default "file")
  -driverdir string
        Dynamic driver directory (default "../resources/drivers")
  -log string
        Log level {trace|info|warn|error} (default "info")
  -memprofile file
        Write memory profile to file
  -plugdir string
        Dynamic plugins directory (default "../resources/plugins")
  -test
        Test pipeline configuration
  -traceprofile file
        Write trace profile to file
  -version
        Output version information

The four most important flags are config, driverdir, plugdir, and driver. The config flag points to a pipeline configuration file, which describes the entire pipeline and the settings for each of the plugins. The driverdir and plugdir flags specify where any dynamic driver and plugin shared libraries reside that should be loaded by the processor at runtime. The driver flag accepts the label of a pre-configured driver (either built-in or custom) that will be used as the data source to the pipeline. Currently, the pipeline only supports one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers (example invocations follow the list):

  • file: the processor loads a sysflow file-reading driver that reads from path.

  • socket: the processor loads a sysflow streaming driver. The driver creates a domain socket named path and acts as a server waiting for a SysFlow collector to attach and send sysflow data.
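For example, the commands below run the processor with each of the built-in drivers; the trace and socket paths are illustrative (they mirror paths used elsewhere in this document) and should be adjusted to your environment.

cd driver/
# file driver: read a recorded sysflow trace from the positional path argument
./sfprocessor -driver file ../resources/traces/mon.1531776712.sf
# socket driver: create a domain socket at the positional path and wait for a collector to connect
./sfprocessor -driver socket /sock/sysflow.sock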

Pipeline Configuration

The example below shows how to configure a pipeline that reads a sysflow stream and pushes records to the policy engine, which triggers alerts using a set of runtime policies stored in a yaml file:

{
  "pipeline":[
    {
     "processor": "sysflowreader",
     "handler": "flattener",
     "in": "sysflow sysflowchan",
     "out": "flat flattenerchan"
    },
    {
     "processor": "policyengine",
     "in": "flat flattenerchan",
     "out": "evt eventchan",
     "policies": "../resources/policies/runtimeintegrity"
    },
    {
     "processor": "exporter",
     "in": "evt eventchan",
     "export": "syslog",
     "proto": "tcp",
     "tag": "sysflow",
     "host": "localhost",
     "port": "514"
    }
  ]
}

Note: This configuration can be found in: sf-processor/resources/pipelines/pipeline.syslog.json

This pipeline specifies three built-in plugins:

  • sysflowreader: is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.

  • policyengine: is the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs either alert records or filtered sysflow records, depending on the policy engine’s mode (more on this later).

  • exporter: takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.

Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are custom to the specific plugin. For more details on the specific attributes in this example, see the pipeline configuration template.

The general attributes are as follows:

  • processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the GetName() function as defined in the processor object.

  • handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.

  • in (required): the input channel (i.e. golang channel) of objects that are passed to the plugin.

  • out (optional): the output channel (i.e. golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.

Channels are modeled as channel objects that have an In attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is [channel name] [channel type], where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match the name and type of the input channel of the next plugin in the pipeline sequence.

A plugin has exactly one input channel but it may specify more than one output channel. This allows pipeline definitions that fan out data to more than one receiver plugin, similar to the Unix tee command. While there must always be exactly one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters, or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather than a linear structure.

Policy engine configuration

The policy engine ("processor": "policyengine") plugin is driven by a set of rules. These rules are specified in a YAML which adopts the same syntax as the rules of the [Falco](https://falco.org/docs/rules] project. A policy engine plugin specification requires the following attributes:

  • policies (required): The path to the YAML rules specification file. More information on rules can be found in the Rules section.

  • mode (optional): The mode of the policy engine. Allowed values are alert for generating rule-based alerts, filter for rule-based filtering of SysFlow events, and bypass for unchanged pass-through of raw sysflow events. The default value is alert. If mode is bypass, the policies attribute can be omitted.
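For illustration, a minimal policy engine entry, based on the pipeline example shown earlier and extended with an explicit mode, could look as follows:

{
  "processor": "policyengine",
  "in": "flat flattenerchan",
  "out": "evt eventchan",
  "policies": "../resources/policies/runtimeintegrity",
  "mode": "alert"
}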

Exporter configuration

An exporter ("processor": "exporter") plugin consists of two modules, an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders target specific, i.e. for a particular export target a particular set of encoders may be used. In the exporter configuration the transport module is specified via the export paramater (required). The encoder is selected via the format parameter (optional). The default format is json.

The currently supported transport modules include terminal, file, syslog, es, findings, and null; the corresponding encoders are json, ecs, and occurence, as described in the sections below. Additional encoders and transport modules can be implemented if the need arises. If you plan to contribute or want to get involved in the discussion, please join the SysFlow community.

Some of these combinations require additional configuration as described in the following sections. null is used for debugging the processor and doesn’t export any data.

Export to file

If export is set to file, an additional parameter file.path allows the specification of the target file.
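A minimal sketch of a file exporter entry, with an illustrative output path, could look as follows:

{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "file",
  "format": "json",
  "file.path": "/tmp/sysflow-events.json"
}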

Export to syslog

If the export parameter is set to syslog, output to syslog is enabled and the following additional parameters are used (an example entry follows the list):

  • syslog.proto (optional): The protocol used for communicating with the syslog server. Allowed values are tcp, tls and udp. Default is tcp.

  • syslog.tag (optional): The tag used for each Sysflow record in syslog. Default is SysFlow.

  • syslog.source (optional): If set, adds a hostname to the syslog header.

  • syslog.host (optional): The hostname of the syslog server. Default is localhost.

  • syslog.port (optional): The port of the syslog server. Default is 514.
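For example, a syslog exporter entry written with the syslog.* parameters described above might look as follows (host and port are illustrative):

{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "syslog",
  "format": "json",
  "syslog.proto": "tcp",
  "syslog.tag": "sysflow",
  "syslog.host": "localhost",
  "syslog.port": "514"
}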

Export to ElasticSearch

Export to ElasticSearch is enabled by setting the config parameter export to es. The only supported format for export to ElasticSearch is ecs.

Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters, which are read when the es export target is selected. Required parameters specify the ES target, index, and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their values for optimal performance in your environment. An example exporter entry follows the list.

  • es.addresses (required): A comma-separated list of ES endpoints.

  • es.index (required): The name of the ES index to ingest into.

  • es.username (required): The ES username.

  • es.password (required): The password for the specified ES user.

  • buffer (optional): The bulk size, i.e. the number of records to be ingested at once. Default is 0; a value of 0 indicates record-by-record ingestion, which may be highly inefficient.

  • es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is 0 which means that the exporter uses as many workers as there are cores in the machine.

  • es.bulk.flushBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is 5e+6.

  • es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is 30s.
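A sketch of an ElasticSearch exporter entry, with placeholder endpoint, index, and credentials, could look as follows:

{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "es",
  "format": "ecs",
  "es.addresses": "https://localhost:9200",
  "es.index": "sysflow",
  "es.username": "elastic",
  "es.password": "<password>",
  "buffer": "1000"
}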

The Elastic exporter does not make any assumptions about the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convenience, we provide an explicit mapping for creating a new tailored index in Elastic. For more information refer to the Elastic Mapping reference.

Export to IBM Findings API (IBM Cloud Security & Compliance Center)

Export to IBM Findings API allows adding custom findings to the IBM Cloud Security & Compliance Center (SCC). The mode is enabled by setting the configuration parameter export to findings. The format parameter must be set to occurence in this case. For export to IBM Findings, the following parameters are used (a configuration sketch follows the list):

  • findings.apikey (required): The API key used for the Advisor service instance.

  • findings.url (required): The URL of the Advisor service instance.

  • findings.accountid (required): The account ID used for the Advisor service instance.

  • findings.provider (required): The unique ID of the note provider.

  • findings.region (required): The cloud region of the Advisor service instance.

  • findings.sqlqueryurl (required):

  • findings.sqlquerycrn (required):

  • findings.s3region (required):

  • findings.s3bucket (required):

  • findings.path (required):

  • findings.pool.capacity (optional): The capacity of the findings pool. Default is 250.

  • findings.pool.maxage (optional): The maximum age of the security findings in the pool in minutes. Default is 1440.
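A sketch of a findings exporter entry, with all values shown as placeholders, could look as follows:

{
  "processor": "exporter",
  "in": "evt eventchan",
  "export": "findings",
  "format": "occurence",
  "findings.apikey": "<api key>",
  "findings.url": "<service instance url>",
  "findings.accountid": "<account id>",
  "findings.provider": "<note provider id>",
  "findings.region": "<region>",
  "findings.sqlqueryurl": "<sql query url>",
  "findings.sqlquerycrn": "<sql query crn>",
  "findings.s3region": "<s3 region>",
  "findings.s3bucket": "<s3 bucket>",
  "findings.path": "<path>",
  "findings.pool.capacity": "250",
  "findings.pool.maxage": "1440"
}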

For more information about inserting custom findings into IBM SCC, refer to Custom Findings section of IBM Cloud Security Advisor.

Override plugin configuration attributes with environment variables

It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. Environment variables must follow this structure:

  • Environment variables must follow the naming schema <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>

  • The plugin name inside the pipeline configuration file must be all lower case.

For example, to set the alert mode inside the policy engine, the following environment variable is set:

export POLICYENGINE_MODE=alert

To set the syslog values for the exporter:

export EXPORTER_TYPE=telemetry
export EXPORTER_SOURCE=${HOSTNAME}
export EXPORTER_EXPORT=syslog
export EXPORTER_HOST=192.168.2.10
export EXPORTER_PORT=514

If running as a docker container, environment variables can be passed with the docker run command:

docker run \
-e EXPORTER_TYPE=telemetry \
-e EXPORTER_SOURCE=${HOSTNAME} \
-e EXPORTER_EXPORT=syslog \
-e EXPORTER_HOST=192.168.2.10 \
-e EXPORTER_PORT=514 \
...

Writing runtime policies

The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rule, macro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the policies attribute of the policy engine plugin.

Rules contain the following fields:

  • rule: the name of the rule

  • description: a textual description of the rule

  • condition: a set of logical operations that can reference lists and macros, which when evaluating to true, can trigger an alert (when processor is in alert mode), or filter a sysflow record (when processor is in filter mode)

  • action: a list of actions to take place when the rule evaluates to true. Actions can be any of the following (note: new actions will be added in the future):

    • alert: processor outputs an alert

    • tag: enriches or tags the sysflow record with the labels in the tags field. This can be useful, for example, for semantically labeling records with TTPs.

  • priority: a label representing the severity of the alert; it can be (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.

  • tags (optional): set of labels appended to alert (default: empty).

  • prefilter (optional): list of record types (sf.type) to whitelist before applying rule condition (default: empty).

  • enabled (optional): indicates whether the rule is enabled (default: true).

Macros are named conditions and contain the following fields:

  • macro: the name of the macro

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

Lists are named collections and contain the following fields:

  • list: the name of the list

  • items: a collection of values or lists

Filters blacklist records matching a condition:

  • filter: the name of the filter

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

For example, the rule below specifies that matching records are process events (sf.type = PE) denoting EXEC operations (sf.opflags = EXEC) for which the process matches the macro package_installers. Additionally, the global filter containers preemptively removes from the processing stream any records for processes not running in a container environment.

# lists
- list: rpm_binaries
  items: [dnf, rpm, rpmkey, yum, '"75-system-updat"', rhsmcertd-worke, subscription-ma,
          repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
          abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]

- list: deb_binaries
  items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
    frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
    apt-listchanges, unattended-upgr, apt-add-reposit]

- list: package_mgmt_binaries
  items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, sane-utils.post, alternatives, chef-client]

# macros
- macro: package_installers
  condition: sf.proc.name pmatch (package_mgmt_binaries)

# global filters (blacklisting)
- filter: containers
  condition: sf.container.type = host

# rule definitions
- rule: Package installer detected
  desc: Use of package installer detected
  condition:  sf.opflags = EXEC and package_installers
  action: [alert]
  priority: medium
  tags: [actionable-offense, suspicious-process]
  prefilter: [PE] # record types for which this rule should be applied (whitelisting)
  enabled: true

The following table shows a detailed list of attribute names supported by the policy engine, as well as their type and the corresponding Falco attribute name. Our policy engine supports both the SysFlow and Falco attribute naming conventions to enable reuse of policies across the two frameworks.

| Attribute | Description | Values | Falco Attribute |
|-----------|-------------|--------|-----------------|
| sf.type | Record type | PE,PF,NF,FF,FE | N/A |
| sf.opflags | Operation flags | Operation Flags List: remove OP_ prefix | evt.type (remapped as falco event types) |
| sf.ret | Return code | int | evt.res |
| sf.ts | Start timestamp (ns) | int64 | evt.time |
| sf.endts | End timestamp (ns) | int64 | N/A |
| sf.proc.pid | Process PID | int64 | proc.pid |
| sf.proc.tid | Thread PID | int64 | thread.tid |
| sf.proc.uid | Process user ID | int | user.uid |
| sf.proc.user | Process user name | string | user.name |
| sf.proc.gid | Process group ID | int | group.gid |
| sf.proc.group | Process group name | string | group.name |
| sf.proc.apid | Proc ancestors PIDs (qo) | int64 | proc.apid |
| sf.proc.aname | Proc ancestors names (qo) (exclude path) | string | proc.aname |
| sf.proc.exe | Process command/filename (with path) | string | proc.exe |
| sf.proc.args | Process command arguments | string | proc.args |
| sf.proc.name | Process name (qo) (exclude path) | string | proc.name |
| sf.proc.cmdline | Process command line (qo) | string | proc.cmdline |
| sf.proc.tty | Process TTY status | bool | proc.tty |
| sf.proc.entry | Process container entrypoint | bool | proc.vpid == 1 |
| sf.proc.createts | Process creation timestamp (ns) | int64 | N/A |
| sf.pproc.pid | Parent process ID | int64 | proc.ppid |
| sf.pproc.gid | Parent process group ID | int64 | N/A |
| sf.pproc.uid | Parent process user ID | int64 | N/A |
| sf.pproc.group | Parent process group name | string | N/A |
| sf.pproc.tty | Parent process TTY status | bool | N/A |
| sf.pproc.entry | Parent process container entry | bool | N/A |
| sf.pproc.user | Parent process user name | string | N/A |
| sf.pproc.exe | Parent process command/filename | string | N/A |
| sf.pproc.args | Parent process command arguments | string | N/A |
| sf.pproc.name | Parent process name (qo) (no path) | string | proc.pname |
| sf.pproc.cmdline | Parent process command line (qo) | string | proc.pcmdline |
| sf.pproc.createts | Parent process creation timestamp | int64 | N/A |
| sf.file.fd | File descriptor number | int | fd.num |
| sf.file.path | File path | string | fd.name |
| sf.file.newpath | New file path (used in some FileEvents) | string | N/A |
| sf.file.name | File name (qo) | string | fd.filename |
| sf.file.directory | File directory (qo) | string | fd.directory |
| sf.file.type | File type | char 'f': file, 4: IPv4, 6: IPv6, 'u': unix socket, 'p': pipe, 'e': eventfd, 's': signalfd, 'l': eventpoll, 'i': inotify, 'o': unknown | fd.typechar |
| sf.file.is_open_write | File open with write flag (qo) | bool | evt.is_open_write |
| sf.file.is_open_read | File open with read flag (qo) | bool | evt.is_open_read |
| sf.file.openflags | File open flags | int | evt.args |
| sf.net.proto | Network protocol | int | fd.l4proto |
| sf.net.sport | Source port | int | fd.sport |
| sf.net.dport | Destination port | int | fd.dport |
| sf.net.port | Src or dst port (qo) | int | fd.port |
| sf.net.sip | Source IP | int | fd.sip |
| sf.net.dip | Destination IP | int | fd.dip |
| sf.net.ip | Src or dst IP (qo) | int | fd.ip |
| sf.res | File or network resource | string | fd.name |
| sf.flow.rbytes | Flow bytes read/received | int64 | evt.res |
| sf.flow.rops | Flow operations read/received | int64 | N/A |
| sf.flow.wbytes | Flow bytes written/sent | int64 | evt.res |
| sf.flow.wops | Flow operations written/sent | int64 | N/A |
| sf.container.id | Container ID | string | container.id |
| sf.container.name | Container name | string | container.name |
| sf.container.image.id | Container image ID | string | container.image.id |
| sf.container.image | Container image name | string | container.image |
| sf.container.type | Container type | CT_DOCKER, CT_LXC, CT_LIBVIRT_LXC, CT_MESOS, CT_RKT, CT_CUSTOM, CT_CRI, CT_CONTAINERD, CT_CRIO, CT_BPM | container.type |
| sf.container.privileged | Container privilege status | bool | container.privileged |
| sf.node.id | Node identifier | string | N/A |
| sf.node.ip | Node IP address | string | N/A |
| sf.schema.version | SysFlow schema version | string | N/A |
| sf.version | SysFlow JSON schema version | int | N/A |

The policy language supports the following operations:

| Operation | Description | Example |
|-----------|-------------|---------|
| A and B | Returns true if both statements are true | sf.pproc.name=bash and sf.pproc.cmdline contains echo |
| A or B | Returns true if at least one of the statements is true | sf.file.path = "/etc/passwd" or sf.file.path = "/etc/shadow" |
| not A | Returns true if the statement isn't true | not sf.pproc.exe = /usr/local/sbin/runc |
| A = B | Returns true if A exactly matches B. Note, if B is a list, A only has to exactly match one element of the list. If B is a list, it must be explicit. It cannot be a variable. If B is a variable, use in instead. | sf.file.path = ["/etc/passwd", "/etc/shadow"] |
| A != B | Returns true if A is not equal to B. Note, if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit. It cannot be a variable. | sf.file.path != "/etc/passwd" |
| A < B | Returns true if A is less than B. Note, if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops < 1000 |
| A <= B | Returns true if A is less than or equal to B. Note, if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops <= 1000 |
| A > B | Returns true if A is greater than B. Note, if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops > 1000 |
| A >= B | Returns true if A is greater than or equal to B. Note, if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit. It cannot be a variable. | sf.flow.wops >= 1000 |
| A in B | Returns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.exe in (bin_binaries, usr_bin_binaries) |
| A startswith B | Returns true if string A starts with string B | sf.file.path startswith '/home' |
| A endswith B | Returns true if string A ends with string B | sf.file.path endswith '.json' |
| A contains B | Returns true if string A contains string B | sf.pproc.name=java and sf.pproc.cmdline contains org.apache.hadoop |
| A icontains B | Returns true if string A contains string B, ignoring capitalization | sf.pproc.name=java and sf.pproc.cmdline icontains org.apache.hadooP |
| A pmatch B | Returns true if string A partially matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. | sf.proc.name pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries) |
| exists A | Checks if A is not a zero value (i.e. 0 for int, "" for string) | exists sf.file.path |

See the resources policies directory in github for examples. Feel free to contribute new and interesting rules through a github pull request.

Write a simple processing plugin

In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enable easy extension of the processor and the creation of custom pipelines tailored to specific use cases.

A dynamic plugin example is provided in github. The core of the plugin is building an object that implements an SFProcessor interface. Such an implementation looks as follows:

package main

import (
    "sync"

    "github.com/sysflow-telemetry/sf-apis/go/logger"
    "github.com/sysflow-telemetry/sf-apis/go/plugins"
    "github.com/sysflow-telemetry/sf-processor/core/flattener"
)

const (
    pluginName string = "example"
)

// Plugin exports a symbol for this plugin.
var Plugin Example

// Example defines an example plugin.
type Example struct{}

// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
    return new(Example)
}

// GetName returns the plugin name.
func (s *Example) GetName() string {
    return pluginName
}

// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]string) error {
    return nil
}

// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
    pc.AddProcessor(pluginName, NewExample)
}

// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg *sync.WaitGroup) {
    cha := ch.(*flattener.FlatChannel)
    record := cha.In
    logger.Trace.Println("Example channel capacity:", cap(record))
    defer wg.Done()
    logger.Trace.Println("Starting Example")
    for {
        fc, ok := <-record
        if !ok {
            logger.Trace.Println("Channel closed. Shutting down.")
            break
        }
        logger.Info.Println(fc)
    }
    logger.Trace.Println("Exiting Example")
}

// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch interface{}) {}

// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}

// This function is not run when module is used as a plugin.
func main() {}

The object must implement the following interface:

  • GetName() - returns a lowercase string representing the plugin’s label. This label is important, because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.

  • Init(config map[string]string) error - used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin’s definition inside pipeline.json (more on this later).

  • Register(pc plugins.SFPluginCache) - this registers the plugin with the plugin cache of the processor.

    • pc.AddProcessor(pluginName, <plugin constructor function>) (required) - registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName> which is used to instantiate the plugin, and returns it as an SFProcessor interface - see NewExample for an example.

    • pc.AddChannel(channelName, <output channel constructor function>) (optional) - if your plugin uses a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next in the pipeline), it should be registered here.

      • The channelName should be a lowercase unique label defining the channel type.

      • The constructor function should return a golang interface{} representing an object that has an In attribute of type chan <ObjectToBePassed>. We will call this object a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here.

      • For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as the Register function. The FlatChannel is defined here

  • Process(ch interface{}, wg *sync.WaitGroup) - this function is launched by the processor as a go thread and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel that is configured as the input channel to the plugin in the pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in code). The processor must loop on the input channel, and do its analysis on each input record. In this case, the example plugin is reading flat records and printing them to the screen.

  • SetOutChan(ch interface{}) - sets the wrapped channel that will serve as the output channel for the plugin. The output channel is instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, then this function can be left empty. See the SetOutputChan function in the flattener to see how an output channel is implemented.

  • Cleanup() - Used to cleanup any resources. This function is called by the processor after the plugin Process function exits. One of the key items to close in the Cleanup function is the output channel using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.

  • main(){} - this main method is not used by the plugin or processor. It’s required by golang in order to be able to compile as a shared object.

To compile the example plugin, use the provided Makefile:

cd plugins/example
make

This will build the plugin and copy it into resources/plugins/.

To use the new plugin, use the configuration provided in github, which defines the following pipeline:

{
   "pipeline":[
     {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
     },
     {
      "processor": "example",
      "in": "flat flattenerchan"
     }
   ]
}

This pipeline contains two plugins:

  • The built-in sysflowreader plugin with the flattener handler, which takes raw sysflow objects and flattens them into arrays of integers and strings for easier processing in certain plugins like the policy engine.

  • The example plugin, which takes the flattened output from the sysflowreader plugin and prints it to the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins will be properly stitched together.

To run the example pipeline:

cd driver
./sfprocessor -config ../plugins/example/pipeline.example.json -plugdir ../resources/plugins/  ../resources/traces/mon.1531776712.sf

Deploy collector and processor using docker

The following docker-compose file illustrates how to deploy the processor and the collector as containers.

version: "3.5"
services:
  sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: false
    volumes:
      - socket-vol:/sock/
    environment:
      DRIVER: socket
      INPUT_PATH: /sock/sysflow.sock
      POLICYENGINE_MODE: alert
      EXPORTER_TYPE: telemetry
      EXPORTER_SOURCE: sysflow
      EXPORTER_EXPORT: syslog
      EXPORTER_HOST: <IP address of the syslog server>
      EXPORTER_PORT: 514
  sf-collector:
    container_name: sf-collector
    image: sysflowtelemetry/sf-collector:latest
    depends_on:
      - "sf-processor"
    privileged: true
    volumes:
      - /var/run/docker.sock:/host/var/run/docker.sock
      - /dev:/host/dev
      - /proc:/host/proc:ro
      - /boot:/host/boot:ro
      - /lib/modules:/host/lib/modules:ro
      - /usr:/host/usr:ro
      - /mnt/data:/mnt/data
      - socket-vol:/sock/
      - ./resources/traces:/tests/traces
    environment:
      EXPORTER_ID: ${HOSTNAME}
      NODE_IP: <Host IP address>
      FILTER: "container.name!=sf-collector and container.name!=sf-processor"
      INTERVAL: 300
      SOCK_FILE: /sock/sysflow.sock
volumes:
  socket-vol:

Setting up the collector environment

The key setting in the collector portion of the file is the FILTER variable. Since the collector is built atop the sysdig core, it uses the sysdig filtering mechanism described here and can support all the sysdig attributes described there in case you want to filter on specific containers, processes, operations, etc. One of the more powerful filters is the container.type!=host filter, which limits collection only to container monitoring. If you want to monitor the entire host, simply remove the container.type operation from the filter.
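For example, a FILTER value that limits collection to containers while still excluding the telemetry containers themselves (a sketch combining the filters already shown above) could look like this:

FILTER: "container.type!=host and container.name!=sf-collector and container.name!=sf-processor"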

Setting up the processor environment

As mentioned in a previous section, all custom plugin attributes can be set using the <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME> format. The docker-compose file sets several attributes, including EXPORTER_TYPE, EXPORTER_HOST, and EXPORTER_PORT. Note that EXPORTER_SOURCE is set to the bash environment variable ${HOSTNAME}, which must be explicitly exported before launching docker-compose in order to be picked up.

export HOSTNAME
docker-compose up

The following are the default locations of the pipeline configuration and plugins directory:

  • pipeline.json - /usr/local/sysflow/conf/pipeline.json

  • plugins dir - /usr/local/sysflow/resources/plugins

We can overwrite these particular files/dirs in the docker container with those on the host by setting up virtual mounts that map the host directories/files into the container, using the volumes section of sf-processor in the docker-compose.yaml.

sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - socket-vol:/sock/
      - ./resources/pipelines/pipeline.runtimeintegrity.json:/usr/local/sysflow/conf/pipeline.json
      - ./resources/plugins:/usr/local/sysflow/resources/plugins

If using the policy engine, the policy folder defaults to the following location in the container:

/usr/local/sysflow/resources/policies/

This location can be overwritten by setting the POLICYENGINE_POLICIES environment variable.
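For example, assuming a custom policy directory mounted into the container at an illustrative path, the override can be added to the environment section of the sf-processor service:

environment:
  POLICYENGINE_POLICIES: /usr/local/sysflow/resources/policies/custom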

The docker container uses a default filter.yaml policy that outputs sysflow records in json. You can use your own policy files from the host by mounting your policy directory into the container as follows:

sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - socket-vol:/sock/
      - ./resources/policies/runtimeintegrity/:/usr/local/sysflow/resources/policies/