To implement your custom components, create a class that subclasses one of the three available component base classes, as described in Implementing Components.
Once implemented, register it to make it available to Flowbber as described in Registering Components.
To allow passing configuration options to your component, implement the declare_config() method as described in Specifying Options.
Depending on the type of component you want to create, subclass one of the following classes and implement its abstract method.
To implement a Source component, subclass Flowbber’s flowbber.components.Source class and implement the flowbber.components.Source.collect() abstract method.
class flowbber.components.Source(index, type_, id_, optional=False, timeout=None, config=None)
    Main base class to implement a Source.
Source’s collect() method must return a dictionary with the collected data, using each key as a label and its value as the data for that label. The collected data structure can be as complex and nested as required, as long as it is built from JSON serializable objects, for example built-in datatypes like integers, booleans, floats and strings, and data structures like dictionaries and lists.
For example, consider a basic implementation of the built-in EnvSource that collects data from the environment:
from os import environ

from flowbber.components import Source


class MySource(Source):
    def collect(self):
        return {
            key: value
            for key, value in environ.items()
        }
To implement an Aggregator component, subclass Flowbber’s flowbber.components.Aggregator class and implement the flowbber.components.Aggregator.accumulate() abstract method.
class flowbber.components.Aggregator(index, type_, id_, optional=False, timeout=None, config=None)
    Main base class to implement an Aggregator.

    accumulate(data)
        Perform analysis or accumulate from the given data.

        The aggregator can:

        Add values:
            data['some_key'] = {'value': 1000}
        Delete values:
            del data['some_key']
        Modify values:
            data['some_key']['value'] = 2000

        All aggregator subclasses must implement this abstract method.
Aggregator’s accumulate() method takes the editable bundle of data collected by the sources and doesn’t return anything, but it may mutate, add or delete data in the bundle.
For example, consider a pipeline that uses twice the built-in CoberturaSource because your codebase has a part written in C and another one in Go, and you need a global coverage metric.
The pipeline’s sources could look like this:
[[sources]]
type = "cobertura"
id = "coverage_c"
[sources.config]
xmlpath = "{git.root}/build/test/c/coverage.xml"
[[sources]]
type = "cobertura"
id = "coverage_go"
[sources.config]
xmlpath = "{git.root}/build/test/go/coverage.xml"
The data gathered by both sources could look like this:
OrderedDict([
('coverage_c', {
'files': {
# ...
},
'total': {
'total_statements': 1000,
'total_misses': 200,
'line_rate': 0.80
}
}),
('coverage_go', {
'files': {
# ...
},
'total': {
'total_statements': 200,
'total_misses': 100,
'line_rate': 0.50
}
})
])
We could implement a basic aggregator that sums the total of both suites:
from flowbber.components import Aggregator


class MyAggregator(Aggregator):
    def accumulate(self, data):
        ids = ['coverage_c', 'coverage_go']

        total_statements = 0
        total_misses = 0

        for datakey in ids:
            total = data[datakey]['total']
            total_statements += total['total_statements']
            total_misses += total['total_misses']

        total_hits = total_statements - total_misses
        line_rate = total_hits / total_statements

        data[self.id] = {
            'total_statements': total_statements,
            'total_misses': total_misses,
            'line_rate': line_rate
        }
Assuming that we register this aggregator as my_aggregator, we could add it to the pipeline definition file like this:
[[aggregators]]
type = "my_aggregator"
id = "total_coverage"
The above aggregator will transform the collected data like this:
OrderedDict([
    ('coverage_c', {
        # ...
    }),
    ('coverage_go', {
        # ...
    }),
    ('total_coverage', {
        'total_statements': 1200,
        'total_misses': 300,
        'line_rate': 0.75
    })
])
Note that while we hardwired the ids of the sources, this could be parametrized as a component option as described in Specifying Options and sketched below. Also note the way we used self.id as the key added to the bundle.
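As a sketch (not Flowbber’s built-in implementation), the hypothetical ids option below shows one way the source ids could be declared and consumed through self.config; the declare_config() and add_option() calls are explained later in Specifying Options:

from flowbber.components import Aggregator


class MyAggregator(Aggregator):

    def declare_config(self, config):
        # Hypothetical option: list of source ids whose totals will be summed
        config.add_option(
            'ids',
            schema={
                'type': 'list',
                'schema': {'type': 'string', 'empty': False},
                'empty': False,
            },
        )

    def accumulate(self, data):
        total_statements = 0
        total_misses = 0

        # Iterate the configured source ids instead of a hardwired list
        for datakey in self.config.ids.value:
            total = data[datakey]['total']
            total_statements += total['total_statements']
            total_misses += total['total_misses']

        data[self.id] = {
            'total_statements': total_statements,
            'total_misses': total_misses,
            'line_rate': (total_statements - total_misses) / total_statements,
        }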
To implement a Sink component, subclass Flowbber’s flowbber.components.Sink class and implement the flowbber.components.Sink.distribute() abstract method.
class flowbber.components.Sink(index, type_, id_, optional=False, timeout=None, config=None)
    Main base class to implement a Sink.

    distribute(data)
        Distribute the collected data.

        All sink subclasses must implement this abstract method.
Sink’s distribute() method takes a copy of the final collected data (after it has passed through all aggregators) and returns nothing.
For example, consider a basic and naive implementation of the built-in MongoDBSink that submits data to a MongoDB database:
from flowbber.components import Sink
from flowbber.logging import get_logger

log = get_logger(__name__)


class SimpleMongoDBSink(Sink):
    def distribute(self, data):
        from pymongo import MongoClient

        client = MongoClient('mongodb://localhost:27017/')
        database = client['mydatabase']
        collection = database['mycollection']

        data_id = collection.insert_one(data).inserted_id
        log.info('Inserted data to MongoDB with id {}'.format(data_id))
In this example we hardwired the connection URI, database name and collection name. This can be parametrized as described in Specifying Options. Check the code of flowbber.plugins.sinks.mongodb.MongoDBSink for a full example of the implementation of the MongoDB sink.
Flowbber uses setuptools’ entrypoints to perform dynamic discovery of plugins.
Assuming your package is called mypackage, add an entry_points keyword argument to your package’s setup.py setup() function as follows:
setup(
    # ...

    # Entry points
    entry_points={
        'flowbber_plugin_sources_1_0': [
            'mysource = mypackage.sources.mycustom:MyCustomSource'
        ],
        'flowbber_plugin_aggregators_1_0': [
            'myaggregator = mypackage.aggregators.mycustom:MyCustomAggregator'
        ],
        'flowbber_plugin_sinks_1_0': [
            'mycustom = mypackage.sinks.mycustom:MyCustomSink'
        ]
    }
)
As key, use one of the following entrypoints:
flowbber_plugin_sources_1_0
flowbber_plugin_aggregators_1_0
flowbber_plugin_sinks_1_0
The elements in the list associated with those entrypoints have the following structure:
<your_new_type> = <yourpackage>.<submodule>.<submodule>:<YourComponentClass>
Please note the : (colon) after the module path. Once your package is installed alongside Flowbber, your components will be available for use with type = your_new_type in your pipeline definition file.
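As a quick sanity check, a sketch assuming setuptools’ pkg_resources is available in your environment, you can list everything registered under these entrypoint groups after installing your package:

from pkg_resources import iter_entry_points

# List every component registered under Flowbber's entrypoint groups
for group in (
    'flowbber_plugin_sources_1_0',
    'flowbber_plugin_aggregators_1_0',
    'flowbber_plugin_sinks_1_0',
):
    for entry_point in iter_entry_points(group=group):
        print('{}: {} -> {}'.format(group, entry_point.name, entry_point.module_name))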
In many situations, creating a whole Python package with just a couple of components, installing it and keeping track of its version is too much overhead.
For example, what if your custom pipeline can be implemented with Flowbber’s built-in Sources and Sinks, but you only require one additional simple aggregator?
Flowbber supports creating local configuration modules that allow you to create components specific to a pipeline.
Create a flowconf.py file next to your pipeline definition file. If you have placed your pipeline under version control, keeping track of both the components and the definition of the pipeline is straightforward. Flowbber will load the components defined in this way and make them available to your pipeline.
In the flowconf.py file, you can register your components using the register function of each loader:
Sources:
from flowbber.loaders import source
from flowbber.components import Source


@source.register('my_source')
class MySource(Source):
    def collect(self):
        return {'my_value': 1000}
Aggregators:
from flowbber.loaders import aggregator
from flowbber.components import Aggregator


@aggregator.register('my_aggregator')
class MyAggregator(Aggregator):
    def accumulate(self, data):
        data['num_sources'] = {'total': len(data.keys())}
Sinks:
from flowbber.loaders import sink
from flowbber.components import Sink


@sink.register('my_sink')
class MySink(Sink):
    def distribute(self, data):
        print(data)
With the above configuration file, the pipeline definition file can use those components:
[[sources]]
type = "my_source"
id = "my_source1"
[[sources]]
type = "my_source"
id = "my_source2"
[[aggregators]]
type = "my_aggregator"
id = "my_aggregator1"
[[sinks]]
type = "my_sink"
id = "my_sink1"
The resulting collected data will be:
OrderedDict([
    ('my_source1', {'my_value': 1000}),
    ('my_source2', {'my_value': 1000}),
    ('num_sources', {'total': 2})
])
Any component can implement the method flowbber.components.base.Component.declare_config():

Component.declare_config(config)
    Declare the configuration options of this component.

    Parameters: config (flowbber.config.Configurator) – The configuration manager for this component.

    It is expected that for each option the component requires to declare, a call to flowbber.config.Configurator.add_option() is performed.
Configurator.add_option(key, default=None, optional=False, schema=None, secret=False)
    Declare an option.
For example, let’s revisit our previous simple MongoDB sink example. We want to parametrize, among other things, the uri, database and collection.
def declare_config(self, config):
    config.add_option(
        'uri',
        default=None,
        optional=True,
        schema={
            'type': 'string',
            'empty': False,
            'nullable': True,
        },
        secret=True,
    )

    config.add_option(
        'database',
        schema={
            'type': 'string',
            'empty': False,
        },
    )

    config.add_option(
        'collection',
        schema={
            'type': 'string',
            'empty': False,
        },
    )
The schema keyword argument receives a dictionary that describes the schema against which the value will be validated. Validation is performed using the awesome Cerberus library, which allows creating schemas for complex data types and values. If the schema is set to None, no validation will be performed.
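As an illustration, here is a minimal sketch of a hypothetical port option (not part of the example above) validated as an integer and given a default value:

def declare_config(self, config):
    # Hypothetical option: TCP port, defaults to MongoDB's standard port
    config.add_option(
        'port',
        default=27017,
        optional=True,
        schema={
            'type': 'integer',
            'min': 1,
            'max': 65535,
        },
    )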
Once validated and normalized, configuration options will be available under the read-only self.config attribute. Each configuration option key is mapped to an object with three attributes:

key
    Key of the configuration option. Same as given as the first argument to add_option.

value
    The validated and normalized or default value of the option.

is_secret
    Boolean indicating that the option is a secret and thus shouldn’t be printed, logged, stored in plain text, etc. Same as add_option’s secret keyword argument.
In this sense, it is important to note that Flowbber won’t log or store any option marked as secret.
Following our simple MongoDB example, we could use the configurations options as follows:
from flowbber.components import Sink


class SimpleMongoDBSink(Sink):

    def declare_config(self, config):
        # Declare options ...

    def distribute(self, data):
        from pymongo import MongoClient

        client = MongoClient(self.config.uri.value)
        database = client[self.config.database.value]
        collection = database[self.config.collection.value]

        # ...
Finally, in some situations a basic option-by-option validation is insufficient, for example when two options are incompatible with each other.
In these situations the component can register a custom validation function using the flowbber.config.Configurator.add_validator() method:
Configurator.add_validator(validator)
    Add a custom validation function.
The registered function will receive the validated data as a dictionary just before it is frozen, and can perform any modification required on the data.
For example, consider the MongoDBSink, which allows defining the connection parameters either as a single string or as multiple values:
def declare_config(self, config):
    # ... calls to add_option()

    # Check if uri is defined and if so then delete other keys
    def custom_validator(validated):
        if validated['uri'] is not None:
            del validated['host']
            del validated['port']
            del validated['username']
            del validated['password']
        else:
            del validated['uri']

    config.add_validator(custom_validator)
This validation function could perform checks against values, for example in TimestampSource where at least one format needs to be enabled:
def declare_config(self, config):
    # ... calls to add_option()

    # Check that at least one format is enabled
    def custom_validator(validated):
        if not any(validated.values()):
            raise ValueError(
                'The timestamp source requires at least one timestamp '
                'format enabled'
            )

    config.add_validator(custom_validator)
Both sources and sinks run in subprocesses, so logging and printing directly to stdout or stderr will mangle the output. In order to log and print from any context without issues, Flowbber provides the following functions:
flowbber.logging.get_logger(), which returns a multiprocess safe logger.

flowbber.logging.print(), which safely prints to stdout or stderr in a multiprocess context.

flowbber.logging.get_logger(name)
    Return a multiprocess safe logger.

    Returns: A multiprocess safe logger, an instance of logging.Logger.

flowbber.logging.print(obj, fd='stdout')
    Enqueue a print to the given fd.
Nevertheless, it is important to note that in Flowbber, Python’s traditional logging.getLogger() loggers will also work correctly, for example inside a third party library used in your sources or sinks. So no changes are needed if Flowbber’s get_logger function isn’t used for logging; it is just the recommended one.
Usage:
from flowbber.logging import get_logger, print

log = get_logger(__name__)


def do_foo(say):
    print(say)
    print(say, fd='stderr')
    log.info(say)
It is possible to build a Python package that implements a specific pipeline based on Flowbber.
For example, consider a daemon that reads values from connected Arduino based sensors inside a RaspberryPI and submits it to a server in the cloud, either using a web service or a database management system.
As another example consider a monitoring daemon that grabs the CPU usage information from the system and submits it to an InfluxDB and / or MongoDB database.
Let’s code a basic version of our hypothetical daemon cpud. Let’s assume that its configuration file is located at /etc/cpud.toml and looks like this:
[cpud]
verbosity = 3
# Take a sample each 10 seconds
frequency = 10
[influxdb]
uri = "influxdb://localhost:8086/"
database = "cpud"
[mongodb]
uri = "mongodb://localhost:27017/"
database = "cpud"
collection = "cpuddata"
A basic and naive implementation of such a daemon could look like this:
from toml import loads

from flowbber.pipeline import Pipeline
from flowbber.scheduler import Scheduler
from flowbber.logging import setup_logging
from flowbber.inputs import validate_definition


def build_definition(config):
    """
    Build a pipeline definition based on given configuration.
    """
    definition = {
        'sources': [
            {'type': 'timestamp', 'id': 'timekeys', 'config': {
                'epoch': True,  # Key for MongoDB
                'iso8601': True,  # Key for InfluxDB
            }},
            {'type': 'cpu', 'id': 'cpu'},
        ],
        'sinks': [],
        'aggregators': [],
    }

    if config['influxdb']:
        definition['sinks'].append({
            'type': 'influxdb',
            'id': 'influxdb',
            'config': {
                'uri': config['influxdb']['uri'],
                'database': config['influxdb']['database'],
                'key': 'timekeys.iso8601',
            }
        })

    if config['mongodb']:
        definition['sinks'].append({
            'type': 'mongodb',
            'id': 'mongodb',
            'config': {
                'uri': config['mongodb']['uri'],
                'database': config['mongodb']['database'],
                'collection': config['mongodb']['collection'],
                'key': 'timekeys.epoch',
            }
        })

    if not definition['sinks']:
        raise RuntimeError('No sinks configured')

    return definition


def main():
    # Read configuration
    with open('/etc/cpud.toml') as fd:
        config = loads(fd.read())

    # Setup multiprocess logging
    setup_logging(config['cpud']['verbosity'])

    # Build pipeline definition
    definition = build_definition(config)

    # Validate pipeline definition
    validated = validate_definition(definition)

    # Build pipeline
    pipeline = Pipeline(
        validated,
        'cpud',
        app='cpud',
    )

    # Build and run scheduler
    scheduler = Scheduler(
        pipeline,
        config['cpud']['frequency'],
        stop_on_failure=True
    )
    scheduler.run()
The most relevant parts of this example are:

We programmatically build the Pipeline Definition from cpud’s own configuration file. Note that the package itself could ship the required components (Sources, Aggregators or Sinks) altogether.

In particular, we use flowbber.inputs.validate_definition(), which normalizes and validates the pipeline definition we just built.

We create an instance of flowbber.pipeline.Pipeline with the validated definition. For one-time execution applications we can use the flowbber.pipeline.Pipeline.run() method (see the sketch after the Pipeline reference below).
class flowbber.pipeline.Pipeline(pipeline, name, app='flowbber')
    Pipeline executor class.

    This class will fetch all components from locally declared components and entrypoint plugins, create instances of all of them and execute the pipeline in order, creating subprocesses when needed.

    Execution of the pipeline is registered in a journal that is returned and / or saved when the execution of the pipeline ends.

    executed
        Number of times this pipeline has been executed.

    name
        Name of this pipeline.

    run()
        Execute the pipeline.

        This method can be called several times after instantiating the pipeline.
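For example, a minimal one-time execution sketch, assuming the validated definition built by build_definition() above:

from flowbber.pipeline import Pipeline

# Build the pipeline with the validated definition and run it once,
# without a scheduler
pipeline = Pipeline(
    validated,
    'cpud',
    app='cpud',
)
pipeline.run()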
We create an instance of flowbber.scheduler.Scheduler
that will
control the continuous execution of the pipeline.
class flowbber.scheduler.Scheduler(pipeline, frequency, samples=None, start=None, stop_on_failure=False)
    Schedule the execution of a pipeline.

    The scheduler will try to run the pipeline on schedule. If the pipeline takes too long, longer than the programmed frequency, the scheduler will increment the runs_missed counter for each execution that failed to run at the expected schedule because the previous run was still running, and will start the missed pipeline execution right away.

    Parameters:
        pipeline (flowbber.pipeline.Pipeline) – The pipeline to execute.
        samples – If None, the scheduler will continue taking samples forever.
        start – If None, the scheduler will start immediately.

    last_run
        Timestamp in seconds since the epoch of the last run of the pipeline.

    run()
        Start the scheduler.

    runs
        Read-only dictionary with the numbers of categorized runs of the pipeline:

        {
            'passed': 10,
            'failed': 2,
            'missed': 0,
        }
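As a sketch of a bounded run, using only the parameters shown in the signature above, the scheduler can take a fixed number of samples and its counters can be inspected afterwards:

from flowbber.scheduler import Scheduler

# Take 3 samples, one every 10 seconds, instead of running forever
scheduler = Scheduler(
    pipeline,
    frequency=10,
    samples=3,
    stop_on_failure=True,
)
scheduler.run()

# Categorized run counters, e.g. {'passed': 3, 'failed': 0, 'missed': 0}
print(scheduler.runs)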
The first thing we do is call flowbber.logging.setup_logging() with the configured verbosity level to set up Flowbber’s multiprocess safe logging and printing.
If this function isn’t called as soon as possible, or not called at all, then logging performed from sources and sinks that run in their own subprocess will mangle the logging output.
flowbber.logging.setup_logging(verbosity=0)
    Setup logging for this process.

    The first time it is called it will create a subprocess to manage the logging and printing, setup the subprocess for stream logging to stdout and setup the main process to queue logging.

    In consequence, any subprocess of the main process will inherit the queue logging and become multiprocess logging safe.

    This method can be called from subprocesses, but if at least one logging call has already been performed it will fail, as the handler will already exist.

    Parameters: verbosity – Verbosity level, see LoggingManager.LEVELS. The greater the number the more information is provided, with 0 as the initial level.
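A minimal ordering sketch: call setup_logging() before building any Pipeline or Scheduler, so their subprocesses inherit the queue logging:

from flowbber.logging import setup_logging, get_logger

# Must run as early as possible, before any component subprocess is spawned
setup_logging(verbosity=2)

log = get_logger(__name__)
log.info('Multiprocess safe logging is now configured')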