Implementing Custom Pipelines

To implement your custom components you must create a class that subclasses one of the 3 available component classes as described in Implementing Components.

Once implemented, you can register it to make it available to Flowbber as described in Registering Components.

To allow passing configuration options to your component, implement the declare_config() method as described in Specifying Options.

Implementing Components

Depending on the type of component you want to create, subclass one of the following classes and implement its abstract method.

Sources

To implement a Source component, subclass Flowbber’s flowbber.components.Source class and implement the flowbber.components.Source.collect() abstract method.

class flowbber.components.Source(index, type_, id_, optional=False, timeout=None, config=None)

Main base class to implement a Source.

collect()

Collect some arbitrary data.

All Source subclasses must implement this abstract method.

Returns
A dictionary with the data collected by this source.
Return type
dict

Source’s collect() method must return a dictionary with the collected data, using each key as a label and its value as the value of that label. The collected data structure can be as complex and nested as required, as long as it is composed of JSON serializable objects, for example built-in datatypes like integers, booleans, floats and strings, and data structures like dictionaries and lists.

For example, consider a basic implementation of the built-in EnvSource that collects data from the environment:

from os import environ
from flowbber.components import Source


class MySource(Source):
    def collect(self):
        return {
            key: value
            for key, value in environ.items()
        }

Aggregators

To implement an Aggregator component, subclass Flowbber’s flowbber.components.Aggregator class and implement the flowbber.components.Aggregator.accumulate() abstract method.

class flowbber.components.Aggregator(index, type_, id_, optional=False, timeout=None, config=None)

Main base class to implement an Aggregator.

accumulate(data)

Perform analysis or accumulate from the given data.

The aggregator can:

  1. Add values:

    data['some_key'] = {'value': 1000}
    
  2. Delete values:

    del data['some_key']
    
  3. Modify values:

    data['some_key']['value'] = 2000
    

All Aggregator subclasses must implement this abstract method.

Parameters
data (OrderedDict) – The data collected by the sources. Any modifications performed will be reflected in the final collected data.

Aggregator’s accumulate() method takes the editable bundle of data collected by the sources and doesn’t return anything, but it is allowed to modify, add or delete data in the bundle.

For example, consider a pipeline that uses the built-in CoberturaSource twice because your codebase has a part written in C and another one in Go, and you need a global coverage metric.

The pipeline’s sources could look like this:

[[sources]]
type = "cobertura"
id = "coverage_c"

    [sources.config]
    xmlpath = "{git.root}/build/test/c/coverage.xml"

[[sources]]
type = "cobertura"
id = "coverage_go"

    [sources.config]
    xmlpath = "{git.root}/build/test/go/coverage.xml"

The data gathered by both sources could look like this:

OrderedDict([
    ('coverage_c', {
        'files': {
            # ...
        },
        'total': {
            'total_statements': 1000,
            'total_misses': 200,
            'line_rate': 0.80
        }
    }),
    ('coverage_go', {
        'files': {
            # ...
        },
        'total': {
            'total_statements': 200,
            'total_misses': 100,
            'line_rate': 0.50
        }
    })
])

We could implement a basic aggregator that sums the total of both suites:

from flowbber.components import Aggregator


class MyAggregator(Aggregator):
    def accumulate(self, data):

        ids = ['coverage_c', 'coverage_go']
        total_statements = 0
        total_misses = 0

        for datakey in ids:
            total = data[datakey]['total']
            total_statements += total['total_statements']
            total_misses += total['total_misses']

        total_hits = total_statements - total_misses
        line_rate = total_hits / total_statements

        data[self.id] = {
            'total_statements': total_statements,
            'total_misses': total_misses,
            'line_rate': line_rate
        }

Assuming that we register this aggregator as my_aggregator, we could add it to the pipeline definition file like this:

[[aggregators]]
type = "my_aggregator"
id = "total_coverage"

The above aggregator will transform the collected data like this:

OrderedDict([
    ('coverage_c', {
        # ...
    }),
    ('coverage_go', {
        # ...
    }),
    ('total_coverage', {
        'total_statements': 1200,
        'total_misses': 300,
        'line_rate': 0.75
    })
])

Note that while we hardwired the ids of the sources, these could be parametrized as options of the component as described in Specifying Options, as sketched below. Also note the way we used self.id as the key to add to the bundle.
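
A minimal sketch of that parametrization, assuming a hypothetical ids option (the option name and its schema are illustrative, not part of any built-in component), could look like this:

from flowbber.components import Aggregator


class MyAggregator(Aggregator):
    def declare_config(self, config):
        # Hypothetical option: list of source ids whose totals will be summed
        config.add_option(
            'ids',
            schema={
                'type': 'list',
                'schema': {'type': 'string', 'empty': False},
                'empty': False,
            },
        )

    def accumulate(self, data):
        total_statements = 0
        total_misses = 0

        # Iterate the configured ids instead of a hardwired list
        for datakey in self.config.ids.value:
            total = data[datakey]['total']
            total_statements += total['total_statements']
            total_misses += total['total_misses']

        data[self.id] = {
            'total_statements': total_statements,
            'total_misses': total_misses,
            'line_rate': (total_statements - total_misses) / total_statements,
        }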

Sinks

To implement a Sink component, subclass Flowbber’s flowbber.components.Sink class and implement the flowbber.components.Sink.distribute() abstract method.

class flowbber.components.Sink(index, type_, id_, optional=False, timeout=None, config=None)

Main base class to implement a Sink.

distribute(data)

Distribute the collected data.

All Sink subclasses must implement this abstract method.

Parameters
data (OrderedDict) – The collected data. This dictionary can be modified as required without consequences for the pipeline.

Sink’s distribute() method takes a copy of the final collected data (after passing all aggregators) and returns nothing.

For example, consider a basic and naive implementation of the built-in MongoDBSink that submits data to a MongoDB database:

from flowbber.components import Sink
from flowbber.logging import get_logger


log = get_logger(__name__)


class SimpleMongoDBSink(Sink):
    def distribute(self, data):
        from pymongo import MongoClient

        client = MongoClient('mongodb://localhost:27017/')
        database = client['mydatabase']
        collection = database['mycollection']

        data_id = collection.insert_one(data).inserted_id
        log.info('Inserted data to MongoDB with id {}'.format(data_id))

In this example we hardwired the connection URI, database name and collection name. This can be parametrized as described in Specifying Options. Check the code of flowbber.plugins.sinks.mongodb.MongoDBSink for a full example on the implementation of the MongoDB sink.

Registering Components

Entrypoints

Flowbber uses setuptools’ entrypoints to perform dynamic discovery of plugins.

Assuming your package is called mypackage, add an entry_points keyword argument to your package setup.py’s setup() function as follows:

setup(
    # ...
    # Entry points
    entry_points={
        'flowbber_plugin_sources_1_0': [
            'mysource = mypackage.sources.mycustom:MyCustomSource'
        ],
        'flowbber_plugin_aggregators_1_0': [
            'myaggregator = mypackage.aggregators.mycustom:MyCustomAggregator'
        ],
        'flowbber_plugin_sinks_1_0': [
            'mycustom = mypackage.sinks.mycustom:MyCustomSink'
        ]
    }
)

As key, use one of the following entrypoints:

Sources
flowbber_plugin_sources_1_0
Aggregators
flowbber_plugin_aggregators_1_0
Sinks
flowbber_plugin_sinks_1_0

The elements in the list associated with those entrypoints have the following structure:

<your_new_type> = <yourpackage>.<submodule>.<submodule>:<YourComponentClass>

Please note the : (colon) after the module path. Once your package is installed alongside Flowbber, your components will be available for use with type = your_new_type in your pipeline definition file.
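
If you want to double-check that your components are exposed after installing the package, a quick sanity check using setuptools’ pkg_resources (a standard setuptools tool, not a Flowbber API) could look like this:

from pkg_resources import iter_entry_points

# List every source component registered under Flowbber's 1.0 entrypoint
for entry in iter_entry_points('flowbber_plugin_sources_1_0'):
    print(entry.name, '->', entry.module_name)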

Pipeline’s flowconf

In many situations, creating a whole Python package with just a couple of components, installing it and keeping track of its version is too much overhead.

For example, what if your custom pipeline can be implemented with Flowbber’s built-in Sources and Sinks, but you only require one additional simple aggregator?

Flowbber supports creating local configuration modules that allow creating components specific to a pipeline.

Create a flowconf.py file next to your pipeline definition file. If you have placed your pipeline under version control, keeping track of both the components and the definition of the pipeline is straightforward. Flowbber will load the components defined in this way and make them available to your pipeline.

In the flowconf.py file, you can register your components using the register function for each loader:

Sources:

from flowbber.loaders import source
from flowbber.components import Source


@source.register('my_source')
class MySource(Source):
    def collect(self):
        return {'my_value': 1000}

Aggregators:

from flowbber.loaders import aggregator
from flowbber.components import Aggregator


@aggregator.register('my_aggregator')
class MyAggregator(Aggregator):
    def accumulate(self, data):
        data['num_sources'] = {'total': len(data.keys())}

Sinks:

from flowbber.loaders import sink
from flowbber.components import Sink


@sink.register('my_sink')
class MySink(Sink):
    def distribute(self, data):
        print(data)

With the above configuration file, the pipeline definition file can use those components:

[[sources]]
type = "my_source"
id = "my_source1"

[[sources]]
type = "my_source"
id = "my_source2"

[[aggregators]]
type = "my_aggregator"
id = "my_aggregator1"

[[sinks]]
type = "my_sink"
id = "my_sink1"

The resulting collected data will be:

OrderedDict([
    ('my_source1', {'my_value': 1000}),
    ('my_source2', {'my_value': 1000}),
    ('num_sources', {'total': 2})
])

Specifying Options

Any component can implement the method flowbber.components.base.Component.declare_config():

Component.declare_config(config)

Declare the configuration options of this component.

Parameters
config (flowbber.config.Configurator) – The configuration manager for this component.

It is expected that, for each option the component needs to declare, a call to flowbber.config.Configurator.add_option() is performed.

Configurator.add_option(key, default=None, optional=False, schema=None, secret=False)

Declare an option.

Parameters
  • key (str) – Key of the configuration option. Must be representable as a public Python variable.
  • default – Default value for this option if optional and no value was provided.
  • optional (bool) – Whether this option is optional. By default, options are mandatory.
  • schema (dict) – Schema to validate the user value against.
  • secret (bool) – Boolean indicating that the option is a secret and thus shouldn’t be printed, logged, stored in plain text, etc.

For example, let’s revisit our previous simple MongoDB sink example. We want to parametrize, among other things, the uri, database and collection.

def declare_config(self, config):
    config.add_option(
        'uri',
        default=None,
        optional=True,
        schema={
            'type': 'string',
            'empty': False,
            'nullable': True,
        },
        secret=True,
    )

    config.add_option(
        'database',
        schema={
            'type': 'string',
            'empty': False,
        },
    )

    config.add_option(
        'collection',
        schema={
            'type': 'string',
            'empty': False,
        },
    )

The schema keyword argument receives a dictionary that describes the schema against which the value will be validated. Validation is performed using the awesome Cerberus library, which allows creating schemas for complex data types and values. If the schema is set to None, no validation will be performed.
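
For instance, a minimal sketch of an option validated as a bounded integer (the retries name and its limits are purely illustrative):

def declare_config(self, config):
    config.add_option(
        'retries',
        default=3,
        optional=True,
        schema={
            'type': 'integer',
            'min': 0,
            'max': 10,
        },
    )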

Once validated and normalized, configuration options will be available under the read only self.config attribute. The keys of the configuration options are mapped to an object with three attributes:

key

Key of the configuration option. Same as given as first argument to add_option.

value

The validated and normalized or default value of the option.

is_secret

Boolean indicating that the option is a secret and thus shouldn’t be printed, logged, stored in plain text, etc. Same as add_option’s secret keyword argument.

In particular, it is important to note that Flowbber won’t log or store any option marked as secret.

Following our simple MongoDB example, we could use the configuration options as follows:

from flowbber.components import Sink


class SimpleMongoDBSink(Sink):
    def declare_config(self, config):
        # Declare options ...
        ...

    def distribute(self, data):
        from pymongo import MongoClient

        client = MongoClient(self.config.uri.value)
        database = client[self.config.database.value]
        collection = database[self.config.collection.value]

        # ...
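
The key and is_secret attributes can be useful as well. As a minimal sketch, reusing the uri option declared above (and log being the module level logger from the earlier example), a component could mask secret values before logging them on its own:

def distribute(self, data):
    # key, value and is_secret are the attributes documented above;
    # mask the value of options marked as secret before logging.
    option = self.config.uri
    log.info('Option {}: {}'.format(
        option.key,
        '********' if option.is_secret else option.value
    ))

    # ...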

Finally, in some situations a basic option by option validation is insufficient, for example, if two options are incompatible with each other.

In these situations the component can register a custom validation function using the flowbber.config.Configurator.add_validator() method:

Configurator.add_validator(validator)

Add a custom validation function.

Parameters
validator (function) – A custom validator function.

The registered function will receive the validated data as a dictionary just before it is frozen and can perform any modification required on the data.

For example, consider the MongoDBSink that allows defining the connection parameters either as a single string or as multiple values:

def declare_config(self, config):

    # ... calls to add_option()

    # Check if uri is defined and if so then delete other keys
    def custom_validator(validated):
        if validated['uri'] is not None:
            del validated['host']
            del validated['port']
            del validated['username']
            del validated['password']
        else:
            del validated['uri']

    config.add_validator(custom_validator)

A validation function can also perform checks against values, for example in TimestampSource, where at least one format needs to be enabled:

def declare_config(self, config):

    # ... calls to add_option()

    # Check that at least one format is enabled
    def custom_validator(validated):
        if not any(validated.values()):
            raise ValueError(
                'The timestamp source requires at least one timestamp '
                'format enabled'
            )

    config.add_validator(custom_validator)

Logging Considerations

Both sources and sinks run in subprocesses, so logging and printing directly to stdout or stderr will mangle the output. In order to log and print from any context without issues, Flowbber provides the following functions:

  1. flowbber.logging.get_logger() that returns a multiprocess safe logger.
  2. flowbber.logging.print() that allows safely printing to stdout or stderr in a multiprocess context.

flowbber.logging.get_logger(name)

Return a multiprocess safe logger.

Parameters
name (str) – Name of the logger.
Returns
A multiprocess safe logger.
Return type
logging.Logger

flowbber.logging.print(obj, fd='stdout')

Enqueue a print to the given fd.

Parameters
  • obj – Object to print.
  • fd (str) – Name of the file descriptor. Either stdout or stderr only.

Nevertheless, it is important to note that in Flowbber, Python’s traditional logging.getLogger() loggers will also work correctly, for example inside a third party library used by your sources or sinks. So nothing needs to change if Flowbber’s get_logger function isn’t used for logging; it is simply the recommended one.

Usage:

from flowbber.logging import get_logger, print


log = get_logger(__name__)


def do_foo(say):
    print(say)
    print(say, fd='stderr')
    log.info(say)
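
As noted above, standard library loggers keep working too. A minimal sketch (the function and message are illustrative):

import logging

# A traditional logger, e.g. the kind used inside a third party library
stdlog = logging.getLogger(__name__)


def do_bar(say):
    # Records emitted here are also handled safely by Flowbber's
    # multiprocess logging.
    stdlog.info(say)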

Pipeline API Usage

It is possible to build a Python package that implements a specific pipeline based on Flowbber.

For example, consider a daemon that reads values from Arduino based sensors connected to a Raspberry Pi and submits them to a server in the cloud, either using a web service or a database management system.

As another example, consider a monitoring daemon that grabs the CPU usage information from the system and submits it to an InfluxDB and / or MongoDB database.

Let’s code a basic version of our hypothetical daemon cpud. Let’s assume that its configuration file is located at /etc/cpud.toml and looks like this:

[cpud]
verbosity = 3

# Take a sample each 10 seconds
frequency = 10

[influxdb]
uri = "influxdb://localhost:8086/"
database = "cpud"

[mongodb]
uri = "mongodb://localhost:27017/"
database = "cpud"
collection = "cpuddata"

A basic and naive implementation of such a daemon could look like this:

from toml import loads

from flowbber.pipeline import Pipeline
from flowbber.scheduler import Scheduler
from flowbber.logging import setup_logging
from flowbber.inputs import validate_definition


def build_definition(config):
    """
    Build a pipeline definition based on given configuration.
    """
    definition = {
        'sources': [
            {'type': 'timestamp', 'id': 'timekeys', 'config': {
                'epoch': True,  # Key for MongoDB
                'iso8601': True,  # Key for InfluxDB
            }},
            {'type': 'cpu', 'id': 'cpu'},
        ],
        'sinks': [],
        'aggregators': [],
    }

    if config['influxdb']:
        definition['sinks'].append({
            'type': 'influxdb',
            'id': 'influxdb',
            'config': {
                'uri': config['influxdb']['uri'],
                'database': config['influxdb']['database'],
                'key': 'timekeys.iso8601',
            }
        })

    if config['mongodb']:
        definition['sinks'].append({
            'type': 'mongodb',
            'id': 'mongodb',
            'config': {
                'uri': config['mongodb']['uri'],
                'database': config['mongodb']['database'],
                'collection': config['mongodb']['collection'],
                'key': 'timekeys.epoch',
            }
        })

    if not definition['sinks']:
        raise RuntimeError('No sinks configured')

    return definition


def main():

    # Read configuration
    with open('/etc/cpud.toml') as fd:
        config = loads(fd.read())

    # Setup multiprocess logging
    setup_logging(config['cpud']['verbosity'])

    # Build pipeline definition
    definition = build_definition(config)

    # Validate pipeline definition
    validated = validate_definition(definition)

    # Build pipeline
    pipeline = Pipeline(
        validated,
        'cpud',
        app='cpud',
    )

    # Build and run scheduler
    scheduler = Scheduler(
        pipeline,
        config['cpud']['frequency'],
        stop_on_failure=True
    )

    scheduler.run()

The most relevant parts of this example are:

  1. We programmatically build the Pipeline Definition from cpud’s own configuration file.

    Note that the package itself could bundle the required components (Sources, Aggregators or Sinks) altogether.

    In particular, we use flowbber.inputs.validate_definition() that normalizes and validates the pipeline definition we just built.

  2. We create an instance of flowbber.pipeline.Pipeline with the definition.

    For one time execution applications we can use the flowbber.pipeline.Pipeline.run() method (a minimal sketch is included at the end of this section).

    class flowbber.pipeline.Pipeline(pipeline, name, app='flowbber')

    Pipeline executor class.

    This class will fetch all components from locally declared components and entrypoint plugins, create instances of all of them and execute the pipeline in order, creating subprocesses when needed.

    Execution of the pipeline is registered in a journal that is returned and / or saved when the execution of the pipeline ends.

    Parameters
    • pipeline (dict) – A pipeline definition data structure.
    • name (str) – Name of the pipeline. Used for pretty printing only.
    • app (str) – Name of the application running the pipeline. This name is used mainly to set the process name and the journals directory.
    executed

    Number of times this pipeline has been executed.

    name

    Name of this pipeline.

    run()

    Execute pipeline.

    This method can be called several times after instantiating the pipeline.

    Returns
    The journal of the execution.
    Return type
    dict
  3. We create an instance of flowbber.scheduler.Scheduler that will control the continuous execution of the pipeline.

    class flowbber.scheduler.Scheduler(pipeline, frequency, samples=None, start=None, stop_on_failure=False)

    Schedule the execution of a pipeline.

    The scheduler will try to run the pipeline on schedule. If the pipeline takes too long, longer than the programmed frequency, the scheduler will increment the runs_missed counter for each execution that failed to run at the expected schedule because the previous run was still running, and will start the missed pipeline execution right away.

    Parameters
    • pipeline (flowbber.pipeline.Pipeline) – The pipeline to execute.
    • frequency (float) – Sampling frequency in seconds.
    • samples (int) – Number of samples (successful executions of the pipeline) to take before stopping the scheduler. If missing or None, the scheduler will continue taking samples forever.
    • start (int) – An absolute timestamp in seconds since the epoch that marks when the scheduler should start executing the pipeline. This timestamp must be in the future. If missing or None, the scheduler will start immediately.
    • stop_on_failure (bool) – Stop the scheduler if the pipeline fails one execution. Otherwise, keep scheduling runs even on failure.
    last_run

    Timestamp in seconds since the epoch of the last run of the pipeline.

    run()

    Start the scheduler.

    runs

    Read-only dictionary with the numbers of categorized runs of the pipeline:

    {
        'passed': 10,
        'failed': 2,
        'missed': 0,
    }
    
  4. The first thing we do in main() is call flowbber.logging.setup_logging() with the configured verbosity level to set up Flowbber’s multiprocess safe logging and printing.

    If this function isn’t called as soon as possible, or isn’t called at all, then logging performed from sources and sinks that run in their own subprocesses will mangle the logging output.

    flowbber.logging.setup_logging(verbosity=0)

    Setup logging for this process.

    The first time it is called it will create a subprocess to manage the logging and printing, setup the subprocess for stream logging to stdout and setup the main process to queue logging.

    In consequence, any subprocess of the main process will inherit the queue logging and become multiprocess logging safe.

    This method can be called from subprocesses, but if at least one logging call has already been performed it will fail, as the handler will already exist.

    Parameters
    verbosity (int) – Verbosity level, as defined by LoggingManager.LEVELS. The greater the number the more information is provided, with 0 as initial level.
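
To close the example, here is a minimal sketch of a one time execution (no Scheduler involved), reusing the build_definition() helper from the daemon code above:

from toml import loads

from flowbber.pipeline import Pipeline
from flowbber.logging import setup_logging
from flowbber.inputs import validate_definition


def run_once():
    # Read configuration and build the definition as shown above
    with open('/etc/cpud.toml') as fd:
        config = loads(fd.read())

    # Setup multiprocess logging before anything else
    setup_logging(config['cpud']['verbosity'])

    # build_definition() is the helper defined in the daemon example
    definition = build_definition(config)
    validated = validate_definition(definition)

    pipeline = Pipeline(
        validated,
        'cpud',
        app='cpud',
    )

    # run() returns the journal of this execution
    return pipeline.run()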