flowbber.plugins.sinks.influxdb
This sink writes all collected data to an InfluxDB time series database.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
In order to track the complex data collected by the sources over time, this sink first needs to “flatten” the data. The process transforms an arbitrarily deep dictionary tree into a fixed-depth dictionary that maps the keys of the sources to the measurements they performed.
To accomplish this, the flattening process joins the keys of nested dictionaries (using the keysjoiner character) until a leaf value that has a datatype supported by InfluxDB (string, float, integer, boolean or None) is found.
So, for example, consider the data collected by the Cobertura source:
{
    'coverage': {
        'files': {
            '__init__.py': {
                'line_rate': 1.0,
                'total_misses': 0,
                'total_statements': 10,
            },
            '__main__.py': {
                'line_rate': 0.5,
                'total_misses': 5,
                'total_statements': 10,
            },
            # ...
        },
        'total': {
            'line_rate': 0.75,
            'total_misses': 5,
            'total_statements': 20,
        }
    }
}
The above structure will be transformed into:
{
    'coverage': {
        'files.__init__:py.line_rate': 1.0,
        'files.__init__:py.total_misses': 0,
        'files.__init__:py.total_statements': 10,
        'files.__main__:py.line_rate': 0.5,
        'files.__main__:py.total_misses': 5,
        'files.__main__:py.total_statements': 10,
        # ...
        'total.line_rate': 0.75,
        'total.total_misses': 5,
        'total.total_statements': 20,
    }
}
Important
Please note that the . in the file name was changed to : before joining the keys. This replacement and joining is controlled by the keysjoinerreplace and keysjoiner options.
Also note that the leaf keys must map to a value that can be used as a field value in InfluxDB (string, float, integer, boolean or None). If not, the flattening will fail.
Or in case of lists:
{
    'key1': ['a', 'b', 'c'],
    'key2': [
        {'a': 1},
        {'b': 2},
        {'c': 3},
    ]
}
The flattening results in:
{
    'key1.0': 'a',
    'key1.1': 'b',
    'key1.2': 'c',
    'key2.0.a': 1,
    'key2.1.b': 2,
    'key2.2.c': 3,
}
You may run this sink with DEBUG verbosity (-vvv) to analyze all transformations performed.
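The flattening described above can be sketched in a few lines of Python. This is only an illustration of the documented behavior, not the sink's actual implementation: the function name flatten and its prefix parameter are assumptions made for the example, while keysjoiner and keysjoinerreplace mirror the options documented here.

```python
# Illustrative sketch only; `flatten` and `prefix` are hypothetical names.
SUPPORTED_LEAVES = (str, float, int, bool, type(None))


def flatten(value, keysjoiner='.', keysjoinerreplace=':', prefix=None):
    """Flatten nested dictionaries and lists into a depth-one dictionary."""
    # Lists are handled like dictionaries keyed by the element index.
    if isinstance(value, (list, tuple)):
        value = {str(index): item for index, item in enumerate(value)}

    if not isinstance(value, dict):
        # A leaf was reached: it must be usable as an InfluxDB field value.
        if isinstance(value, SUPPORTED_LEAVES):
            return {prefix: value}
        raise TypeError(
            'Unsupported leaf type {} at {}'.format(type(value), prefix)
        )

    flat = {}
    for key, child in value.items():
        key = str(key)
        # Replace the joiner inside a key so joined paths stay unambiguous.
        if keysjoinerreplace is not None:
            key = key.replace(keysjoiner, keysjoinerreplace)
        path = key if prefix is None else keysjoiner.join((prefix, key))
        flat.update(flatten(child, keysjoiner, keysjoinerreplace, path))
    return flat


data = {
    'coverage': {
        'files': {'__init__.py': {'line_rate': 1.0}},
        'total': {'line_rate': 0.75},
    }
}

# Keep the source keys and flatten the measurements of each source.
flattened = {
    source: flatten(measurements)
    for source, measurements in data.items()
}
```

With the sample data, flattened maps 'coverage' to a dictionary whose keys are 'files.__init__:py.line_rate' and 'total.line_rate'.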
Dependencies:
pip3 install flowbber[influxdb]
Usage:
[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "influxdb://localhost:8086/"
    database = "flowbber"
    key = "timestamp.iso8601"

{
    "sinks": [
        {
            "type": "influxdb",
            "id": "...",
            "config": {
                "uri": "influxdb://localhost:8086/",
                "database": "flowbber",
                "key": "timestamp.iso8601"
            }
        }
    ]
}
URI to connect to InfluxDB.
If this is set, options host, port, username, password and ssl will be ignored.
URI is in the form:
influxdb://username:password@localhost:8086/
Supported schemes are influxdb, https+influxdb and udp+influxdb.
Check function from_dsn for more information.
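To see how a URI of this form decomposes into the individual connection options, the following sketch uses only Python's standard library. It is not how from_dsn is implemented; the helper name split_influxdb_uri and the returned dictionary layout are assumptions made for illustration.

```python
from urllib.parse import urlsplit


def split_influxdb_uri(uri):
    """Split an InfluxDB URI into its connection parts (hypothetical helper)."""
    parts = urlsplit(uri)

    # The scheme may carry a modifier, as in https+influxdb or udp+influxdb.
    modifiers = parts.scheme.split('+')
    if modifiers[-1] != 'influxdb':
        raise ValueError('Unsupported scheme: {}'.format(parts.scheme))

    return {
        'host': parts.hostname,
        'port': parts.port,
        'username': parts.username,
        'password': parts.password,
        'ssl': 'https' in modifiers[:-1],
        'udp': 'udp' in modifiers[:-1],
    }


connection = split_influxdb_uri('influxdb://username:password@localhost:8086/')
```

Because the URI can embed the password, it makes sense that this option is marked as secret.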
Default: None
Optional: True
Schema:
{
    'type': 'string',
    'empty': False,
    'nullable': True,
}
Secret: True
Hostname or IP to connect to InfluxDB.
Default: localhost
Optional: True
Schema:
{
    'type': 'string',
    'empty': False,
}
Secret: False
Path of InfluxDB on the server to connect to.
Default: ''
Optional: True
Schema:
{
    'type': 'string',
}
Secret: False
Port to connect to InfluxDB.
Default: 8086
Optional: True
Schema:
{
    'type': 'integer',
    'min': 0,
    'max': 65535,
}
Secret: False
User to connect to InfluxDB.
Default: root
Optional: True
Schema:
{
    'type': 'string',
    'empty': False,
}
Secret: False
Password of the user.
Default: root
Optional: True
Schema:
{
    'type': 'string',
}
Secret: True
Use https instead of http to connect to InfluxDB.
Default: False
Optional: True
Schema:
{
    'type': 'boolean',
}
Secret: False
Verify SSL certificates for HTTPS requests.
Default: False
Optional: True
Schema:
{
    'type': 'boolean',
}
Secret: False
Database name to connect to.
Default: N/A
Optional: False
Schema:
{
    'type': 'string',
    'empty': False,
}
Secret: False
Path to a value in the collected data that will be used as key timestamp when submitting the flattened data to the database.
Specify the path to the value by joining the keys along the path with a . (dot).
For example, given the following structure:
{
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}
The corresponding path to use the value 2017-08-19T01:26:35.683529 as timestamp is my_key.a_sub_key.yet_another.
This option is nullable, and if null is provided (the default), the timestamp will be determined when submitting the data by calling Python’s datetime.now().isoformat().
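The lookup and its fallback can be sketched as follows. This is a minimal illustration of the behavior described above, not the sink's code; the function name resolve_timestamp is an assumption made for the example.

```python
from datetime import datetime


def resolve_timestamp(data, key=None):
    """Return the timestamp found at the dotted path `key` in `data`."""
    # With no key configured, the timestamp is taken at submission time.
    if key is None:
        return datetime.now().isoformat()

    # Walk the collected data one key at a time along the dotted path.
    value = data
    for part in key.split('.'):
        value = value[part]
    return value


collected = {
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}

timestamp = resolve_timestamp(collected, 'my_key.a_sub_key.yet_another')
```

In this sketch a path that does not exist in the collected data raises a KeyError.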
In order to have a single timestamp for your pipeline you can include the Timestamp source and use a configuration similar to the following:
[[sources]]
type = "timestamp"
id = "timestamp"

    [sources.config]
    iso8601 = true

[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "..."
    database = "..."
    key = "timestamp.iso8601"

Default: None
Optional: True
Schema:
{
    'type': 'string',
    'empty': False,
    'nullable': True,
}
Secret: False
Character used to join the keys when flattening the collected data (see data flattening process above).
Default: .
Optional: True
Schema:
{
    'type': 'string',
}
Secret: False
Character used to replace occurrences of the keysjoiner character in the keys of the collected data before flattening it.
This option is nullable, and if null is provided, no character replacement will be performed on the keys.
Default: :
Optional: True
Schema:
{
    'type': 'string',
    'nullable': True,
}
Secret: False
InfluxDBSink: Common sink base class that adds several inclusion and exclusion
flowbber.plugins.sinks.influxdb.InfluxDBSink(index, type_, id_, optional=False, timeout=None, config=None)
Inheritance