Sinks

Sinks are plugins that store the collected data in different formats and destinations.

The following sink plugins are included with Flowbber.

Archive

This sink writes all collected data to a JSON file.

Dependencies:

pip3 install flowbber[archive]

Usage:

[[sinks]]
type = "archive"
id = "..."

    [sinks.config]
    output = "data.json"
    override = true
    create_parents = true
    pretty = false

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "archive",
            "id": "...",
            "config": {
                "output": "data.json",
                "override": true,
                "create_parents": true,
                "pretty": false
            }
        }
    ]
}

output

Path to the JSON file where the collected data will be written.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

override

Override the output file if it already exists.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

create_parents

Create the output file’s parent directories if they don’t exist.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

pretty

Write the JSON output in a human-readable, indented format.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False
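The interaction between the output, override, create_parents and pretty options can be sketched with the Python standard library as follows. This is a minimal illustration under our own assumptions (the write_archive helper and the indentation width are hypothetical), not the sink’s actual implementation.

```python
import json
from pathlib import Path


def write_archive(data, output, override=False, create_parents=False,
                  pretty=False):
    """Write ``data`` as JSON to ``output`` honoring the archive options."""
    path = Path(output)

    # Refuse to replace an existing file unless override is enabled
    if path.exists() and not override:
        raise FileExistsError('File {} already exists'.format(path))

    # Create missing parent directories only when asked to
    if create_parents:
        path.parent.mkdir(parents=True, exist_ok=True)

    with path.open('w', encoding='utf-8') as fd:
        if pretty:
            # Human-readable output, one key per line
            json.dump(data, fd, indent=4)
        else:
            # Compact output without extra whitespace
            json.dump(data, fd, separators=(',', ':'))
```

For example, write_archive(data, 'reports/data.json', create_parents=True) would create the reports directory if needed, and a second identical call would fail unless override=True is passed.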

InfluxDB

This sink writes all collected data to an InfluxDB time series database.

To track the complex data collected by the sources over time, this sink must first “flatten” it. This process transforms an arbitrarily deep dictionary tree into a fixed-depth dictionary that maps the keys of the sources to the measurements they performed.

To accomplish this, the flattening process will:

  • Join all dictionary keys (using the keysjoiner character) until a leaf value with a datatype supported by InfluxDB (string, float, integer, boolean or None) is found.
  • Convert lists, before flattening, into a dictionary that maps the index of each element to the element itself.

So, for example, consider the data collected by the Cobertura source:

{
    'coverage': {
        'files': {
            '__init__.py': {
                'line_rate': 1.0,
                'total_misses': 0,
                'total_statements': 10,
            },
            '__main__.py': {
                'line_rate': 0.5,
                'total_misses': 5,
                'total_statements': 10,
            },
            # ...
        },
        'total': {
            'line_rate': 0.75,
            'total_misses': 5,
            'total_statements': 20,
        }
    }
}

The above structure will be transformed into:

{
    'coverage': {
        'files.__init__:py.line_rate': 1.0,
        'files.__init__:py.total_misses': 0,
        'files.__init__:py.total_statements': 10,
        'files.__main__:py.line_rate': 0.5,
        'files.__main__:py.total_misses': 5,
        'files.__main__:py.total_statements': 10,
        # ...
        'total.line_rate': 0.75,
        'total.total_misses': 5,
        'total.total_statements': 20,
    }
}

Important

Note that the . in the file names was replaced with : before joining the keys. This replacement and the joining are controlled by the keysjoinerreplace and keysjoiner options.

Also note that the leaf keys must map to a value that can be used as a field value in InfluxDB (string, float, integer, boolean or None); otherwise, the flattening will fail.

Lists are handled in a similar way. For example:

{
    'key1': ['a', 'b', 'c'],
    'key2': [
        {'a': 1},
        {'b': 2},
        {'c': 3},
    ]
}

The flattening results in:

{
    'key1.0': 'a',
    'key1.1': 'b',
    'key1.2': 'c',
    'key2.0.a': 1,
    'key2.1.b': 2,
    'key2.2.c': 3,
}

You may run this sink with DEBUG verbosity (-vvv) to analyze all transformations performed.
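The flattening rules above can be sketched in Python as follows, assuming the default keysjoiner (.) and keysjoinerreplace (:). The names flatten_tree and flatten_bundle are hypothetical and this is not Flowbber’s own code: flatten_bundle keeps the source ids as the first level, as in the Cobertura example, while flatten_tree flattens a whole tree, as in the list example.

```python
LEAF_TYPES = (str, float, int, bool, type(None))


def flatten_tree(tree, keysjoiner='.', keysjoinerreplace=':'):
    """Flatten an arbitrarily deep dict/list tree into a one-level dict."""
    flat = {}

    def clean(key):
        # Replace the joiner inside original keys so it is not confused
        # with the separator added by the flattening itself
        key = str(key)
        if keysjoinerreplace is not None:
            key = key.replace(keysjoiner, keysjoinerreplace)
        return key

    def walk(value, path):
        # Lists are treated as dicts mapping each index to its element
        if isinstance(value, list):
            value = dict(enumerate(value))
        if isinstance(value, dict):
            for key, subvalue in value.items():
                walk(subvalue, path + [clean(key)])
            return
        # Anything else must be a value InfluxDB accepts as a field
        if not isinstance(value, LEAF_TYPES):
            raise TypeError('Cannot flatten {!r} at {}'.format(
                value, keysjoiner.join(path)))
        flat[keysjoiner.join(path)] = value

    walk(tree, [])
    return flat


def flatten_bundle(bundle, **kwargs):
    """Keep the source ids as the first level and flatten each one's data."""
    return {
        source: flatten_tree(data, **kwargs)
        for source, data in bundle.items()
    }
```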

Dependencies:

pip3 install flowbber[influxdb]

Usage:

[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "influxdb://localhost:8086/"
    database = "flowbber"
    key = "timestamp.iso8601"

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "influxdb",
            "id": "...",
            "config": {
                "uri": "influxdb://localhost:8086/",
                "database": "flowbber",
                "key": "timestamp.iso8601"
            }
        }
    ]
}

uri

URI to connect to InfluxDB.

If this is set, the options host, port, username, password and ssl are ignored.

URI is in the form:

influxdb://username:password@localhost:8086/

Supported schemes are influxdb, https+influxdb and udp+influxdb.

Check the from_dsn function of the InfluxDB Python client (InfluxDBClient.from_dsn) for more information.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: True
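To illustrate which individual options a URI replaces, the following sketch splits one with Python’s urllib.parse. split_influxdb_uri is a hypothetical helper; the actual parsing is performed by the InfluxDB client’s from_dsn, which may handle additional details (for example, the UDP scheme).

```python
from urllib.parse import urlsplit, unquote

SUPPORTED_SCHEMES = ('influxdb', 'https+influxdb', 'udp+influxdb')


def split_influxdb_uri(uri):
    """Show which individual options an InfluxDB URI stands in for."""
    parts = urlsplit(uri)
    if parts.scheme not in SUPPORTED_SCHEMES:
        raise ValueError('Unsupported scheme {!r}'.format(parts.scheme))
    return {
        'host': parts.hostname,
        # Fall back to the documented default port when none is given
        'port': parts.port or 8086,
        # Credentials may be percent-encoded inside the URI
        'username': unquote(parts.username) if parts.username else None,
        'password': unquote(parts.password) if parts.password else None,
        # The https+influxdb scheme implies an HTTPS connection
        'ssl': parts.scheme == 'https+influxdb',
    }
```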

host

Hostname or IP to connect to InfluxDB.

  • Default: localhost

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

path

Path of InfluxDB on the server to connect to.

  • Default: ''

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

port

Port to connect to InfluxDB.

  • Default: 8086

  • Optional: True

  • Schema:

    {
        'type': 'integer',
        'min': 0,
        'max': 65535,
    }
    
  • Secret: False

username

User to connect to InfluxDB.

  • Default: root

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

password

Password of the user.

  • Default: root

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: True

ssl

Use https instead of http to connect to InfluxDB.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

verify_ssl

Verify SSL certificates for HTTPS requests.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

database

Database name to connect to.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

key

Path to a value in the collected data that will be used as the timestamp key when submitting the flattened data to the database.

Specify the path to the value by joining the keys along the path with a . (dot).

For example, given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}

The corresponding path to use the value 2017-08-19T01:26:35.683529 as the timestamp is my_key.a_sub_key.yet_another.

This option is nullable, and if null is provided (the default), the timestamp will be determined when submitting the data by calling Python’s datetime.now().isoformat().
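Resolving such a dotted path, with the datetime.now().isoformat() fallback, could look like the following minimal sketch. The helpers resolve_key and get_timestamp are hypothetical, and the sketch assumes that none of the keys along the path contains a dot itself.

```python
from datetime import datetime


def resolve_key(data, key):
    """Return the value found at a dot-joined path such as 'a.b.c'."""
    value = data
    for part in key.split('.'):
        # Descend one level per path component
        value = value[part]
    return value


def get_timestamp(data, key=None):
    """Use the value at ``key`` as timestamp, or the current time if null."""
    if key is None:
        return datetime.now().isoformat()
    return resolve_key(data, key)
```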

To have a single timestamp for your whole pipeline, you can include the Timestamp source and use a configuration similar to the following:

[[sources]]
type = "timestamp"
id = "timestamp"

    [sources.config]
    iso8601 = true

[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "..."
    database = "..."
    key = "timestamp.iso8601"

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False

keysjoiner

Character used to join the keys when flattening the collected data (see data flattening process above).

  • Default: .

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

keysjoinerreplace

Character used to replace occurrences of the keysjoiner character in the keys of the collected data before flattening it.

This option is nullable, and if null is provided, no replacement is performed on the keys.

  • Default: :

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'nullable': True,
    }
    
  • Secret: False

Lcov HTML

This sink plugin runs the lcov genhtml executable to generate an HTML coverage report.

Note

This sink requires the genhtml executable to be available in your system to run.
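Because the sink cannot work without genhtml, you may want to check for it before running the pipeline. The sketch below uses shutil.which for that check and builds a typical genhtml command line; coverage.info and html are placeholder paths, and this is not necessarily how Flowbber itself invokes genhtml.

```python
import shutil


def genhtml_available():
    """Return True if the genhtml executable is found in the PATH."""
    return shutil.which('genhtml') is not None


def build_genhtml_command(tracefile, output):
    """Build a genhtml command that writes the HTML report into ``output``."""
    # -o/--output-directory selects where genhtml writes the report
    return ['genhtml', '--output-directory', str(output), str(tracefile)]
```

Such a command could then be run with subprocess.run(build_genhtml_command('coverage.info', 'html'), check=True).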

Dependencies:

pip3 install flowbber[lcov]

Usage:

[[sinks]]
type = "lcov_html"
id = "..."

    [sinks.config]
    key = "<id of lcov source>"
    output = "<output directory>"
    override = true
    create_parents = true

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "lcov_html",
            "id": "...",
            "config": {
                "key": "<id of lcov source>",
                "output": "<output directory>",
                "override": true,
                "create_parents": true
            }
        }
    ]
}

key

Id of an lcov source in the pipeline.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

output

Path to the directory where the generated HTML will be written.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

override

Override the output directory if it already exists.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

create_parents

Create the output directory’s parent directories if they don’t exist.

  • Default: True

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

MongoDB

This sink writes all collected data to a MongoDB NoSQL database.

The collected data is mostly unaltered from its original form, except that MongoDB document keys cannot contain . (dot) characters and cannot start with a $ (dollar) sign.

This sink transforms the keys in the collected data so that it is safe to submit to MongoDB, using the dotreplace and dollarreplace options to perform this transformation.

Dependencies:

pip3 install flowbber[mongodb]

Usage:

[[sinks]]
type = "mongodb"
id = "..."

    [sinks.config]
    uri = "mongodb://localhost:27017/"
    database = "flowbber"
    collection = "pipeline1data"
    key = "timestamp.epoch"

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "mongodb",
            "id": "...",
            "config": {
                "uri": "mongodb://localhost:27017/",
                "database": "flowbber",
                "collection": "pipeline1data",
                "key": "timestamp.epoch"
            }
        }
    ]
}

uri

URI to connect to MongoDB.

If this is set, the options host, port, username and password are ignored.

URI is in the form:

mongodb://username:password@host:port/

Note that reserved characters such as :, /, + and @ in the username and password must be percent-encoded following RFC 2396:

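from urllib.parse import quote_plus

# Placeholder connection details; replace them with your own
user = 'flowbber'
password = 'p@ss/word'
host = 'localhost'
port = 27017

uri = 'mongodb://{}:{}@{}:{}/'.format(
    quote_plus(user),
    quote_plus(password),
    host, port,
)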

The host can be a hostname or an IP address. Unix domain sockets are also supported, but the socket path must be percent-encoded:

# Placeholder socket path; it must be percent-encoded too
socket_path = '/tmp/mongodb-27017.sock'

uri = 'mongodb://{}:{}@{}/'.format(
    quote_plus(user),
    quote_plus(password),
    quote_plus(socket_path),
)

For more information, check the PyMongo MongoClient API and the MongoDB connection string documentation.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: True

host

Hostname, IP address or Unix domain socket path to connect to MongoDB.

  • Default: localhost

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

port

Port to connect to MongoDB.

  • Default: 27017

  • Optional: True

  • Schema:

    {
        'type': 'integer',
        'min': 0,
        'max': 65535,
    }
    
  • Secret: False

username

User to connect to MongoDB.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

password

Password of the user.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: True

ssl

Create the connection to the server using SSL.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

database

Database to connect to.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

collection

Collection to write to. This option defines two modes of operation, depending on whether it is set to None or to any other string value:

  1. This option is nullable. If null is provided (the default), the collection for each entry in the collected data bundle is determined by the entry’s key. For example, given the following pipeline:

    [[sources]]
    type = "somesource"
    id = "first_id"
    
    [[sources]]
    type = "somesource"
    id = "second_id"
    
    [[sinks]]
    type = "mongodb"
    id = "mongodb"
    
        [sinks.config]
        uri = "..."
        database = "mydatabase"
    

    If the sources collect the following data:

    {
        'first_id': {
            'abc': 'def',
            'xyz': '123',
        },
        'second_id': {
            '123': 'abc',
            'qwe': 'rty',
        },
    }
    

    It will be stored in MongoDB as follows:

    1. In collection first_id:

      {
          'abc': 'def',
          'xyz': '123',
      }
      
    2. In collection second_id:

      {
          '123': 'abc',
          'qwe': 'rty',
      }
      
  2. Otherwise, the whole data bundle is stored in the specified collection.

In both cases, the key option is honored.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False
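The two modes can be summarized with the following sketch, which maps each target collection to the documents that would be inserted into it. plan_documents is a hypothetical helper; the key option and the key sanitization are left out for brevity.

```python
def plan_documents(bundle, collection=None):
    """Map each target collection name to the documents to insert into it."""
    if collection is None:
        # One collection per source id, holding only that source's data
        return {source: [data] for source, data in bundle.items()}
    # The whole bundle becomes a single document in the given collection
    return {collection: [bundle]}
```

With PyMongo, each entry could then be written with something like client[database][name].insert_many(documents); the actual sink may insert the documents differently.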

key

Path to a value in the collected data that will be used as the object id when submitting the sanitized data to the database.

Specify the path to the value by joining the keys along the path with a . (dot).

For example, given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': 123123123
        }
    }
}

The corresponding path to use the value 123123123 as the object id is my_key.a_sub_key.yet_another.

This option is nullable, and if null is provided (the default), the object id will be generated by MongoDB itself.

To use a single timestamp of your pipeline as the object id, you can include the Timestamp source and use a configuration similar to the following:

[[sources]]
type = "timestamp"
id = "timestamp"

    [sources.config]
    epoch = true

[[sinks]]
type = "mongodb"
id = "..."

    [sinks.config]
    uri = "..."
    database = "..."
    collection = "..."
    key = "timestamp.epoch"

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False

dotreplace

Character used to replace any occurrence of a . (dot) character in the keys of the collected data (see MongoDB data safety above).

  • Default: :

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

dollarreplace

Character used to replace any occurrence of a $ (dollar) character at the beginning of any key in the collected data (see MongoDB data safety above).

  • Default: &

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False
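The key transformation described at the beginning of this section can be sketched as a recursive function, using the default dotreplace (:) and dollarreplace (&) values. safe_keys is a hypothetical name, and the sketch assumes all dictionary keys are strings.

```python
def safe_keys(value, dotreplace=':', dollarreplace='&'):
    """Return a copy of ``value`` with MongoDB-safe dictionary keys."""
    if isinstance(value, dict):
        safe = {}
        for key, subvalue in value.items():
            # Dots are forbidden anywhere in a key
            key = key.replace('.', dotreplace)
            # A dollar sign is only forbidden at the start of a key
            if key.startswith('$'):
                key = dollarreplace + key[1:]
            safe[key] = safe_keys(subvalue, dotreplace, dollarreplace)
        return safe
    if isinstance(value, list):
        # Dictionaries nested inside lists need the same treatment
        return [safe_keys(item, dotreplace, dollarreplace) for item in value]
    return value
```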

Print

This sink plugin will pretty print all collected data to stdout.

This sink uses the third-party module pprintpp for better pretty-printing of large data structures.

Dependencies:

pip3 install flowbber[print]

Usage:

[[sinks]]
type = "print"
id = "..."

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "print",
            "id": "...",
            "config": {}
        }
    ]
}

Template

This sink renders the specified Jinja2 template with the collected data, which is available to the template under the data variable.

For the following collected data:

OrderedDict([
    ('timestamp', {
        'epoch': 1503280511,
        'epochf': 1503280511.560432,
        'iso8601': '2017-08-20T19:55:11',
        'strftime': '2017-08-20 19:55:11',
    }),
    ('user', {'uid': 1000, 'user': 'kuralabs'}),
])

The following template could be used:

<h1>My Rendered Template</h1>

<h2>Timestamp:</h2>

<ul>
    <li>Epoch: {{ data.timestamp.epoch }}</li>
    <li>ISO8601: {{ data.timestamp.iso8601 }}</li>
</ul>

<h2>User:</h2>

<ul>
    <li>UID: {{ data.user.uid }}</li>
    <li>User: {{ data.user.user }}</li>
</ul>

And rendering it with that data will result in:

<h1>My Rendered Template</h1>

<h2>Timestamp:</h2>

<ul>
    <li>Epoch: 1503280511</li>
    <li>ISO8601: 2017-08-20T19:55:11</li>
</ul>

<h2>User:</h2>

<ul>
    <li>UID: 1000</li>
    <li>User: kuralabs</li>
</ul>

Dependencies:

pip3 install flowbber[template]

Usage:

[[sinks]]
type = "template"
id = "..."

    [sinks.config]
    template = "template1.tpl"
    output = "render1.html"
    override = true
    create_parents = true

Or, equivalently, in JSON:

{
    "sinks": [
        {
            "type": "template",
            "id": "...",
            "config": {
                "template": "template1.tpl",
                "output": "render1.html",
                "override": true,
                "create_parents": true
            }
        }
    ]
}

template

URI to the Jinja2 template. If no scheme is specified, file:// will be used.

Supported schemes:

  • file:// (the default): File system path to the template.

    file://path/to/template.tpl
    

    Or if using Substitutions:

    file://{pipeline.dir}/template.tpl
    

    When using this option, the selected template can load sibling templates (in the same directory) using the Jinja2 import or extends directives:

    {% import "common.html" as common %}
    
    {% extends "base.html" %}
    
  • python://: A Python package and function used to load the content of the template.

    python://package.subpackage.function:template
    

    This is particularly useful if your templates are included as package_data in a package with your custom plugins, or if you want to load the template using custom logic from your flowconf.py.

    Specify the package and function using a dotted notation and the template name separated by a : (colon). The function receives the template name as argument and must return the content of the template.

    For example, in your flowconf.py:

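    from jinja2 import TemplateNotFound
    
    def my_loading_function(template_name):
        # Return the source of the templates this function knows about
        if template_name == 'greeting_template':
            return '<h1>Hello {{ data.user.name }}!</h1>'
        # Tell Jinja2 that any other name cannot be resolved
        raise TemplateNotFound(template_name)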
    

    This can be used in your pipeline by setting template = "python://flowconf.my_loading_function:greeting_template".

    When using this option, the selected template can load any other template that the function is able to resolve, using the Jinja2 import or extends directives:

    {% import "common" as common %}
    
    {% extends "base" %}
    
  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False
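Resolving a python:// URI amounts to importing the function and calling it with the template name. The following sketch uses importlib; load_python_template is a hypothetical helper, it assumes the module is importable (for flowconf.py, that its directory is on sys.path), and it only covers the lookup, not how the result is handed to Jinja2.

```python
from importlib import import_module


def load_python_template(uri):
    """Load template source from a python://package.function:template URI."""
    prefix = 'python://'
    if not uri.startswith(prefix):
        raise ValueError('Not a python:// URI: {}'.format(uri))
    # Split 'package.subpackage.function:template' at the first colon
    function_path, _, template_name = uri[len(prefix):].partition(':')
    # Split the dotted path into module and function name
    module_name, _, function_name = function_path.rpartition('.')
    function = getattr(import_module(module_name), function_name)
    # The function receives the template name and returns its content
    return function(template_name)
```

With the earlier flowconf.py example importable, load_python_template('python://flowconf.my_loading_function:greeting_template') would return the greeting template’s source.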

output

Output file.

This option is nullable. If None is provided (the default), the output file name will be derived from the name of the template, using an .out file extension.

For example:

template                                        output
file://templates/the_template.tpl               the_template.out
python://mypackage.load_template:my_template    my_template.out

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True
    }
    
  • Secret: False
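The naming rule in the table above can be sketched as follows. default_output is a hypothetical helper; for file paths it assumes the template’s extension is replaced by .out, as in the .tpl example, which may differ from Flowbber’s handling of other suffixes.

```python
from pathlib import PurePosixPath


def default_output(template):
    """Derive the default output file name from a template URI."""
    if template.startswith('python://'):
        # python://package.function:name -> name.out
        name = template.rpartition(':')[2]
    else:
        # file:// is the default scheme, so a bare path is also accepted
        path = template
        if path.startswith('file://'):
            path = path[len('file://'):]
        # Keep only the file name, without its extension
        name = PurePosixPath(path).stem
    return name + '.out'
```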

override

Override the output file if it already exists.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

create_parents

Create the output file’s parent directories if they don’t exist.

  • Default: True

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

payload

Extra data to pass to the template. Data provided using this configuration option will be available to the template under the payload variable.

Usage:

[[sinks]]
type = "template"
id = "mytemplate"

    [sinks.config.payload]
    project_name = "Flowbber"
    project_url = "https://docs.kuralabs.io/flowbber/"

And then in your template:

<p>Visit our project page at
    <a href="{{ payload.project_url }}">{{ payload.project_name }}</a>
</p>

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'dict',
        'nullable': True,
    }
    
  • Secret: False

filters

Custom filters to pass to the template.

This option must map each filter name to the path of the function that implements it. Any path to a Python function is valid, including functions in the local flowconf.py file.

Usage:

[[sinks]]
type = "template"
id = "mytemplate"

    [sinks.config.filters]
    coverage_class = "flowconf.filter_coverage_class"

And then in your flowconf.py (or package with custom components):

def filter_coverage_class(value, threshold=(0.5, 0.8)):
    lower, higher = threshold
    if value < lower:
        return 'low'
    if value < higher:
        return 'mid'
    return 'high'

The above filter can then be used as:

<ul>
{% for filename, filedata in data.coverage.files.items() %}
    <li class="{{ filedata.line_rate|coverage_class }}">
        {{ filename }}
    </li>
{% endfor %}
</ul>

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'dict',
        'keyschema': {
            'type': 'string',
            'empty': False,
        },
        'valueschema': {
            'type': 'string',
            'empty': False,
        },
        'nullable': True,
    }
    
  • Secret: False