flowbber.plugins.sinks.mongodb
This sink writes all collected data to a MongoDB NoSQL database.
Important
This class inherits several inclusion and exclusion configuration options for filtering data before using it. See FilterSink Options for more information.
The collected data is mostly unaltered from its original form, except that MongoDB document keys cannot contain . (dot) characters and cannot start with a $ (dollar) sign. This sink transforms the keys in the collected data so that it is safe to submit to MongoDB; the options dotreplace and dollarreplace control this transformation.
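As a rough illustration of this transformation, a recursive key renamer might look like the following. The sanitize function is hypothetical, not the sink's actual implementation; its defaults mirror the documented defaults of dotreplace and dollarreplace:

```python
def sanitize(data, dotreplace=':', dollarreplace='&'):
    """Recursively rename keys so a document is safe for MongoDB.

    Hypothetical sketch, not the sink's real code: '.' in keys is
    replaced by dotreplace and a leading '$' by dollarreplace,
    matching the documented option defaults.
    """
    if isinstance(data, dict):
        safe = {}
        for key, value in data.items():
            key = str(key).replace('.', dotreplace)
            if key.startswith('$'):
                key = dollarreplace + key[1:]
            safe[key] = sanitize(value, dotreplace, dollarreplace)
        return safe
    if isinstance(data, list):
        return [sanitize(item, dotreplace, dollarreplace) for item in data]
    return data
```

For example, sanitize({'a.b': {'$set': 1}}) yields {'a:b': {'&set': 1}}.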
Dependencies:
pip3 install flowbber[mongodb]
Usage:
[[sinks]]
type = "mongodb"
id = "..."
[sinks.config]
uri = "mongodb://localhost:27017/"
database = "flowbber"
collection = "pipeline1data"
key = "timestamp.epoch"
overwrite = true
{
"sinks": [
{
"type": "mongodb",
"id": "...",
"config": {
"uri": "mongodb://localhost:27017/",
"database": "flowbber",
"collection": "pipeline1data",
"key": "timestamp.epoch"
}
}
]
}
uri

URI to connect to MongoDB. If this is set, the options host, port, username and password will be ignored.
The URI is of the form:
mongodb://username:password@host:port/
except that in the username and password, reserved characters like : (colon), / (slash), + (plus) and @ (at sign) must be percent encoded following RFC 2396:
from urllib.parse import quote_plus
uri = 'mongodb://{}:{}@{}:{}/'.format(
quote_plus(user),
quote_plus(password),
host, port,
)
host can be a hostname or IP address. Unix domain sockets are also supported but must be percent encoded:
uri = 'mongodb://{}:{}@{}/'.format(
quote_plus(user),
quote_plus(password),
quote_plus(socket_path)
)
For more information check the MongoClient API documentation and the MongoDB documentation.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: True
host

Hostname, IP address or Unix domain socket path to connect to MongoDB.
Default: localhost
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
port

Port to connect to MongoDB.
Default: 27017
Optional: True
Schema:
{
'type': 'integer',
'min': 0,
'max': 65535,
}
Secret: False
username

User to connect to MongoDB as.
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
password

Password of the user.
Default: None
Optional: True
Schema:
{
'type': 'string',
}
Secret: True
Create the connection to the server using SSL.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
database

Database to connect to.
Default: N/A
Optional: False
Schema:
{
'type': 'string',
'empty': False,
}
Secret: False
collection

Collection to write to. This option defines two modes of operation, depending on whether it is set to None or to a string value:

This option is nullable; if null is provided (the default), the collection will be determined for each entry in the collected data bundle using its key. For example, given the following pipeline:
[[sources]]
type = "somesource"
id = "first_id"
[[sources]]
type = "somesource"
id = "second_id"
[[sinks]]
type = "mongodb"
id = "mongodb"
[sinks.config]
uri = "..."
database = "mydatabase"
that collects the following data:
{
'first_id': {
'abc': 'def',
'xyz': '123',
},
'second_id': {
'123': 'abc',
'qwe': 'rty',
},
}
The data will be stored in MongoDB as follows:
In collection first_id:
{
'abc': 'def',
'xyz': '123',
}
In collection second_id:
{
'123': 'abc',
'qwe': 'rty',
}
Otherwise, the whole data bundle will be stored to the specified collection.
In both cases the key option will be honored.
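The two modes can be sketched with a small routing helper. This route function is hypothetical and shown only to illustrate the dispatch logic, not the sink's real code:

```python
def route(data, collection=None):
    """Yield (collection_name, document) pairs for the two storage modes.

    Hypothetical sketch: with collection=None, each top-level entry is
    routed to a collection named after its source id; otherwise the
    whole bundle goes to the configured collection as one document.
    """
    if collection is None:
        # Nullable mode: one document per entry, collection named by its key
        for source_id, entry in data.items():
            yield source_id, entry
    else:
        # Fixed mode: the whole data bundle as a single document
        yield collection, data
```

For the example above, route({'first_id': {...}, 'second_id': {...}}) yields one document for collection first_id and one for collection second_id.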
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: False
key

Path to a value in the collected data that will be used as the object id when submitting the safe data to the database. Specify the path to the value by joining the keys with a . (dot).
For example given the following structure:
{
'my_key': {
'a_sub_key': {
'yet_another': 123123123
}
}
}
The corresponding path to use the value 123123123 as the object id is my_key.a_sub_key.yet_another.
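Resolving such a dotted path amounts to a nested dictionary lookup. The resolve helper below is hypothetical, included only to illustrate the path semantics:

```python
def resolve(data, path):
    """Walk nested dictionaries following a dot-separated key path."""
    value = data
    for part in path.split('.'):
        value = value[part]
    return value
```

With the structure above, resolve(data, 'my_key.a_sub_key.yet_another') returns 123123123.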
This option is nullable, and if null is provided (the default), the object id will be generated by MongoDB itself.
In order to have a single timestamp for your pipeline you can include the Timestamp source and use a configuration similar to the following:
[[sources]]
type = "timestamp"
id = "timestamp"
[sources.config]
epoch = true
[[sinks]]
type = "mongodb"
id = "..."
[sinks.config]
uri = "..."
database = "..."
collection = "..."
key = "timestamp.epoch"
Default: None
Optional: True
Schema:
{
'type': 'string',
'empty': False,
'nullable': True,
}
Secret: False
overwrite

Indicates if MongoDB should allow overwriting an existing entry.
Default: False
Optional: True
Schema:
{
'type': 'boolean',
}
Secret: False
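The effect of overwrite can be illustrated with a plain dict standing in for a MongoDB collection. This is a hypothetical sketch; against a real server this roughly corresponds to a replace with upsert versus a plain insert that fails on a duplicate _id:

```python
def store(fake_collection, object_id, document, overwrite=False):
    """Store a document under an explicit id, honoring overwrite.

    fake_collection is a plain dict used in place of a real MongoDB
    collection, purely for illustration.
    """
    if object_id in fake_collection and not overwrite:
        raise ValueError('duplicate id: {}'.format(object_id))
    fake_collection[object_id] = document
```

With overwrite=False (the default), writing a second document under the same id raises an error instead of replacing the first.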
dotreplace

Character used to replace any occurrence of a . (dot) character in the keys of the collected data (see MongoDB data safety above).
Default: :
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
dollarreplace

Character used to replace any occurrence of a $ (dollar) character at the beginning of any key in the collected data (see MongoDB data safety above).
Default: &
Optional: True
Schema:
{
'type': 'string',
}
Secret: False
class flowbber.plugins.sinks.mongodb.MongoDBSink(index, type_, id_, optional=False, timeout=None, config=None)

Inherits FilterSink, a common sink base class that adds several inclusion and exclusion options.