flowbber.plugins.sinks.mongodb

MongoDB

This sink writes all collected data to a MongoDB NoSQL database.

Important

This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.

The collected data is mostly unaltered from its original form, except that MongoDB document keys cannot contain . (dot) characters and cannot start with a $ (dollar) sign.

This sink will transform the keys in the collected data so its safe to submit it to MongoDB, for this the options dotreplace and dollarreplace are used to perform this transformation.

Dependencies:

pip3 install flowbber[mongodb]

Usage:

[[sinks]]
type = "mongodb"
id = "..."

    [sinks.config]
    uri = "mongodb://localhost:27017/"
    database = "flowbber"
    collection = "pipeline1data"
    key = "timestamp.epoch"
    overwrite = true
{
    "sinks": [
        {
            "type": "mongodb",
            "id": "...",
            "config": {
                "uri": "mongodb://localhost:27017/",
                "database": "flowbber",
                "collection": "pipeline1data",
                "key": "timestamp.epoch"
            }
        }
    ]
}

uri

URI to connect to MongoDB.

If this is set, options host, port, username and password will be ignored.

URI is in the form:

mongodb://username:password@host:port/

Except that for username and password reserved characters like :, /, + and @ must be percent encoded following RFC 2396:

from urllib.parse import quote_plus

uri = 'mongodb://{}:{}@{}:{}/'.format(
    quote_plus(user),
    quote_plus(password),
    host, port,
)

host can be a hostname or IP address. Unix domain sockets are also supported but must be percent encoded:

uri = 'mongodb://{}:{}@{}/'.format(
    quote_plus(user),
    quote_plus(password),
    quote_plus(socket_path)
)

For more information check MongoClient API and MongoDocs.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: True

host

Hostname, IP address or Unix domain socket path to connect to MongoDB.

  • Default: localhost

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

port

Port to connect to MongoDB.

  • Default: 27017

  • Optional: True

  • Schema:

    {
        'type': 'integer',
        'min': 0,
        'max': 65535,
    }
    
  • Secret: False

username

User to connect to MongoDB.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

password

Password of the user.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: True

ssl

Create the connection to the server using SSL.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

database

Database to connect to.

  • Default: N/A

  • Optional: False

  • Schema:

    {
        'type': 'string',
        'empty': False,
    }
    
  • Secret: False

collection

Collection to write to. This option defines two modes of operation depending if it is set to None or any other string value:

  1. This option is nullable, if null is provided (the default), the collection will be determined for each entry in the collected data bundle using its key. For example, given the following pipeline:

    [[sources]]
    type = "somesource"
    id = "first_id"
    
    [[sources]]
    type = "somesource"
    id = "second_id"
    
    [[sinks]]
    type = "mongodb"
    id = "mongodb"
    
        [sinks.config]
        uri = "..."
        database = "mydatabase"
    

    Collects the following data:

    {
        'first_id': {
            'abc': 'def',
            'xyz': '123',
        },
        'second_id': {
            '123': 'abc',
            'qwe': 'rty',
        },
    }
    

    Will be stored in MongoDB as follows:

    1. In collection first_id:

      {
          'abc': 'def',
          'xyz': '123',
      }
      
    2. In collection second_id:

      {
          '123': 'abc',
          'qwe': 'rty',
      }
      
  2. Otherwise, the whole data bundle will be stored to the collection specified.

In both cases the key option will be honored.

  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False

key

Path to a value in the collected data that will be used as object id when submitting the safe data to the database.

Specify the path to the value by joining keys path with a . (dot).

For example given the following structure:

{
    'my_key': {
        'a_sub_key': {
            'yet_another': 123123123
        }
    }
}

The corresponding path to use the value 123123123 as timestamp is my_key.a_sub_key.yet_another.

This option is nullable, and if null is provided (the default), the object id will be generated by MongoDB itself.

In order to have a single timestamp for your pipeline you can include the Timestamp source and use a configuration similar to the following:

[[sources]]
type = "timestamp"
id = "timestamp"

    [sources.config]
    epoch = true

[[sinks]]
type = "mongodb"
id = "..."

    [sinks.config]
    uri = "..."
    database = "..."
    collection = "..."
    key = "timestamp.epoch"
  • Default: None

  • Optional: True

  • Schema:

    {
        'type': 'string',
        'empty': False,
        'nullable': True,
    }
    
  • Secret: False

overwrite

Indicates if mongo should allow an entry overwrite.

  • Default: False

  • Optional: True

  • Schema:

    {
        'type': 'boolean',
    }
    
  • Secret: False

dotreplace

Character used to replace any occurrence of a . (dot) character in the keys of the collected data (see MongoDB data safety above).

  • Default: :

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

dollarreplace

Character used to replace any occurrence of a $ (dollar) character at the beginning of any key in the collected data (see MongoDB data safety above).

  • Default: &

  • Optional: True

  • Schema:

    {
        'type': 'string',
    }
    
  • Secret: False

Classes

  • MongoDBSink: Common sink base class that adds several inclusion and exclusion
class flowbber.plugins.sinks.mongodb.MongoDBSink(index, type_, id_, optional=False, timeout=None, config=None)

Inheritance

Inheritance diagram of MongoDBSink