SysFlow Processor (sf-processor repo)

The SysFlow processor is a lightweight edge analytics pipeline that can process and enrich SysFlow data. The processor is written in golang, and allows users to build and configure various pipelines using a set of built-in and custom plugins and drivers. Pipeline plugins are producer-consumer objects that follow an interface and pass data to one another through pre-defined channels in a multi-threaded environment. By contrast, a driver represents a data source, which pushes data to the plugins. The processor currently supports two built-in drivers: one that reads sysflow from a file, and another that reads streaming sysflow over a domain socket. Plugins and drivers are configured using a JSON file.

A core built-in plugin is a policy engine that can apply logical rules to filter, alert, or semantically label sysflow records using a declarative language based on the Falco rules syntax with a few added extensions (more on this later).

Custom plugins and drivers can be implemented as dynamic libraries to tailor analytics to specific user requirements.

The endpoint of a pipeline configuration is an exporter plugin that sends the processed data to a target. The processor supports various types of export plugins for a variety of different targets.


The processor has been tested on Ubuntu/RHEL distributions, but should work on any Linux system.

  • Golang version 1.17+ and make (if building from sources)

  • Docker, docker-compose (if building with docker)


Clone the processor repository

git clone

Build locally, from sources

cd sf-processor
make build

Build with docker

cd sf-processor
make docker-build


For usage information, type:

cd driver/
./sfprocessor -help

This should yield the following usage statement:

Usage: sfprocessor [[-version]|[-driver <value>] [-log <value>] [-driverdir <value>] [-plugdir <value>] path]
Positional arguments:
  path string
        Input path
  -config string
        Path to pipeline configuration file (default "pipeline.json")
  -cpuprofile file
        Write cpu profile to file
  -driver string
        Driver name {file|socket|<custom>} (default "file")
  -driverdir string
        Dynamic driver directory (default "../resources/drivers")
  -log string
        Log level {trace|info|warn|error} (default "info")
  -memprofile file
        Write memory profile to file
  -plugdir string
        Dynamic plugins directory (default "../resources/plugins")
  -test
        Test pipeline configuration
  -traceprofile file
        Write trace profile to file
  -version
        Output version information

The four most important flags are config, driverdir, plugdir, and driver. The config flag points to a pipeline configuration file, which describes the entire pipeline and the settings for the individual plugins. The driverdir and plugdir flags specify where the shared libraries for dynamic drivers and plugins reside; the processor loads them at runtime. The driver flag accepts a label for a pre-configured driver (either built-in or custom) that will be used as the data source for the pipeline. Currently, the pipeline supports only one driver at a time, but we anticipate handling multiple drivers in the future. There are two built-in drivers:

  • file: the processor loads a sysflow file-reading driver that reads from path.

  • socket: the processor loads a sysflow streaming driver. The driver creates a domain socket named path and acts as a server waiting for a SysFlow collector to attach and send sysflow data.


The pipeline configuration below shows how to configure a pipeline that reads a sysflow stream and pushes records to the policy engine, which triggers alerts using a set of runtime policies stored in a YAML file:

{
  "pipeline": [
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "policyengine",
      "in": "flat flattenerchan",
      "out": "evt eventchan",
      "policies": "../resources/policies/runtimeintegrity"
    },
    {
      "processor": "exporter",
      "in": "evt eventchan",
      "export": "syslog",
      "proto": "tcp",
      "tag": "sysflow",
      "host": "localhost",
      "port": "514"
    }
  ]
}

NOTE: This configuration can be found in: sf-processor/resources/pipelines/pipeline.syslog.json

This pipeline specifies three built-in plugins:

  • sysflowreader: is a generic reader plugin that ingests sysflow from the driver, caches entities, and presents sysflow objects to a handler object (i.e., an object that implements the handler interface) for processing. In this case, we are using the flattener handler, but custom handlers are possible.

  • policyengine: is the policy engine, which takes flattened (row-oriented) SysFlow records as input and outputs records, which represent alerts, or filtered sysflow records depending on the policy engine’s mode (more on this later).

  • exporter: takes records from the policy engine, and exports them to ElasticSearch, syslog, file, or terminal, in a JSON format or in Elastic Common Schema (ECS) format. Note that custom export plugins can be created to export to other serialization formats and transport protocols.

Each plugin has a set of general attributes that are present in all plugins, and a set of attributes that are specific to that plugin. For more details on the specific attributes in this example, see the pipeline configuration template.

The general attributes are as follows:

  • processor (required): the name of the processor plugin to load. Processors must implement the SFProcessor interface; the name is the value that must be returned from the GetName() function as defined in the processor object.

  • handler (optional): the name of the handler object to be used for the processor. Handlers must implement the SFHandler interface.

  • in (required): the input channel (i.e. golang channel) of objects that are passed to the plugin.

  • out (optional): the output channel (i.e. golang channel) for objects that are pushed out of the plugin, and into the next plugin in the pipeline sequence.

Channels are modelled as channel objects that have an In attribute representing some golang channel of objects. See SFChannel for an example. The syntax for a channel in the pipeline is [channel name] [channel type], where channel type is the label given to the channel type at plugin registration (more on this later), and channel name is a unique identifier for the current channel instance. The name and type of an output channel in one plugin must match the name and type of the input channel of the next plugin in the pipeline sequence.
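
The channel syntax and the matching rule described above can be illustrated with a small, self-contained Go sketch. It is not taken from the sf-processor sources: ChannelSpec, ParseChannelSpec, and Stitches are hypothetical names introduced only for this example, and the sketch assumes that the channel name and channel type are separated by whitespace.

```go
package main

import (
	"fmt"
	"strings"
)

// ChannelSpec is a hypothetical parsed form of "[channel name] [channel type]".
type ChannelSpec struct {
	Name string
	Type string
}

// ParseChannelSpec splits a pipeline channel string such as
// "flat flattenerchan" into its name and type.
func ParseChannelSpec(s string) (ChannelSpec, error) {
	fields := strings.Fields(s)
	if len(fields) != 2 {
		return ChannelSpec{}, fmt.Errorf("expected \"<name> <type>\", got %q", s)
	}
	return ChannelSpec{Name: fields[0], Type: fields[1]}, nil
}

// Stitches reports whether an upstream "out" value and a downstream "in"
// value name the same channel instance (same name and same type).
func Stitches(out, in string) bool {
	o, err := ParseChannelSpec(out)
	if err != nil {
		return false
	}
	i, err := ParseChannelSpec(in)
	if err != nil {
		return false
	}
	return o == i
}

func main() {
	fmt.Println(Stitches("flat flattenerchan", "flat flattenerchan")) // true
	fmt.Println(Stitches("flat flattenerchan", "evt eventchan"))      // false
}
```

A check of this kind could be run over adjacent plugin blocks before a pipeline is started, to catch a mistyped channel name early.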

NOTE: A plugin has exactly one input channel but may specify more than one output channel. This allows pipeline definitions that fan out data to more than one receiver plugin, similar to the Unix tee command. While there must always be one SysFlow reader acting as the entry point of a pipeline, a pipeline configuration may specify policy engines passing data to different exporters, or a SysFlow reader passing data to different policy engines. Generally, pipelines form a tree rather than a linear structure.
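
The fan-out behavior can be pictured with plain Go channels. The following is a minimal sketch, not the processor's implementation: FanOut and RunFanOut are hypothetical helpers, and records are modelled as strings for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// FanOut copies every record read from in to each channel in outs and
// closes all outputs once the input is closed (hypothetical helper).
func FanOut(in <-chan string, outs []chan string) {
	for rec := range in {
		for _, out := range outs {
			out <- rec
		}
	}
	for _, out := range outs {
		close(out)
	}
}

// RunFanOut feeds records through FanOut into n receivers and returns
// what each receiver collected.
func RunFanOut(records []string, n int) [][]string {
	in := make(chan string)
	outs := make([]chan string, n)
	results := make([][]string, n)
	var wg sync.WaitGroup
	for i := range outs {
		outs[i] = make(chan string)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for rec := range outs[i] {
				results[i] = append(results[i], rec)
			}
		}(i)
	}
	go FanOut(in, outs)
	for _, rec := range records {
		in <- rec
	}
	close(in)
	wg.Wait()
	return results
}

func main() {
	for i, got := range RunFanOut([]string{"r1", "r2"}, 2) {
		fmt.Println("receiver", i, "got", got)
	}
}
```

Every receiver sees every record in the original order, which is the tee-like behavior the note describes.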

Policy engine configuration

The policy engine ("processor": "policyengine") plugin is driven by a set of rules. These rules are specified in a YAML file which adopts the same syntax as the rules of the Falco project. A policy engine plugin specification may have the following attributes:

  • policies (required for alert mode): The path to the YAML rules specification file. More information on rules can be found in the Policies section.

  • mode (optional): The mode of the policy engine. Allowed values are:

    • alert (default): the policy engine generates rule-based alerts; alert is a blocking mode that drops all records that do not match any given rule. If no mode is specified, the policy engine runs in alert mode by default.

    • enrich: the policy engine enriches records with additional context from the rule. In contrast to alert, this is a non-blocking mode which applies tagging and action enrichments to matching records as defined in the policy file. Non-matching records are passed on “as is”.

  • monitor (optional): Specifies if changes to the policy file(s) should be monitored and updated in the policy engine.

    • none (default): no monitor is used.

    • local: the processor will monitor for changes in the policies path and update its rule set if changes are detected.

  • monitor.interval (optional): The interval in seconds for updating policies, if a monitor is used. (default: 30 seconds).

  • concurrency (optional): The number of concurrent threads for record processing. (default: 5).

  • actiondir (optional): The path of the directory containing the shared object files for user-defined action plugins. See the section on User-defined Actions for more information.
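
The difference between the two modes can be sketched in a few lines of Go. This is an illustration of the semantics described above under simplifying assumptions, not the policy engine's code: Record, Rule, and Apply are hypothetical, and a rule is reduced to a match function plus a set of tags.

```go
package main

import "fmt"

// Record is a hypothetical, heavily simplified record.
type Record struct {
	Exe  string
	Tags []string
}

// Rule is a hypothetical rule: a match predicate and the tags to attach.
type Rule struct {
	Name  string
	Tags  []string
	Match func(Record) bool
}

// Apply mimics the two modes: "alert" drops records that match no rule,
// while "enrich" tags matching records and passes the rest on unchanged.
func Apply(mode string, rules []Rule, in []Record) []Record {
	var out []Record
	for _, rec := range in {
		matched := false
		for _, rule := range rules {
			if rule.Match(rec) {
				matched = true
				// Copy before appending so the input record is not modified.
				tags := append([]string(nil), rec.Tags...)
				rec.Tags = append(tags, rule.Tags...)
			}
		}
		if matched || mode == "enrich" {
			out = append(out, rec)
		}
	}
	return out
}

func main() {
	rules := []Rule{{
		Name:  "Package installer detected",
		Tags:  []string{"suspicious-process"},
		Match: func(r Record) bool { return r.Exe == "/usr/bin/apt" },
	}}
	in := []Record{{Exe: "/usr/bin/apt"}, {Exe: "/bin/ls"}}
	fmt.Println(len(Apply("alert", rules, in)))  // 1
	fmt.Println(len(Apply("enrich", rules, in))) // 2
}
```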

NOTE: Prior to release 0.4.0, the mode attribute accepted different values with different semantics. To preserve the behavior of older releases:

  • For old alert behavior, use enrich mode.

  • For old filter behavior, use enrich mode and a policy file with filter rules only.

  • For old bypass behavior, use enrich and drop the policies key from the configuration.

Exporter configuration

An exporter ("processor": "exporter") plugin consists of two modules: an encoder for converting the data to a suitable format, and a transport module for sending the data to the target. Encoders are target-specific, i.e., for a particular export target only a particular set of encoders may be used. In the exporter configuration, the transport module is specified via the export parameter (required). The encoder is selected via the format parameter (optional). The default format is json.

The following table lists the currently supported exporter modules and the corresponding encoders. Additional encoders and transport modules can be implemented if need arises. If you plan to contribute or want to get involved in the discussion please join the SysFlow community.

Some of these combinations require additional configuration as described in the following sections. null is used for debugging the processor and doesn’t export any data.


If export is set to file, an additional parameter file.path allows the specification of the target file.


If the export parameter is set to syslog, output to syslog is enabled and the following additional parameters are used:

  • syslog.proto (optional): The protocol used for communicating with the syslog server. Allowed values are tcp, tls and udp. Default is tcp.

  • syslog.tag (optional): The tag used for each SysFlow record in syslog. Default is SysFlow.

  • syslog.source (optional): If set, adds a hostname to the syslog header.

  • syslog.host (optional): The hostname of the syslog server. Default is localhost.

  • syslog.port (optional): The port of the syslog server. Default is 514.


Export to ElasticSearch is enabled by setting the config parameter export to es. The only supported format for export to ElasticSearch is ecs.

Data export is done via bulk ingestion. The ingestion can be controlled by some additional parameters which are read when the es export target is selected. Required parameters specify the ES target, index and credentials. Optional parameters control some aspects of the behavior of the bulk ingestion and may have an effect on performance. You may need to adapt their values for optimal performance in your environment.

  • es.addresses (required): A comma-separated list of ES endpoints.

  • es.index (required): The name of the ES index to ingest into.

  • es.username (required): The ES username.

  • es.password (required): The password for the specified ES user.

  • buffer (optional): The bulk size as the number of records to be ingested at once. Default is 0; a value of 0 indicates record-by-record ingestion, which may be highly inefficient.

  • es.bulk.numWorkers (optional): The number of ingestion workers used in parallel. Default is 0 which means that the exporter uses as many workers as there are cores in the machine.

  • es.bulk.flushBuffer (optional): The size in bytes of the flush buffer for ingestion. It should be large enough to hold one bulk (the number of records specified in buffer), otherwise the bulk is broken into smaller chunks. Default is 5e+6.

  • es.bulk.flushTimeout (optional): The flush buffer time threshold. Valid values are golang duration strings. Default is 30s.

The Elastic exporter does not make any assumption on the existence or configuration of the index specified in es.index. If the index does not exist, Elastic will automatically create it and apply a default dynamic mapping. It may be beneficial to use an explicit mapping for the ECS data generated by the Elastic exporter. For convenience, we provide an explicit mapping for creating a new tailored index in Elastic. For more information, refer to the Elastic Mapping reference.

Environment variables

It is possible to override any of the custom attributes of a plugin using an environment variable. This is especially useful when operating the processor as a container, where you may have to deploy the processor to multiple nodes, and have attributes that change per node. If an environment variable is set, it overrides the setting inside the config file. The environment variables must follow the following structure:

  • Environment variables must follow the naming schema <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>

  • The plugin name inside the pipeline configuration file must be all lower case.

For example, to set the alert mode inside the policy engine, the following environment variable is set:

export POLICYENGINE_MODE=alert

To set the syslog values for the exporter:

export EXPORTER_TYPE=telemetry
export EXPORTER_EXPORT=syslog
export EXPORTER_PORT=514
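
The naming schema can be made concrete with a short Go sketch of how such an override might be resolved. This is an assumption-laden illustration, not the processor's code: EnvKey and ApplyEnvOverrides are hypothetical, the sketch simply upper-cases the plugin and attribute names, it only overrides attributes already present in the configuration, and it does not address how attribute names containing dots are mapped.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// EnvKey builds the variable name <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>.
// Upper-casing both parts is an assumption made for this sketch.
func EnvKey(plugin, attr string) string {
	return strings.ToUpper(plugin) + "_" + strings.ToUpper(attr)
}

// ApplyEnvOverrides returns a copy of conf in which every attribute that has
// a matching environment variable takes the variable's value. The lookup
// function is injectable so the logic can be exercised without touching the
// real environment.
func ApplyEnvOverrides(plugin string, conf map[string]string, lookup func(string) (string, bool)) map[string]string {
	out := make(map[string]string, len(conf))
	for k, v := range conf {
		if ev, ok := lookup(EnvKey(plugin, k)); ok {
			v = ev
		}
		out[k] = v
	}
	return out
}

func main() {
	os.Setenv("EXPORTER_PORT", "6514")
	conf := map[string]string{"export": "syslog", "port": "514"}
	merged := ApplyEnvOverrides("exporter", conf, os.LookupEnv)
	fmt.Println(merged["port"]) // 6514
}
```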

If running as a docker container, environment variables can be passed with the docker run command:

docker run
-e EXPORTER_TYPE=telemetry \

Rate limiter configuration (experimental)

The flattener handler has a built-in time decay filter that can be enabled to reduce event rates in the processor. The filter uses a time-decay bloom filter based on a semantic hashing of records. This means that the filter should forward only one record matching a given semantic hash per time decay period. The semantic hash takes into consideration process, flow and event attributes. To enable rate limiting, modify the sysflowreader processor as follows:

     "processor": "sysflowreader",
     "handler": "flattener",
     "in": "sysflow sysflowchan",
     "out": "flat flattenerchan",
     "filter.enabled": "on|off (default: off)",
     "filter.maxage": "time decay in minutes (default: 24H)"

Policy Language

The policy engine adopts and extends the Falco rules definition syntax. Before reading the rest of this section, please go through the Falco Rules documentation to get familiar with rule, macro, and list syntax, all of which are supported in our policy engine. Policies are written in one or more yaml files, and stored in a directory specified in the pipeline configuration file under the policies attribute of the policy engine plugin.

Rules contain the following fields:

  • rule: the name of the rule

  • description: a textual description of the rule

  • condition: a set of logical operations that can reference lists and macros, which when evaluating to true, can trigger record enrichment or alert creation (depending on the policy engine mode)

  • action: a comma-separated list of actions to take place when the rule evaluates to true. For a particular rule, actions are evaluated in the order they are specified, i.e., an action can make use of the results provided by earlier actions. An action is just the name of an action function without any parameters. The current version only supports pluggable user-defined actions. See here for a detailed description of the plugin interface and a sample action plugin.

  • priority: a label representing the severity of the alert. It can be either (1) low, medium, or high, or (2) emergency, alert, critical, error, warning, notice, informational, debug.

  • tags (optional): set of labels appended to alert (default: empty).

  • prefilter (optional): list of record types (sf.type) to whitelist before applying rule condition (default: empty).

  • enabled (optional): indicates whether the rule is enabled (default: true).

NOTE: The syntax of the policy language changed slightly with the switch to release 0.4.0. To migrate policy files used with prior releases to release 0.4.0 or higher, simply remove all action: [tag] lines. As of release 0.4.0, tagging is done automatically: if a rule triggers, all tags specified via the tags key are appended to the record. The action key is reserved for specifying user-defined action plugins.

Macros are named conditions and contain the following fields:

  • macro: the name of the macro

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

Lists are named collections and contain the following fields:

  • list: the name of the list

  • items: a collection of values or lists

Drop rules block records matching a condition and can be used for reducing the amount of records processed by the policy engine:

  • drop: the name of the filter

  • condition: a set of logical operations that can reference lists and macros, which evaluate to true or false

For example, the rule below specifies that matching records are process events (sf.type = PE), denoting EXEC operations (sf.opflags = EXEC) for which the process matches macro package_installers. Additionally, the global filter containers preemptively removes from the processing stream any records for processes not running in a container environment.

# lists
- list: rpm_binaries
  items: [dnf, rpm, rpmkey, yum, '"75-system-updat"', rhsmcertd-worke, subscription-ma,
          repoquery, rpmkeys, rpmq, yum-cron, yum-config-mana, yum-debug-dump,
          abrt-action-sav, rpmdb_stat, microdnf, rhn_check, yumdb]

- list: deb_binaries
  items: [dpkg, dpkg-preconfigu, dpkg-reconfigur, dpkg-divert, apt, apt-get, aptitude,
    frontend, preinst, add-apt-reposit, apt-auto-remova, apt-key,
    apt-listchanges, unattended-upgr, apt-add-reposit]

- list: package_mgmt_binaries
  items: [rpm_binaries, deb_binaries, update-alternat, gem, pip, pip3, alternatives, chef-client]

# macros
- macro: package_installers
  condition: sf.proc.name pmatch (package_mgmt_binaries)

# global filters (blacklisting)
- filter: containers
  condition: sf.container.type = host

# rule definitions
- rule: Package installer detected
  desc: Use of package installer detected
  condition:  sf.opflags = EXEC and package_installers
  priority: medium
  tags: [actionable-offense, suspicious-process]
  prefilter: [PE] # record types for which this rule should be applied (whitelisting)
  enabled: true
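
In the example, package_mgmt_binaries references two other lists by name. How such nested references might be expanded into a flat set of items can be sketched as follows; ResolveList is a hypothetical helper written for this illustration and says nothing about how the policy engine resolves lists internally.

```go
package main

import "fmt"

// ResolveList expands a named list into plain items, following references
// to other lists. Each list is expanded at most once, which also protects
// against reference cycles.
func ResolveList(name string, lists map[string][]string) []string {
	var out []string
	visited := map[string]bool{}
	var walk func(n string)
	walk = func(n string) {
		if visited[n] {
			return
		}
		visited[n] = true
		for _, item := range lists[n] {
			if _, isList := lists[item]; isList {
				walk(item)
			} else {
				out = append(out, item)
			}
		}
	}
	walk(name)
	return out
}

func main() {
	lists := map[string][]string{
		"rpm_binaries":          {"dnf", "rpm", "yum"},
		"deb_binaries":          {"dpkg", "apt", "apt-get"},
		"package_mgmt_binaries": {"rpm_binaries", "deb_binaries", "gem", "pip"},
	}
	fmt.Println(ResolveList("package_mgmt_binaries", lists))
	// [dnf rpm yum dpkg apt apt-get gem pip]
}
```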

Attribute names

The following table shows a detailed list of attribute names supported by the policy engine, as well as their type and corresponding Falco attribute name. Our policy engine supports both SysFlow and Falco attribute naming conventions to enable reuse of policies across the two frameworks.

Description (with values and Falco attribute where given):

  • Record type

  • Operation flags. Values: Operation Flags List (remove OP_ prefix). Falco attribute: evt.type (remapped as falco event types)

  • Return code

  • Start timestamp (ns)

  • End timestamp (ns)

  • Process PID

  • Thread PID

  • Process user ID

  • Process user name

  • Process group ID

  • Process group name

  • Process ancestors PIDs (qo)

  • Process ancestors names (qo) (exclude path)

  • Process command/filename (with path)

  • Process command arguments

  • Process name (qo) (exclude path)

  • Process command line (qo)

  • Process TTY status

  • Process container entrypoint. Falco attribute: proc.vpid == 1

  • Process creation timestamp (ns)

  • Parent process ID

  • Parent process group ID

  • Parent process user ID

  • Parent process group name

  • Parent process TTY status

  • Parent process container entry

  • Parent process user name

  • Parent process command/filename

  • Parent process command arguments

  • Parent process name (qo) (no path)

  • Parent process command line (qo)

  • Parent process creation timestamp

  • File descriptor number

  • File path

  • New file path (used in some FileEvents)

  • File name (qo)

  • File directory (qo)

  • File type. Values: char 'f': file, 4: IPv4, 6: IPv6, 'u': unix socket, 'p': pipe, 'e': eventfd, 's': signalfd, 'l': eventpoll, 'i': inotify, 'o': unknown

  • File open with write flag (qo)

  • File open with read flag (qo)

  • File open flags

  • Network protocol

  • Source port

  • Destination port

  • Src or dst port (qo)

  • Source IP

  • Destination IP

  • Src or dst IP (qo)

  • File or network resource

  • Flow bytes read/received

  • Flow operations read/received

  • Flow bytes written/sent

  • Flow operations written/sent

  • Container ID

  • Container name

  • Container image ID

  • Container image name

  • Container type

  • Container privilege status

  • Pod creation timestamp

  • Pod id

  • Pod name

  • Pod node name

  • Pod namespace

  • Pod restart count

  • Pod host IP addresses

  • Pod internal IP addresses

  • Pod services

  • Kubernetes event action

  • Kubernetes event resource type

  • Kubernetes event json message

  • Node identifier

  • Node IP address

  • SysFlow schema version

  • SysFlow JSON schema version



Jsonpath Expressions

Unlike attributes of the scalar types bool, int(64), and string, attributes of type json contain structured information in the form of stringified json records. The policy language allows access to subfields inside such json records via GJSON jsonpath expressions. The jsonpath expression must be specified as a suffix to the attribute, enclosed in square brackets. Examples of such terms are:

  • [0.clusterip.0] - the first cluster IP address of the first service associated with a pod

  • [items.0.namespace] - the namespace of the first item in a KE message attribute

See the GJSON path syntax for more details. The result of applying a jsonpath expression to a json attribute is always of type string.


The policy language supports the following operations:

  • A and B: Returns true if both statements are true. Example: ... and sf.pproc.cmdline contains echo

  • A or B: Returns true if one of the statements is true. Example: sf.file.path = "/etc/passwd" or sf.file.path = "/etc/shadow"

  • not A: Returns true if the statement isn’t true. Example: not sf.pproc.exe = /usr/local/sbin/runc

  • A = B: Returns true if A exactly matches B. Note: if B is a list, A only has to exactly match one element of the list. If B is a list, it must be explicit; it cannot be a variable. If B is a variable, use in instead. Example: sf.file.path = ["/etc/passwd", "/etc/shadow"]

  • A != B: Returns true if A is not equal to B. Note: if B is a list, A only has to be not equal to one element of the list. If B is a list, it must be explicit; it cannot be a variable. Example: sf.file.path != "/etc/passwd"

  • A < B: Returns true if A is less than B. Note: if B is a list, A only has to be less than one element in the list. If B is a list, it must be explicit; it cannot be a variable. Example: sf.flow.wops < 1000

  • A <= B: Returns true if A is less than or equal to B. Note: if B is a list, A only has to be less than or equal to one element in the list. If B is a list, it must be explicit; it cannot be a variable. Example: sf.flow.wops <= 1000

  • A > B: Returns true if A is greater than B. Note: if B is a list, A only has to be greater than one element in the list. If B is a list, it must be explicit; it cannot be a variable. Example: sf.flow.wops > 1000

  • A >= B: Returns true if A is greater than or equal to B. Note: if B is a list, A only has to be greater than or equal to one element in the list. If B is a list, it must be explicit; it cannot be a variable. Example: sf.flow.wops >= 1000

  • A in B: Returns true if value A is an exact match to one of the elements in list B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. Example: sf.proc.exe in (bin_binaries, usr_bin_binaries)

  • A startswith B: Returns true if string A starts with string B. Example: sf.file.path startswith '/home'

  • A endswith B: Returns true if string A ends with string B. Example: sf.file.path endswith '.json'

  • A contains B: Returns true if string A contains string B. Example: ... and sf.pproc.cmdline contains org.apache.hadoop

  • A icontains B: Returns true if string A contains string B, ignoring capitalization. Example: ... and sf.pproc.cmdline icontains org.apache.hadooP

  • A pmatch B: Returns true if string A partially matches one of the elements in B. Note: B must be a list. Note: () can be used on B to merge multiple list objects into one list. Example: ... pmatch (modify_passwd_binaries, verify_passwd_binaries, user_util_binaries)

  • exists A: Checks if A is not a zero value (i.e., 0 for int, "" for string). Example: exists sf.file.path
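
The string operations listed above map naturally onto Go's strings package. The following sketch shows one plausible reading of their semantics; the function names are hypothetical, and interpreting pmatch as substring containment against any list element is an assumption of this example rather than a statement about the policy engine's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// StartsWith, EndsWith and Contains are thin wrappers over the strings package.
func StartsWith(a, b string) bool { return strings.HasPrefix(a, b) }
func EndsWith(a, b string) bool   { return strings.HasSuffix(a, b) }
func Contains(a, b string) bool   { return strings.Contains(a, b) }

// IContains is a case-insensitive Contains.
func IContains(a, b string) bool {
	return strings.Contains(strings.ToLower(a), strings.ToLower(b))
}

// In reports whether a exactly equals one element of list.
func In(a string, list []string) bool {
	for _, item := range list {
		if a == item {
			return true
		}
	}
	return false
}

// PMatch reports whether a contains at least one element of list
// (partial match read as substring containment; an assumption).
func PMatch(a string, list []string) bool {
	for _, item := range list {
		if strings.Contains(a, item) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(StartsWith("/home/user/a.json", "/home"))          // true
	fmt.Println(EndsWith("/home/user/a.json", ".json"))            // true
	fmt.Println(IContains("java org.apache.Hadoop", "hadooP"))     // true
	fmt.Println(In("/usr/bin/apt", []string{"apt", "dpkg"}))       // false
	fmt.Println(PMatch("/usr/bin/apt", []string{"apt", "dpkg"}))   // true
}
```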

See the resources policies directory in github for examples. Feel free to contribute new and interesting rules through a github pull request.

User-defined Actions

User-defined actions are implemented via the golang plugin mechanism. Check the documentation on Action Plugins for a custom action plugin example.


In addition to its core plugins, the processor also supports custom plugins that can be dynamically loaded into the processor via a compiled golang shared library using the golang plugin package. Custom plugins enable easy extension of the processor and the creation of custom pipelines tailored to specific use cases.

The processor supports four types of plugins:

  • drivers: enable the ingestion of different telemetry sources into the processor pipeline.

  • processors: enable the creation of custom data processing and analytic plugins to extend sf-processor pipelines.

  • handlers: enable the creation of custom SysFlow record handling plugins.

  • actions: enable the creation of custom action plugins to extend sf-processor’s policy engine.


  • Go 1.17 (if building locally, without the plugin builder)

Processor Plugins

User-defined plugins can be plugged into the sf-processor pipeline to extend it. These are the most generic type of plugins, from which all built-in processor plugins are built. Check the core package for examples. We have built-in processor plugins for flattening the telemetry stream, implementing a policy engine, and creating event exporters.


Processor plugins (or just plugins) are implemented via the golang plugin mechanism. A plugin must implement the following interface, defined in the package.

// SFProcessor defines the SysFlow processor interface.
type SFProcessor interface {
  Register(pc SFPluginCache)
  Init(conf map[string]interface{}) error
  Process(ch interface{}, wg *sync.WaitGroup)
  GetName() string
  SetOutChan(ch []interface{})
  Cleanup()
}

The Process function is the main function of the plugin. It is where the “main loop” of the plugin should be implemented. It receives the input channel configured in the custom plugin’s block in the pipeline configuration, as well as the pipeline thread WaitGroup. Custom processing code should be implemented in this function. Init is called once, when the pipeline is loaded. Cleanup is called when the pipeline is terminated. SetOutChan receives a slice with the output channels configured in the plugin’s block in the pipeline configuration.

When loading a pipeline, sf-processor performs a series of health checks before the pipeline is enabled. If these health checks fail, the processor terminates. To enable health checks on custom plugins, implement the Test function defined in the interface below. For an example, check core/exporter/exporter.go.

// SFTestableProcessor defines a testable SysFlow processor interface.
type SFTestableProcessor interface {
  Test() (bool, error)
}


A dynamic plugin example is provided in github. The core of the plugin is building an object that implements an SFProcessor interface. Such an implementation looks as follows:

package main

import (
    "sync"

    "github.com/sysflow-telemetry/sf-apis/go/logger"
    "github.com/sysflow-telemetry/sf-apis/go/plugins"
    "github.com/sysflow-telemetry/sf-apis/go/sfgo"
    "github.com/sysflow-telemetry/sf-processor/core/flattener"
)

const (
    pluginName string = "example"
)

// Plugin exports a symbol for this plugin.
var Plugin Example

// Example defines an example plugin.
type Example struct{}

// NewExample creates a new plugin instance.
func NewExample() plugins.SFProcessor {
    return new(Example)
}

// GetName returns the plugin name.
func (s *Example) GetName() string {
    return pluginName
}

// Init initializes the plugin with a configuration map.
func (s *Example) Init(conf map[string]interface{}) error {
    return nil
}

// Register registers plugin to plugin cache.
func (s *Example) Register(pc plugins.SFPluginCache) {
    pc.AddProcessor(pluginName, NewExample)
}

// Process implements the main interface of the plugin.
func (s *Example) Process(ch interface{}, wg *sync.WaitGroup) {
    cha := ch.(*flattener.FlatChannel)
    record := cha.In
    logger.Trace.Println("Example channel capacity:", cap(record))
    defer wg.Done()
    logger.Trace.Println("Starting Example")
    for {
        fc, ok := <-record
        if !ok {
            // The upstream plugin closed the channel: stop the main loop.
            logger.Trace.Println("Channel closed. Shutting down.")
            break
        }
        if fc.Ints[sfgo.SYSFLOW_IDX][sfgo.SF_REC_TYPE] == sfgo.PROC_EVT {
            logger.Info.Printf("Process Event: %s, %d", fc.Strs[sfgo.SYSFLOW_IDX][sfgo.PROC_EXE_STR], fc.Ints[sfgo.SYSFLOW_IDX][sfgo.EV_PROC_TID_INT])
        }
    }
    logger.Trace.Println("Exiting Example")
}

// SetOutChan sets the output channel of the plugin.
func (s *Example) SetOutChan(ch []interface{}) {}

// Cleanup tears down plugin resources.
func (s *Example) Cleanup() {}

// This function is not run when module is used as a plugin.
func main() {}

The custom plugin must implement the following interface:

  • GetName() - returns a lowercase string representing the plugin’s label. This label is important, because it identifies the plugin in the pipeline.json file, enabling the processor to load the plugin. In the object above, this plugin is called example. Note that the label must be unique.

  • Init(conf map[string]interface{}) error - used to initialize the plugin. The configuration map that is passed to the function stores all the configuration information defined in the plugin’s definition inside pipeline.json (more on this later).

  • Register(pc plugins.SFPluginCache) - this registers the plugin with the plugin cache of the processor.

    • pc.AddProcessor(pluginName, <plugin constructor function>) (required) - registers the plugin named example with the processor. You must define a constructor function using the convention New<PluginName> which is used to instantiate the plugin, and returns it as an SFProcessor interface - see NewExample for an example.

    • pc.AddChannel(channelName, <output channel constructor function>) (optional) - if your plugin is using a custom output channel of objects (i.e., the channel used to pass output objects from this plugin to the next in the pipeline), it should be included in this plugin.

      • The channelName should be a lowercase unique label defining the channel type.

      • The constructor function should return a golang interface{} representing an object that has an In attribute of type chan <ObjectToBePassed>. We will call this object a wrapped channel object going forward. For example, the channel object that passes sysflow objects is called SFChannel, and is defined here

      • For a complete example of defining an output channel, see NewFlattenerChan in the flattener as well as the Register function. The FlatChannel is defined here

  • Process(ch interface{}, wg *sync.WaitGroup) - this function is launched by the processor as a goroutine and is where the main plugin processing occurs. It takes a wrapped channel object, which acts as the input data source to the plugin (i.e., this is the channel that is configured as the input channel to the plugin in the pipeline.json). It also takes a sync.WaitGroup object, which is used to signal to the processor when the plugin has completed running (see defer wg.Done() in code). The plugin must loop on the input channel and run its analysis on each input record. In this case, the example plugin reads flat records and prints them to the screen.

  • SetOutChan(ch []interface{}) - sets the wrapped channels that will serve as the output channels for the plugin. The output channels are instantiated by the processor, which is also in charge of stitching the plugins together. If the plugin is the last one in the chain, then this function can be left empty. See the SetOutputChan function in the flattener to see how an output channel is implemented.

  • Cleanup() - used to clean up any resources. This function is called by the processor after the plugin's Process function exits. One of the key items to close in the Cleanup function is the output channel, using the golang close() function. Closing the output channel enables the pipeline to be torn down gracefully and in sequence.

  • main(){} - this main method is not used by the plugin or processor. It’s required by golang in order to be able to compile as a shared object.
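The lifecycle described above (Init, then Process running as a goroutine until the input channel closes, then Cleanup closing the output channel) can be sketched in a self-contained form. This is only an illustration under stated assumptions: FlatRecord, FlatChannel, the string output channel, and the "prefix" attribute are simplified stand-ins rather than the processor's actual types, and Register and SetOutChan are omitted for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// FlatRecord and FlatChannel are simplified stand-ins (assumptions) for the
// processor's flattened record and wrapped channel types.
type FlatRecord struct {
	Exe string
	PID int64
}

type FlatChannel struct {
	In chan *FlatRecord
}

// Example sketches a plugin that emits one formatted line per input record.
type Example struct {
	prefix string
	out    chan string // hypothetical output channel, for illustration only
}

// Init reads plugin settings from the configuration map.
func (e *Example) Init(conf map[string]interface{}) error {
	e.prefix = "Process Event"
	if v, ok := conf["prefix"].(string); ok { // "prefix" is a made-up attribute
		e.prefix = v
	}
	return nil
}

// Process loops on the input channel until the upstream side closes it.
func (e *Example) Process(ch interface{}, wg *sync.WaitGroup) {
	defer wg.Done() // signals the host that this plugin has finished
	for rec := range ch.(*FlatChannel).In {
		e.out <- fmt.Sprintf("%s: %s, %d", e.prefix, rec.Exe, rec.PID)
	}
}

// Cleanup closes the output channel so downstream consumers can exit.
func (e *Example) Cleanup() { close(e.out) }

// run drives the plugin the way a host might: initialize, start Process,
// feed records, close the input, wait, and clean up.
func run(recs []*FlatRecord) []string {
	e := &Example{out: make(chan string, len(recs))}
	if err := e.Init(map[string]interface{}{}); err != nil {
		return nil
	}
	in := &FlatChannel{In: make(chan *FlatRecord)}
	var wg sync.WaitGroup
	wg.Add(1)
	go e.Process(in, &wg)
	for _, r := range recs {
		in.In <- r
	}
	close(in.In)
	wg.Wait()
	e.Cleanup()
	var lines []string
	for l := range e.out {
		lines = append(lines, l)
	}
	return lines
}

func main() {
	for _, l := range run([]*FlatRecord{{"./server", 13823}, {"./client", 13824}}) {
		fmt.Println(l)
	}
}
```

The output channel is buffered here only so that the sketch can collect results after the plugin finishes; in a real pipeline the next plugin would be draining it concurrently.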

To compile the example plugin, use the provided Makefile:

make -C plugins/processors/example

This will build the plugin and copy it into resources/plugins/.

To use the new plugin, use the configuration provided in github, which defines the following pipeline:

{
  "pipeline": [
    {
      "processor": "sysflowreader",
      "handler": "flattener",
      "in": "sysflow sysflowchan",
      "out": "flat flattenerchan"
    },
    {
      "processor": "example",
      "in": "flat flattenerchan"
    }
  ]
}

This pipeline contains two plugins:

  • The builtin sysflowreader plugin with flattener handler, which takes raw sysflow objects and flattens them into arrays of integers and strings for easier processing in certain plugins like the policy engine.

  • The example plugin, which takes the flattened output from the sysflowreader plugin and prints it to the screen.

The key item to note is that the output channel (i.e., out) of sysflowreader matches the input channel (i.e., in) of the example plugin. This ensures that the plugins will be properly stitched together.
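The matching rule can be checked mechanically. The sketch below is not how the processor stitches plugins internally; it is a hypothetical validator that assumes a strictly linear pipeline and takes the stages as a bare JSON array, comparing each stage's in value with the previous stage's out value. The Stage type and checkStitching function are names introduced here for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stage mirrors the keys used by the pipeline entries shown above.
type Stage struct {
	Processor string `json:"processor"`
	Handler   string `json:"handler"`
	In        string `json:"in"`
	Out       string `json:"out"`
}

// checkStitching reports an error when a stage's "in" channel differs from
// the "out" channel of the stage before it (linear pipelines only).
func checkStitching(raw string) error {
	var stages []Stage
	if err := json.Unmarshal([]byte(raw), &stages); err != nil {
		return fmt.Errorf("parsing stages: %w", err)
	}
	for i := 1; i < len(stages); i++ {
		prev, cur := stages[i-1], stages[i]
		if prev.Out != cur.In {
			return fmt.Errorf("%s outputs %q but %s expects %q",
				prev.Processor, prev.Out, cur.Processor, cur.In)
		}
	}
	return nil
}

func main() {
	stages := `[
	  {"processor": "sysflowreader", "handler": "flattener",
	   "in": "sysflow sysflowchan", "out": "flat flattenerchan"},
	  {"processor": "example", "in": "flat flattenerchan"}
	]`
	if err := checkStitching(stages); err != nil {
		fmt.Println("mismatch:", err)
		return
	}
	fmt.Println("stages are stitched consistently")
}
```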


The example plugin is a custom plugin that illustrates how to implement a minimal plugin that reads the records from the input channel and logs them to the standard output.

To run this example, in the root of sf-processor, build the processor and the example plugin. Note that this plugin’s shared object is generated in resources/plugins/.

make build && make -C plugins/processors/example

Then, run:

cd driver && ./sfprocessor -log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/ \
    -v $(pwd)/resources:/go/src/ \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=info -config=../plugins/processors/example/pipeline.example.json ../resources/traces/tcp.sf

The output on the above pre-recorded trace should look like this:

[Health] 2022/02/21 12:55:19 pipeline.go:246: Health checks: passed
[Info] 2022/02/21 12:55:19 main.go:147: Successfully loaded pipeline configuration
[Info] 2022/02/21 12:55:19 pipeline.go:170: Starting the processing pipeline
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./client, 13824
[Info] 2022/02/21 12:55:19 example.go:75: Process Event: ./server, 13823

Handler Plugins

User-defined handler modules can be plugged into the built-in SysFlow processor plugin to implement custom data processing and analytic pipelines.


Handlers are implemented via the golang plugin mechanism. A handler must implement the following interface, defined in the package.

// SFHandler defines the SysFlow handler interface.
type SFHandler interface {
  RegisterChannel(pc SFPluginCache)
  RegisterHandler(hc SFHandlerCache)
  Init(conf map[string]interface{}) error
  IsEntityEnabled() bool
  HandleHeader(sf *CtxSysFlow, hdr *sfgo.SFHeader) error
  HandleContainer(sf *CtxSysFlow, cont *sfgo.Container) error
  HandleProcess(sf *CtxSysFlow, proc *sfgo.Process) error
  HandleFile(sf *CtxSysFlow, file *sfgo.File) error
  HandleNetFlow(sf *CtxSysFlow, nf *sfgo.NetworkFlow) error
  HandleNetEvt(sf *CtxSysFlow, ne *sfgo.NetworkEvent) error
  HandleFileFlow(sf *CtxSysFlow, ff *sfgo.FileFlow) error
  HandleFileEvt(sf *CtxSysFlow, fe *sfgo.FileEvent) error
  HandleProcFlow(sf *CtxSysFlow, pf *sfgo.ProcessFlow) error
  HandleProcEvt(sf *CtxSysFlow, pe *sfgo.ProcessEvent) error
  SetOutChan(ch []interface{})
}

Each Handle* function receives the current SysFlow record being processed along with its corresponding parsed record type. Custom processing code should be implemented using these functions.
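Since a handler usually cares about only a few record types, one common Go idiom is to embed a struct of do-nothing defaults and override the callbacks of interest. The sketch below illustrates that idiom only. CtxSysFlow, ProcessEvent, and NetworkFlow are simplified stand-ins for the real types, eventHandler is a hypothetical two-method slice of the full interface, and NoopHandler is not something the processor is known to provide.

```go
package main

import "fmt"

// Stand-in types (assumptions): the real ones are far richer.
type CtxSysFlow struct{ Exe string }
type ProcessEvent struct{ PID int64 }
type NetworkFlow struct{ Dport int32 }

// eventHandler is a reduced, hypothetical slice of the handler interface.
type eventHandler interface {
	HandleProcEvt(sf *CtxSysFlow, pe *ProcessEvent) error
	HandleNetFlow(sf *CtxSysFlow, nf *NetworkFlow) error
}

// NoopHandler supplies do-nothing defaults that concrete handlers can embed.
type NoopHandler struct{}

func (NoopHandler) HandleProcEvt(*CtxSysFlow, *ProcessEvent) error { return nil }
func (NoopHandler) HandleNetFlow(*CtxSysFlow, *NetworkFlow) error  { return nil }

// ProcCounter overrides only the process event callback.
type ProcCounter struct {
	NoopHandler
	Seen map[string]int
}

// HandleProcEvt counts process events per executable.
func (p *ProcCounter) HandleProcEvt(sf *CtxSysFlow, pe *ProcessEvent) error {
	p.Seen[sf.Exe]++
	return nil
}

// countProcEvents feeds one process event and one network flow per
// executable through the handler and returns the per-executable counts.
func countProcEvents(exes []string) (map[string]int, error) {
	pc := &ProcCounter{Seen: map[string]int{}}
	var h eventHandler = pc
	for i, exe := range exes {
		sf := &CtxSysFlow{Exe: exe}
		if err := h.HandleProcEvt(sf, &ProcessEvent{PID: int64(i)}); err != nil {
			return nil, err
		}
		// Falls through to the embedded no-op default.
		if err := h.HandleNetFlow(sf, &NetworkFlow{Dport: 8080}); err != nil {
			return nil, err
		}
	}
	return pc.Seen, nil
}

func main() {
	seen, err := countProcEvents([]string{"./server", "./client", "./server"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(seen["./server"], seen["./client"])
}
```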


The printer handler is a pluggable handler that logs select SysFlow records to the standard output. This plugin doesn’t define any output channels, so it acts as a plugin sink (last plugin in a pipeline).

To run this example, in the root of sf-processor, build the processor and the handler plugin. Note that this plugin’s shared object is generated in resources/handlers/.

make build && make -C plugins/handlers/printer

Then, run:

cd driver && ./sfprocessor -log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/ \
    -v $(pwd)/resources:/go/src/ \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=info -config=../plugins/handlers/printer/pipeline.printer.json ../resources/traces/tcp.sf

The output on the above pre-recorded trace should look like this:

[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./server, 3
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:100: FileFlow ./client, 3
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./client, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./client, 13824
[Info] 2022/02/21 15:39:58 printer.go:94: NetworkFlow ./server, 8080
[Info] 2022/02/21 15:39:58 printer.go:118: ProcEvt ./server, 13823

Action Plugins

User-defined actions can be plugged into SysFlow’s Policy Engine rule declarations to perform additional processing on matched records.


Actions are implemented via the golang plugin mechanism. An action must implement the following interface, defined in the package.

// Prototype of an action function
type ActionFunc func(r *Record) error

// Action interface for user-defined actions
type Action interface {
        GetName() string
        GetFunc() ActionFunc
}

Actions have a name and an action function. Within a single policy engine instance, action names must be unique. User-defined actions cannot re-declare built-in actions. Reusing names of user-defined actions overwrites previously registered actions.

The action function receives the current record as an argument and thus has access to all record attributes. The action result can be stored in the record context via the context modifier methods.
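As an illustration of the name-plus-function shape, here is a self-contained sketch loosely modelled on a time-tagging action. The Record type and its AddTags method are stand-ins introduced for this example, not the policy engine's actual record or context API, and the "now_in_nanos:<value>" tag layout is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Record is a stand-in (assumption) for the policy engine record; only a
// list of tags is modelled here.
type Record struct{ Tags []string }

// AddTags is a hypothetical context modifier.
func (r *Record) AddTags(tags ...string) { r.Tags = append(r.Tags, tags...) }

// ActionFunc and Action repeat the shapes shown above, over the stand-in Record.
type ActionFunc func(r *Record) error

type Action interface {
	GetName() string
	GetFunc() ActionFunc
}

// NowAction tags a record with the current time in nanoseconds.
type NowAction struct{}

// GetName returns the name under which the action would be registered.
func (NowAction) GetName() string { return "now" }

// GetFunc returns the function applied to each matched record.
func (NowAction) GetFunc() ActionFunc {
	return func(r *Record) error {
		r.AddTags("now_in_nanos:" + strconv.FormatInt(time.Now().UnixNano(), 10))
		return nil
	}
}

// applyNow runs the action once on an empty record and returns the action
// name together with the resulting tags.
func applyNow() (string, []string, error) {
	var a Action = NowAction{}
	r := &Record{}
	if err := a.GetFunc()(r); err != nil {
		return "", nil, err
	}
	return a.GetName(), r.Tags, nil
}

func main() {
	name, tags, err := applyNow()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(name, tags)
}
```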


The now action is a pluggable action that creates a tag containing the current time with nanosecond precision.

First, in the root of sf-processor, build the processor and the action plugin. Note that this plugin’s shared object is generated in resources/actions/.

make build && make -C plugins/actions/example

Then, run:

cd driver && ./sfprocessor -log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf

Plugin builder

To build the plugin for release, Go requires the code to be compiled with the exact package versions that the SysFlow processor was compiled with. The easiest way to achieve this is to use the pre-built plugin-builder Docker image in your build. This option also works for building plugins for deployment with the SysFlow binary packages.

Below is an example of how this can be achieved. Set $TAG to a SysFlow release (>=0.4.0), edge, or dev.

First, build the plugin:

docker run --rm \
    -v $(pwd)/plugins:/go/src/ \
    -v $(pwd)/resources:/go/src/ \
    sysflowtelemetry/plugin-builder:$TAG \
    make -C /go/src/

To test it, run the pre-built processor with the example configuration and trace.

docker run --rm \
    -v $(pwd)/plugins:/usr/local/sysflow/plugins \
    -v $(pwd)/resources:/usr/local/sysflow/resources \
    -w /usr/local/sysflow/bin \
    --entrypoint=/usr/local/sysflow/bin/sfprocessor \
    sysflowtelemetry/sf-processor:$TAG \
    -log=quiet -config=../plugins/actions/example/pipeline.actions.json ../resources/traces/tcp.sf

In the output, observe that all records matching the policy specified in pipeline.actions.json are tagged by the now action with the tag now_in_nanos. For example:

  "version": 4,
  "endts": 0,
  "opflags": [
  "policies": [
      "id": "Action example",
      "desc": "user-defined action example",
      "priority": 0
  "tags": [

Docker usage

Documentation and scripts for deploying the SysFlow Processor with docker compose can be found here.

Processor environment

As mentioned in a previous section, all custom plugin attributes can be set through environment variables using the format <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>. Note that the docker compose file sets several such attributes, including EXPORTER_TYPE, EXPORTER_HOST, and EXPORTER_PORT.
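The naming convention can be sketched as a small lookup helper. This is an illustration rather than the processor's own code: envKey and override are hypothetical names, and the sketch assumes that plugin and attribute names are simply upper-cased and joined with an underscore.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// envKey builds a variable name of the form
// <PLUGIN NAME>_<CONFIG ATTRIBUTE NAME>, upper-casing both parts (assumption).
func envKey(plugin, attr string) string {
	return strings.ToUpper(plugin) + "_" + strings.ToUpper(attr)
}

// override returns the environment value when the variable is set, and the
// fallback (for instance, the value from pipeline.json) otherwise.
func override(plugin, attr, fallback string) string {
	if v, ok := os.LookupEnv(envKey(plugin, attr)); ok {
		return v
	}
	return fallback
}

func main() {
	fmt.Println(envKey("exporter", "type"))
	fmt.Println(envKey("policyengine", "policies"))
	fmt.Println(override("exporter", "host", "localhost"))
}
```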

The following are the default locations of the pipeline configuration and the plugin directories:

  • pipeline.json: /usr/local/sysflow/conf/pipeline.json

  • drivers dir: /usr/local/sysflow/resources/drivers

  • plugins dir: /usr/local/sysflow/resources/plugins

  • handler dir: /usr/local/sysflow/resources/handlers

  • actions dir: /usr/local/sysflow/resources/actions

The default configuration can be changed by setting up virtual mounts that map host directories and files into the container, using the volumes section of the sf-processor service in the docker-compose.yaml.

  sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json

The policy location can be overridden by setting the POLICYENGINE_POLICIES environment variable, which can point to a policy file or a directory containing policy files (must have yaml extension).
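The file-or-directory behaviour can be illustrated with a short helper. This is a sketch under assumptions, not the policy engine's loader: policyFiles and demo are hypothetical names, the directory scan is non-recursive, and only names ending in ".yaml" are kept.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// policyFiles resolves a policy location: a single file is returned as-is,
// while a directory yields its direct entries ending in ".yaml".
func policyFiles(path string) ([]string, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}
	if !info.IsDir() {
		return []string{path}, nil
	}
	entries, err := os.ReadDir(path) // entries come back sorted by name
	if err != nil {
		return nil, err
	}
	var files []string
	for _, e := range entries {
		if !e.IsDir() && filepath.Ext(e.Name()) == ".yaml" {
			files = append(files, filepath.Join(path, e.Name()))
		}
	}
	return files, nil
}

// demo builds a throwaway directory with mixed files and returns the base
// names that policyFiles selects from it.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "policies")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	for _, name := range []string{"a.yaml", "notes.txt", "b.yaml"} {
		if err := os.WriteFile(filepath.Join(dir, name), []byte("# empty\n"), 0o600); err != nil {
			return nil, err
		}
	}
	files, err := policyFiles(dir)
	if err != nil {
		return nil, err
	}
	names := make([]string, len(files))
	for i, f := range files {
		names[i] = filepath.Base(f)
	}
	return names, nil
}

func main() {
	names, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names)
}
```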

The docker container uses a default filter.yaml policy that outputs SysFlow records in JSON. To use your own policy files from the host, mount your policy directory into the container as follows, with the custom pipeline pointing to the mounted policies.

  sf-processor:
    container_name: sf-processor
    image: sysflowtelemetry/sf-processor:latest
    privileged: true
    volumes:
      - ./path/to/my.pipeline.json:/usr/local/sysflow/conf/pipeline.json
      - ./path/to/policies/:/usr/local/sysflow/resources/policies/