Sinks are plugins that allow storing the collected data in different formats.
The following sink plugins are included as part of Flowbber.
This sink writes all collected data to a JSON file.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
Dependencies:
pip3 install flowbber[archive]
Usage:
[[sinks]]
type = "archive"
id = "..."
[sinks.config]
output = "data.json"
encoding = "utf-8"
override = true
create_parents = true
pretty = false
compress = false
{
"sinks": [
{
"type": "archive",
"id": "...",
"config": {
"output": "data.json",
"encoding": "utf-8",
"override": true,
"create_parents": true,
"pretty": false,
"compress": false
}
}
]
}
Path to JSON file to write the collected data.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Encoding to use to encode the file.
Default: utf-8
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Override output file if already exists.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Create output file parent directories if they don't exist.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Compress the JSON output file in a Zip archive.

If using compression, the .zip extension will be automatically appended to the output filename parameter if not present. The created archive will contain a JSON file named as the output parameter without the .zip extension. For example:

compress parameter | output parameter | Saved as | File inside Zip
---|---|---|---
True | archive.json | archive.json.zip | archive.json
True | archive.json.zip | archive.json.zip | archive.json

Compression uses Python's ZipFile module with the ZIP_DEFLATED option, thus requiring the zlib module.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
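The naming rule in the table above can be sketched as a small helper. This is an illustration only; `archive_names` is a hypothetical function, not part of Flowbber's API:

```python
def archive_names(output):
    """Return (zip filename, name of the JSON file inside the zip).

    Hypothetical helper mirroring the naming rule described above:
    '.zip' is appended to the output parameter if missing, and the
    file inside the archive is the output name without '.zip'.
    """
    if output.endswith('.zip'):
        return output, output[:-len('.zip')]
    return output + '.zip', output

print(archive_names('archive.json'))      # ('archive.json.zip', 'archive.json')
print(archive_names('archive.json.zip'))  # ('archive.json.zip', 'archive.json')
```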
This sink creates files from subsets of the collected data. The user specifies which data to extract, which unique id to use for each file, where the files will be stored, and the format of each file.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
Dependencies:
pip3 install flowbber[data_splitter]
Usage:
[[sinks]]
type = "data_splitter"
id = "..."
[sinks.config]
create_parents = true
output = ".../hardware"
id_selector = "hardware.suites.pytest.tests.*"
data_selector = "hardware.suites.pytest.*"
format = "str"
{
"sinks": [
{
"type": "data_splitter",
"id": "...",
"config": {
"create_parents": true,
"output": ".../hardware",
"id_selector": "hardware.suites.pytest.tests.*",
"data_selector": "hardware.suites.pytest.*"
}
}
]
}
Encoding to use to encode the file.
Default: utf-8
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Pattern to a path where collected data files will be stored.

The path should use the {{id}} replacement pattern to allow the sink to determine its final path using the IDs found. For example:

path = "/tmp/myfiles/{{id}}/{{id}}.yaml"
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
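The {{id}} replacement described above behaves like a simple string substitution. A minimal sketch, assuming a hypothetical `expand_output` helper (not Flowbber's actual implementation):

```python
def expand_output(pattern, id_):
    # Every occurrence of the '{{id}}' replacement pattern is
    # substituted with the ID found in the data tree.
    return pattern.replace('{{id}}', id_)

print(expand_output('/tmp/myfiles/{{id}}/{{id}}.yaml', 'test1'))
# /tmp/myfiles/test1/test1.yaml
```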
Create output file parent directories if don’t exist.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Selector is a pattern using fnmatch that matches branches in the data tree. The branches found will be used as ID for the filenames.
Default: N/A
Optional: False
Schema:
{
'type': 'list',
'empty': False,
}
Secret: False
Selector is a pattern using fnmatch that matches branches in the data tree. The branches found will be used as the data of the files.
Default: N/A
Optional: False
Schema:
{
'type': 'list',
'empty': False,
}
Secret: False
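Selector matching against branches of the data tree can be sketched with Python's standard fnmatch module. The `match_branches` helper below is a simplified assumption of how such a selector could be evaluated; Flowbber's actual implementation may differ:

```python
from fnmatch import fnmatch


def match_branches(data, selector, prefix=''):
    """Collect dotted paths in a data tree matching an fnmatch selector.

    Simplified sketch: each branch key is joined with '.' into a path,
    and paths are tested against the selector pattern.
    """
    matches = []
    for key, value in data.items():
        path = prefix + key
        if fnmatch(path, selector):
            matches.append(path)
        elif isinstance(value, dict):
            matches.extend(match_branches(value, selector, path + '.'))
    return matches


tree = {'hardware': {'suites': {'pytest': {'tests': {'test_a': 1, 'test_b': 2}}}}}
print(match_branches(tree, 'hardware.suites.pytest.tests.*'))
# ['hardware.suites.pytest.tests.test_a', 'hardware.suites.pytest.tests.test_b']
```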
This sink writes all collected data to an InfluxDB time series database.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
In order to be able to track in time the complex data collected by the sources, this sink requires the data to be "flattened" first. The process will transform an arbitrarily deep dictionary tree into a fixed-depth dictionary that maps the keys of the sources to the measurements they performed.

To accomplish this, the flattening process will join the keys of the nested dictionaries (with the keysjoiner character) until a leaf value that has a datatype supported by InfluxDB (string, float, integer, boolean or None) is found.

So, for example, consider the data collected by the Cobertura source:
{
'coverage': {
'files': {
'__init__.py': {
'line_rate': 1.0,
'total_misses': 0,
'total_statements': 10,
},
'__main__.py': {
'line_rate': 0.5,
'total_misses': 5,
'total_statements': 10,
},
# ...
},
'total': {
'line_rate': 0.75,
'total_misses': 5,
'total_statements': 20,
}
}
}
The above structure will be transformed into:
{
'coverage': {
'files.__init__:py.line_rate': 1.0,
'files.__init__:py.total_misses': 0,
'files.__init__:py.total_statements': 10,
'files.__main__:py.line_rate': 0.5,
'files.__main__:py.total_misses': 5,
'files.__main__:py.total_statements': 10,
# ...
'total.line_rate': 0.75,
'total.total_misses': 5,
'total.total_statements': 20,
}
}
Important
Please note the . in the file name was changed to : before joining the keys. This replacement and joining is controlled by the keysjoinerreplace and keysjoiner options.
Also note that the leaf keys must map to a value that can be used as field value in InfluxDB (string, float, integer, boolean or None), if not, the flattening will fail.
Or in case of lists:
{
'key1': ['a', 'b', 'c'],
'key2': [
{'a': 1},
{'b': 2},
{'c': 3},
]
}
The flattening results in:
{
    'key1.0': 'a',
    'key1.1': 'b',
    'key1.2': 'c',
    'key2.0.a': 1,
    'key2.1.b': 2,
    'key2.2.c': 3,
}
You may run this sink with DEBUG verbosity (-vvv) to analyze all transformations performed.
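The flattening described above can be sketched in a few lines. This is a simplified illustration of the behavior, not Flowbber's actual code; the `flatten` name and its parameters are assumptions mirroring the keysjoiner and keysjoinerreplace options:

```python
def flatten(data, joiner='.', joinerreplace=':'):
    """Flatten a nested dict/list tree into a single-depth dict.

    Keys along each branch are joined with `joiner`; occurrences of
    `joiner` inside a key are first replaced with `joinerreplace`.
    List indices become keys, matching the list example above.
    """
    flat = {}

    def walk(node, path):
        if isinstance(node, dict):
            items = node.items()
        elif isinstance(node, list):
            items = enumerate(node)
        else:
            # Leaf value: record it under the joined key path
            flat[joiner.join(path)] = node
            return
        for key, value in items:
            key = str(key).replace(joiner, joinerreplace)
            walk(value, path + [key])

    walk(data, [])
    return flat


print(flatten({'files': {'__init__.py': {'line_rate': 1.0}}}))
# {'files.__init__:py.line_rate': 1.0}
```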
Dependencies:
pip3 install flowbber[influxdb]
Usage:
[[sinks]]
type = "influxdb"
id = "..."
[sinks.config]
uri = "influxdb://localhost:8086/"
database = "flowbber"
key = "timestamp.iso8601"
{
"sinks": [
{
"type": "influxdb",
"id": "...",
"config": {
"uri": "influxdb://localhost:8086/",
"database": "flowbber",
"key": "timestamp.iso8601"
}
}
]
}
URI to connect to InfluxDB.

If this is set, the options host, port, username, password and ssl will be ignored.

The URI is in the form:

influxdb://username:password@localhost:8086/

Supported schemes are influxdb, https+influxdb and udp+influxdb.

Check the function from_dsn for more information.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: True
Hostname or IP to connect to InfluxDB.
Default: localhost
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Path of InfluxDB on the server to connect to.
Default: ''
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
Port to connect to InfluxDB.
Default: 8086
Optional: True
Schema:
{
'type': 'integer',
'min': 0,
'max': 65535,
}
Secret: False
User to connect to InfluxDB.
Default: root
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Password of the user.
Default: root
Optional: True
Schema:
{
'type': 'string',
}
Secret: True
Use https instead of http to connect to InfluxDB.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Verify SSL certificates for HTTPS requests.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Database name to connect to.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Path to a value in the collected data that will be used as key timestamp when submitting the flattened data to the database.

Specify the path to the value by joining the keys in its path with a . (dot). For example, given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}

The corresponding path to use the value 2017-08-19T01:26:35.683529 as timestamp is my_key.a_sub_key.yet_another.

This option is nullable, and if null is provided (the default), the timestamp will be determined when submitting the data by calling Python's datetime.now().isoformat().
In order to have a single timestamp for your pipeline you can include the Timestamp source and use a configuration similar to the following:
[[sources]]
type = "timestamp"
id = "timestamp"
[sources.config]
iso8601 = true
[[sinks]]
type = "influxdb"
id = "..."
[sinks.config]
uri = "..."
database = "..."
key = "timestamp.iso8601"
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: False
Character used to join the keys when flattening the collected data (see data flattening process above).
Default: .
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
Character used to replace occurrences of the keysjoiner character in the keys of the collected data prior to flattening.

This option is nullable, and if null is provided, no character replacement will be done to the keys.
Default: :
Optional: True
Schema:
{
'type': 'string',
'nullable': True,
}
Secret: False
This sink plugin runs the lcov genhtml executable to generate an HTML report of coverage.

Note

This sink requires the genhtml executable to be available in your system to run.
Dependencies:
pip3 install flowbber[lcov_html]
Usage:
[[sinks]]
type = "lcov_html"
id = "..."
[sinks.config]
key = "<id of lcov source>"
output = "<output directory>"
override = true
create_parents = true
{
"sinks": [
{
"type": "lcov_html",
"id": "...",
"config": {
"key": "<id of lcov source>",
"output": "<output directory>",
"override": true,
"create_parents": true
}
}
]
}
ID of an lcov source in the pipeline.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Path to a directory to write the generated html.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Override output directory if already exists.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Create output parent directories if they don't exist.
Default: True
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
This sink writes all collected data to a MongoDB NoSQL database.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
The collected data is mostly unaltered from its original form, except that MongoDB document keys cannot contain . (dot) characters and cannot start with a $ (dollar) sign.

This sink will transform the keys in the collected data so it is safe to submit to MongoDB; the options dotreplace and dollarreplace are used to perform this transformation.
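The key transformation described above can be sketched as a recursive rewrite. This is an illustrative assumption of the behavior (the `make_safe` name is hypothetical), with parameters mirroring the dotreplace and dollarreplace options:

```python
def make_safe(data, dotreplace=':', dollarreplace='&'):
    """Recursively rewrite keys so they are safe for MongoDB documents.

    '.' anywhere in a key is replaced with `dotreplace`; a leading
    '$' is replaced with `dollarreplace`. Values are left untouched.
    """
    if isinstance(data, dict):
        safe = {}
        for key, value in data.items():
            key = key.replace('.', dotreplace)
            if key.startswith('$'):
                key = dollarreplace + key[1:]
            safe[key] = make_safe(value, dotreplace, dollarreplace)
        return safe
    if isinstance(data, list):
        return [make_safe(item, dotreplace, dollarreplace) for item in data]
    return data


print(make_safe({'$total': {'file.py': 1.0}}))
# {'&total': {'file:py': 1.0}}
```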
Dependencies:
pip3 install flowbber[mongodb]
Usage:
[[sinks]]
type = "mongodb"
id = "..."
[sinks.config]
uri = "mongodb://localhost:27017/"
database = "flowbber"
collection = "pipeline1data"
key = "timestamp.epoch"
overwrite = true
{
"sinks": [
{
"type": "mongodb",
"id": "...",
"config": {
"uri": "mongodb://localhost:27017/",
"database": "flowbber",
"collection": "pipeline1data",
"key": "timestamp.epoch"
}
}
]
}
URI to connect to MongoDB.

If this is set, the options host, port, username and password will be ignored.

The URI is in the form:

mongodb://username:password@host:port/

Except that for the username and password, reserved characters like :, /, + and @ must be percent-encoded following RFC 2396:
from urllib.parse import quote_plus

uri = 'mongodb://{}:{}@{}:{}/'.format(
    quote_plus(user),
    quote_plus(password),
    host, port,
)
host can be a hostname or IP address. Unix domain sockets are also supported but must be percent-encoded:
uri = 'mongodb://{}:{}@{}/'.format(
    quote_plus(user),
    quote_plus(password),
    quote_plus(socket_path)
)
For more information check the MongoClient API and the MongoDB docs.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: True
Hostname, IP address or Unix domain socket path to connect to MongoDB.
Default: localhost
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Port to connect to MongoDB.
Default: 27017
Optional: True
Schema:
{
'type': 'integer',
'min': 0,
'max': 65535,
}
Secret: False
User to connect to MongoDB.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Password of the user.
Default: None
Optional: True
Schema:
{
'type': 'string',
}
Secret: True
Create the connection to the server using SSL.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Database to connect to.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Collection to write to. This option defines two modes of operation depending on whether it is set to None or to a string value:

This option is nullable; if null is provided (the default), the collection will be determined for each entry in the collected data bundle using its key. For example, given the following pipeline:
[[sources]]
type = "somesource"
id = "first_id"
[[sources]]
type = "somesource"
id = "second_id"
[[sinks]]
type = "mongodb"
id = "mongodb"
[sinks.config]
uri = "..."
database = "mydatabase"
Which collects the following data:
{
'first_id': {
'abc': 'def',
'xyz': '123',
},
'second_id': {
'123': 'abc',
'qwe': 'rty',
},
}
Will be stored in MongoDB as follows:
In collection first_id:
{
'abc': 'def',
'xyz': '123',
}
In collection second_id:
{
'123': 'abc',
'qwe': 'rty',
}
Otherwise, the whole data bundle will be stored to the collection specified.
In both cases the key option will be honored.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: False
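The two modes of operation can be sketched as a small routing function. This is a hypothetical illustration (`route_to_collections` is not part of Flowbber's API) of how entries could map to collections:

```python
def route_to_collections(data, collection=None):
    """Map collection name -> document, per the two modes above.

    With collection=None, each top-level key of the data bundle
    becomes its own collection; otherwise the whole bundle is
    stored under the given collection name.
    """
    if collection is None:
        return dict(data)
    return {collection: data}


bundle = {'first_id': {'abc': 'def'}, 'second_id': {'123': 'abc'}}
print(route_to_collections(bundle))
# {'first_id': {'abc': 'def'}, 'second_id': {'123': 'abc'}}
print(route_to_collections(bundle, 'pipeline1data'))
# {'pipeline1data': {'first_id': {'abc': 'def'}, 'second_id': {'123': 'abc'}}}
```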
Path to a value in the collected data that will be used as object id when submitting the safe data to the database.

Specify the path to the value by joining the keys in its path with a . (dot). For example, given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': 123123123
        }
    }
}

The corresponding path to use the value 123123123 as object id is my_key.a_sub_key.yet_another.
This option is nullable, and if null is provided (the default), the object id will be generated by MongoDB itself.
In order to have a single timestamp for your pipeline you can include the Timestamp source and use a configuration similar to the following:
[[sources]]
type = "timestamp"
id = "timestamp"
[sources.config]
epoch = true
[[sinks]]
type = "mongodb"
id = "..."
[sinks.config]
uri = "..."
database = "..."
collection = "..."
key = "timestamp.epoch"
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: False
Indicates whether MongoDB should allow overwriting an existing entry.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Character used to replace any occurrence of a . (dot) character in the keys of the collected data (see MongoDB data safety above).
Default: :
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
Character used to replace any occurrence of a $ (dollar) character at the beginning of any key in the collected data (see MongoDB data safety above).
Default: &
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
This sink plugin will pretty print all collected data to stdout.

This module uses the third-party module pprintpp for better pretty printing of large data structures.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
Dependencies:
pip3 install flowbber[print]
Usage:
[[sinks]]
type = "print"
id = "..."
{
"sinks": [
{
"type": "print",
"id": "...",
"config": {}
}
]
}
This sink will render specified Jinja2 template using the collected data as payload.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
For the following collected data:
OrderedDict([
('timestamp', {
'epoch': 1503280511,
'epochf': 1503280511.560432,
'iso8601': '2017-08-20T19:55:11',
'strftime': '2017-08-20 19:55:11',
}),
('user', {'uid': 1000, 'user': 'kuralabs'}),
])
The following template could be used:
<h1>My Rendered Template</h1>
<h2>Timestamp:</h2>
<ul>
<li>Epoch: {{ data.timestamp.epoch }}</li>
<li>ISO8601: {{ data.timestamp.iso8601 }}</li>
</ul>
<h2>User:</h2>
<ul>
<li>UID: {{ data.user.uid }}</li>
<li>User: {{ data.user.user }}</li>
</ul>
And rendering it with that data will result in:
<h1>My Rendered Template</h1>
<h2>Timestamp:</h2>
<ul>
<li>Epoch: 1503280511</li>
<li>ISO8601: 2017-08-20T19:55:11</li>
</ul>
<h2>User:</h2>
<ul>
<li>UID: 1000</li>
<li>User: kuralabs</li>
</ul>
Dependencies:
pip3 install flowbber[template]
Usage:
[[sinks]]
type = "template"
id = "..."
[sinks.config]
template = "template1.tpl"
output = "render1.html"
override = true
create_parents = true
{
"sinks": [
{
"type": "template",
"id": "...",
"config": {
"template": "template1.tpl",
"output": "render1.html",
"override": true,
"create_parents": true
}
}
]
}
URI to the Jinja2 template. If no scheme is specified, file:// will be used.

Supported schemes:

file:// (the default): File system path to the template.

file://path/to/template.tpl

Or if using Substitutions:

file://{pipeline.dir}/template.tpl

When using this option, the selected template is able to load sibling templates (in the same directory) using the Jinja2 import or extends directives:
{% import "common.html" as common %}
{% extends "base.html" %}
python://: A Python package and function name to load the content of the template.

python://package.subpackage.function:template

This is particularly useful if your templates are included as package_data in a package with your custom plugins, or if you want to load the template using custom logic from your flowconf.py.

Specify the package and function using dotted notation, with the template name separated by a : (colon). The function receives the template name as argument and must return the content of the template.

For example, in your flowconf.py:
from jinja2 import TemplateNotFound


def my_loading_function(template_name):
    if template_name == 'greeting_template':
        return '<h1>Hello {{ data.user.name }}!</h1>'
    raise TemplateNotFound(template_name)
This can be used in your pipeline as:

template = "python://flowconf.my_loading_function:greeting_template"
When using this option, the selected template is able to load other templates that the function is able to resolve, using the Jinja2 import or extends directives:

{% import "common" as common %}
{% extends "base" %}
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
Output file.

This option is nullable; if None is provided (the default), the output file name will be auto-determined using the name of the template, appending an out file extension. For example:

template | output
---|---
file://templates/the_template.tpl | the_template.out
python://mypackage.load_template:my_template | my_template.out
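The auto-determination rule in the table above can be sketched as follows. This is a hypothetical helper (`default_output` is not Flowbber's API) assuming the rule is: take the template's base name, drop its extension, and append .out:

```python
def default_output(template_uri):
    """Derive the default output file name from a template URI."""
    if template_uri.startswith('python://'):
        # The template name follows the ':' separator
        name = template_uri.split(':')[-1]
    else:
        # File path: take the base name
        name = template_uri.split('/')[-1]
    # Drop the template's own extension, if any, before adding '.out'
    if '.' in name:
        name = name.rsplit('.', 1)[0]
    return name + '.out'


print(default_output('file://templates/the_template.tpl'))
# the_template.out
print(default_output('python://mypackage.load_template:my_template'))
# my_template.out
```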
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True
}
Secret: False
Override output file if already exists.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Create output file parent directories if they don't exist.
Default: True
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
Extra data to pass to the template. Data provided using this configuration option will be available to the template under the payload variable.
Usage:
[[sinks]]
type = "template"
id = "mytemplate"
[sinks.config.payload]
project_name = "Flowbber"
project_url = "https://docs.kuralabs.io/flowbber/"
And then in your template:
<p>Visit our project page at
<a href="{{ payload.project_url }}">{{ payload.project_name }}</a>
</p>
Default: None
Optional: True
Schema:
{
'type': 'dict',
'nullable': True,
}
Secret: False
Custom filters to pass to the template.

This option must map the name of each filter to the path of the function that implements it. Any path to a Python function is valid, including one in the local flowconf.py file.
Usage:
[[sinks]]
type = "template"
id = "mytemplate"
[sinks.config.filters]
coverage_class = "flowconf.filter_coverage_class"
And then in your flowconf.py (or package with custom components):
def filter_coverage_class(value, threshold=(0.5, 0.8)):
    lower, higher = threshold

    if value < lower:
        return 'low'
    if value < higher:
        return 'mid'
    return 'high'
The above filter can then be used as:
{% for filename, filedata in data.coverage.files.items() %}
<ul>
<li class="{{ filedata.line_rate|coverage_class }}">
{{ filename }}
</li>
</ul>
{% endfor %}
Default: None
Optional: True
Schema:
{
'type': 'dict',
'keysrules': {
'type': 'string',
'empty': False,
},
'valuesrules': {
'type': 'string',
'empty': False,
},
'nullable': True,
}
Secret: False