flowbber.plugins.sinks.influxdb

InfluxDB

This sink writes all collected data to an InfluxDB time series database.

To track the complex data collected by the sources over time, this sink must first “flatten” it. Flattening transforms an arbitrarily deep dictionary tree into a fixed-depth dictionary that maps the key of each source to the measurements that source performed.

To accomplish this, the flattening process will:

  • Join all dictionary keys (using the keysjoiner character) until a leaf value is found that has a datatype supported by InfluxDB (string, float, integer, boolean or None).
  • Convert each list into a dictionary that maps the index of each element to the element itself, and then flatten it like any other dictionary.

So, for example, consider the data collected by the Cobertura source:

{
    'coverage': {
        'files': {
            '__init__.py': {
                'line_rate': 1.0,
                'total_misses': 0,
                'total_statements': 10,
            },
            '__main__.py': {
                'line_rate': 0.5,
                'total_misses': 5,
                'total_statements': 10,
            },
            # ...
        },
        'total': {
            'line_rate': 0.75,
            'total_misses': 5,
            'total_statements': 20,
        }
    }
}

The above structure will be transformed into:

{
    'coverage': {
        'files.__init__:py.line_rate': 1.0,
        'files.__init__:py.total_misses': 0,
        'files.__init__:py.total_statements': 10,
        'files.__main__:py.line_rate': 0.5,
        'files.__main__:py.total_misses': 5,
        'files.__main__:py.total_statements': 10,
        # ...
        'total.line_rate': 0.75,
        'total.total_misses': 5,
        'total.total_statements': 20,
    }
}

Important

Please note that the . in the file name was changed to : before joining the keys. The replacement and the joining are controlled by the keysjoinerreplace and keysjoiner options, respectively.

Also note that the leaf keys must map to a value that can be used as a field value in InfluxDB (string, float, integer, boolean or None). Otherwise, the flattening will fail.

Or, in the case of lists:

{
    'key1': ['a', 'b', 'c'],
    'key2': [
        {'a': 1},
        {'b': 2},
        {'c': 3},
    ]
}

The flattening results in:

{
    'key1.0': 'a',
    'key1.1': 'b',
    'key1.2': 'c',
    'key2.0.a': 1,
    'key2.1.b': 2,
    'key2.2.c': 3,
}
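
The flattening rules described above can be sketched in a few lines of Python. This is only an illustration of the documented behavior, not the sink's actual implementation: the function name flatten and its helpers are hypothetical, and the keysjoiner and keysjoinerreplace parameters simply mirror the options of the same name.

```python
def flatten(data, keysjoiner='.', keysjoinerreplace=':'):
    """Illustrative flattening of nested dicts and lists (hypothetical)."""
    # Leaf datatypes the documentation lists as supported by InfluxDB
    supported = (str, float, int, bool, type(None))
    flat = {}

    def clean(key):
        # Replace the joiner inside a key so joined paths stay unambiguous
        key = str(key)
        if keysjoinerreplace is None:
            return key
        return key.replace(keysjoiner, keysjoinerreplace)

    def walk(node, path):
        if isinstance(node, list):
            # Lists become dictionaries that map index -> element
            node = dict(enumerate(node))
        if isinstance(node, dict):
            for key, value in node.items():
                walk(value, path + [clean(key)])
        elif isinstance(node, supported):
            flat[keysjoiner.join(path)] = node
        else:
            raise TypeError(
                'Unsupported leaf type {} at {}'.format(
                    type(node).__name__, keysjoiner.join(path)
                )
            )

    walk(data, [])
    return flat


# Nested dictionaries: the "." in the file name is replaced by ":"
print(flatten({'files': {'__init__.py': {'line_rate': 1.0}}}))

# Lists: each index becomes a key
print(flatten({'tags': ['a', 'b'], 'runs': [{'ok': True}]}))
```

In this sketch an unsupported leaf value (for example, a set) raises TypeError, which corresponds to the flattening failure mentioned above.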

You may run this sink with DEBUG verbosity (-vvv) to analyze all transformations performed.

Dependencies:

pip3 install flowbber[influxdb]

Usage:

[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "influxdb://localhost:8086/"
    database = "flowbber"
    key = "timestamp.iso8601"

{
    "sinks": [
        {
            "type": "influxdb",
            "id": "...",
            "config": {
                "uri": "influxdb://localhost:8086/",
                "database": "flowbber",
                "key": "timestamp.iso8601"
            }
        }
    ]
}

uri

URI to connect to InfluxDB.

If this is set, the host, port, username, password and ssl options are ignored.

The URI has the form:

influxdb://username:password@localhost:8086/

Supported schemes are influxdb, https+influxdb and udp+influxdb.

See the from_dsn function for more information.
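
To make the URI form concrete, the following sketch splits such a URI into its components using only the Python standard library. It is not the from_dsn function itself: the helper name split_influxdb_uri is hypothetical, and the reading of the scheme prefix (https+ enabling SSL, udp+ selecting UDP) is an assumption based on the scheme names listed above.

```python
from urllib.parse import urlsplit


def split_influxdb_uri(uri):
    """Split an InfluxDB URI into connection settings (hypothetical)."""
    parts = urlsplit(uri)

    # "https+influxdb" -> modifier "https", scheme "influxdb"
    modifier, _, scheme = parts.scheme.rpartition('+')
    if scheme != 'influxdb' or modifier not in ('', 'https', 'udp'):
        raise ValueError('Unsupported scheme: {}'.format(parts.scheme))

    return {
        'host': parts.hostname,
        'port': parts.port,
        'username': parts.username,
        'password': parts.password,
        # Assumption: the scheme modifier selects the transport
        'ssl': modifier == 'https',
        'udp': modifier == 'udp',
    }


print(split_influxdb_uri('influxdb://username:password@localhost:8086/'))
```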

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: True

host

Hostname or IP to connect to InfluxDB.

  • Default: localhost

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

path

Path of InfluxDB on the server to connect to.

  • Default: ''

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

port

Port to connect to InfluxDB.

  • Default: 8086

  • Optional: True

  • Schema:

    {
        'type': 'integer',
        'min': 0,
        'max': 65535,
    }
    
  • Secret: False

username

User to connect to InfluxDB.

  • Default: root

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

password

Password of the user.

  • Default: root

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: True

ssl

Use HTTPS instead of HTTP to connect to InfluxDB.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

verify_ssl

Verify SSL certificates for HTTPS requests.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

database

Database name to connect to.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

key

Path to a value in the collected data that will be used as the timestamp when submitting the flattened data to the database.

Specify the path to the value by joining the keys along the path with a . (dot).

For example, given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}

The corresponding path to use the value 2017-08-19T01:26:35.683529 as timestamp is my_key.a_sub_key.yet_another.

This option is nullable. If null is provided (the default), the timestamp is determined when submitting the data by calling Python’s datetime.now().isoformat().

To have a single timestamp for your pipeline, include the Timestamp source and use a configuration similar to the following:
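
The lookup described above can be sketched as follows. This is a minimal illustration and not the sink's actual code: the helper name resolve_timestamp is hypothetical, and it assumes the collected data is a plain nested dictionary.

```python
from datetime import datetime


def resolve_timestamp(data, key=None):
    """Return the timestamp found at the dotted path key (hypothetical)."""
    if key is None:
        # No key configured: fall back to the submission time
        return datetime.now().isoformat()

    value = data
    for part in key.split('.'):
        # Descend one level per path component; a missing key raises KeyError
        value = value[part]
    return value


collected = {
    'my_key': {
        'a_sub_key': {
            'yet_another': '2017-08-19T01:26:35.683529'
        }
    }
}

print(resolve_timestamp(collected, 'my_key.a_sub_key.yet_another'))
```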

[[sources]]
type = "timestamp"
id = "timestamp"

    [sources.config]
    iso8601 = true

[[sinks]]
type = "influxdb"
id = "..."

    [sinks.config]
    uri = "..."
    database = "..."
    key = "timestamp.iso8601"

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False

keysjoiner

Character used to join the keys when flattening the collected data (see data flattening process above).

  • Default: .

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

keysjoinerreplace

Character used to replace occurrences of the keysjoiner character in the keys of the collected data before flattening it.

This option is nullable. If null is provided, no character replacement is performed on the keys.
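
As a small illustration of why the replacement matters, the sketch below applies it to a single key before joining. The helper name replace_joiner is hypothetical and only mirrors the behavior described for this option.

```python
def replace_joiner(key, keysjoiner='.', keysjoinerreplace=':'):
    """Replace the joiner character inside one key (hypothetical)."""
    if keysjoinerreplace is None:
        # Null disables the replacement entirely
        return key
    return key.replace(keysjoiner, keysjoinerreplace)


path = ['files', '__init__.py', 'line_rate']

# With the default replacement the file name remains a single path component
print('.'.join(replace_joiner(k) for k in path))

# With the replacement disabled the file name is indistinguishable
# from two nested keys ("__init__" and "py")
print('.'.join(replace_joiner(k, keysjoinerreplace=None) for k in path))
```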

  • Default: :

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'nullable': True,
    }
    
  • Secret: False

Classes

  • InfluxDBSink: Common sink base class that adds an include and exclude configuration

class flowbber.plugins.sinks.influxdb.InfluxDBSink(index, type_, id_, optional=False, timeout=None, config=None)

Inheritance

Inheritance diagram of InfluxDBSink