Skip to content

Series Helper

influxdb.helper

Helper class for InfluxDB.

Classes

SeriesHelper

Bases: object

Subclass this helper eases writing data points in bulk.

All data points are immutable, ensuring they do not get overwritten. Each subclass can write to its own database. The time series names can also be based on one or more defined fields. The field "time" can be specified when creating a point, and may be any of the time types supported by the client (i.e. str, datetime, int). If the time is not specified, the current system time (utc) will be used.

Annotated example::

class MySeriesHelper(SeriesHelper):
    class Meta:
        # Meta class stores time series helper configuration.
        series_name = 'events.stats.{server_name}'
        # Series name must be a string, curly brackets for dynamic use.
        fields = ['time', 'server_name']
        # Defines all the fields in this time series.
        ### Following attributes are optional. ###
        client = TestSeriesHelper.client
        # Client should be an instance of InfluxDBClient.
        :warning: Only used if autocommit is True.
        bulk_size = 5
        # Defines the number of data points to write simultaneously.
        # Only applicable if autocommit is True.
        autocommit = True
        # If True and no bulk_size, then will set bulk_size to 1.
        retention_policy = 'your_retention_policy'
        # Specify the retention policy for the data points
        time_precision = "h"|"m"|s"|"ms"|"u"|"ns"
        # Default is ns (nanoseconds)
        # Setting time precision while writing point
        # You should also make sure time is set in the given precision
Source code in influxdb/helper.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class SeriesHelper(object):
    """Subclass this helper eases writing data points in bulk.

    All data points are immutable, ensuring they do not get overwritten.
    Each subclass can write to its own database.
    The time series names can also be based on one or more defined fields.
    The field "time" can be specified when creating a point, and may be any of
    the time types supported by the client (i.e. str, datetime, int).
    If the time is not specified, the current system time (utc) will be used.

    Annotated example::

        class MySeriesHelper(SeriesHelper):
            class Meta:
                # Meta class stores time series helper configuration.
                series_name = 'events.stats.{server_name}'
                # Series name must be a string, curly brackets for dynamic use.
                fields = ['time', 'server_name']
                # Defines all the fields in this time series.
                ### Following attributes are optional. ###
                client = TestSeriesHelper.client
                # Client should be an instance of InfluxDBClient.
                :warning: Only used if autocommit is True.
                bulk_size = 5
                # Defines the number of data points to write simultaneously.
                # Only applicable if autocommit is True.
                autocommit = True
                # If True and no bulk_size, then will set bulk_size to 1.
                retention_policy = 'your_retention_policy'
                # Specify the retention policy for the data points
                time_precision = "h"|"m"|s"|"ms"|"u"|"ns"
                # Default is ns (nanoseconds)
                # Setting time precision while writing point
                # You should also make sure time is set in the given precision

    """

    __initialized__ = False

    def __new__(cls, *args, **kwargs):  # noqa: C901
        """Initialize class attributes for subsequent constructor calls.

        Note:
            *args and **kwargs are not explicitly used in this function,
            but needed for Python 2 compatibility.

        """
        if not cls.__initialized__:
            cls.__initialized__ = True
            try:
                _meta = cls.Meta
            except AttributeError as e:
                raise AttributeError("Missing Meta class in {0}.".format(cls.__name__)) from e

            for attr in ["series_name", "fields", "tags"]:
                try:
                    setattr(cls, "_" + attr, getattr(_meta, attr))
                except AttributeError as e:
                    raise AttributeError("Missing {0} in {1} Meta class.".format(attr, cls.__name__)) from e

            cls._autocommit = getattr(_meta, "autocommit", False)
            cls._time_precision = getattr(_meta, "time_precision", None)

            allowed_time_precisions = ["h", "m", "s", "ms", "u", "ns", None]
            if cls._time_precision not in allowed_time_precisions:
                raise AttributeError(
                    "In {}, time_precision is set, but invalid use any of {}.".format(
                        cls.__name__, ",".join(allowed_time_precisions)
                    )
                )

            cls._retention_policy = getattr(_meta, "retention_policy", None)

            cls._client = getattr(_meta, "client", None)
            if cls._autocommit and not cls._client:
                raise AttributeError("In {0}, autocommit is set to True, but no client is set.".format(cls.__name__))

            try:
                cls._bulk_size = _meta.bulk_size
                if cls._bulk_size < 1 and cls._autocommit:
                    warn("Definition of bulk_size in {0} forced to 1, was less than 1.".format(cls.__name__), stacklevel=2)
                    cls._bulk_size = 1
            except AttributeError:
                cls._bulk_size = -1
            else:
                if not cls._autocommit:
                    warn(
                        "Definition of bulk_size in {0} has no affect because autocommit is false.".format(cls.__name__),
                        stacklevel=2,
                    )

            cls._datapoints = defaultdict(list)

            if "time" in cls._fields:
                cls._fields.remove("time")
            cls._type = namedtuple(cls.__name__, ["time"] + cls._tags + cls._fields)
            cls._type.__new__.__defaults__ = (None,) * len(cls._fields)

        return super(SeriesHelper, cls).__new__(cls)

    def __init__(self, **kw):
        """Call to constructor creates a new data point.

        Note:
            Data points written when bulk_size is reached per Helper.

        Warning:
            Data points are immutable (namedtuples).

        """
        cls = self.__class__
        timestamp = kw.pop("time", self._current_timestamp())
        tags = set(cls._tags)
        fields = set(cls._fields)
        keys = set(kw.keys())

        # all tags should be passed, and keys - tags should be a subset of keys
        if not (tags <= keys):
            raise NameError("Expected arguments to contain all tags {0}, instead got {1}.".format(cls._tags, kw.keys()))
        if not (keys - tags <= fields):
            raise NameError("Got arguments not in tags or fields: {0}".format(keys - tags - fields))

        cls._datapoints[cls._series_name.format(**kw)].append(cls._type(time=timestamp, **kw))

        if cls._autocommit and sum(len(series) for series in cls._datapoints.values()) >= cls._bulk_size:
            cls.commit()

    @classmethod
    def commit(cls, client=None):
        """Commit everything from datapoints via the client.

        Args:
            client (InfluxDBClient): InfluxDBClient instance for writing points to InfluxDB.
                Any provided client will supersede the class client.

        Returns:
            bool: result of client.write_points.

        """
        if not client:
            client = cls._client

        rtn = client.write_points(
            cls._json_body_(),
            time_precision=cls._time_precision,
            retention_policy=cls._retention_policy,
        )
        # will be None if not set and will default to ns
        cls._reset_()
        return rtn

    @classmethod
    def _json_body_(cls):
        """Return the JSON body of given datapoints.

        Returns:
            list: JSON body of these datapoints.

        """
        json = []
        if not cls.__initialized__:
            cls._reset_()
        for series_name, data in iter(cls._datapoints.items()):
            for point in data:
                json_point = {
                    "measurement": series_name,
                    "fields": {},
                    "tags": {},
                    "time": point.time,
                }

                for field in cls._fields:
                    value = getattr(point, field)
                    if value is not None:
                        json_point["fields"][field] = value

                for tag in cls._tags:
                    json_point["tags"][tag] = getattr(point, tag)

                json.append(json_point)
        return json

    @classmethod
    def _reset_(cls):
        """Reset data storage."""
        cls._datapoints = defaultdict(list)

    @staticmethod
    def _current_timestamp():
        return datetime.now(timezone.utc)
Functions
__new__(*args, **kwargs)

Initialize class attributes for subsequent constructor calls.

Note

args and *kwargs are not explicitly used in this function, but needed for Python 2 compatibility.

Source code in influxdb/helper.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __new__(cls, *args, **kwargs):  # noqa: C901
    """Initialize class attributes for subsequent constructor calls.

    Note:
        *args and **kwargs are not explicitly used in this function,
        but needed for Python 2 compatibility.

    """
    if not cls.__initialized__:
        cls.__initialized__ = True
        try:
            _meta = cls.Meta
        except AttributeError as e:
            raise AttributeError("Missing Meta class in {0}.".format(cls.__name__)) from e

        for attr in ["series_name", "fields", "tags"]:
            try:
                setattr(cls, "_" + attr, getattr(_meta, attr))
            except AttributeError as e:
                raise AttributeError("Missing {0} in {1} Meta class.".format(attr, cls.__name__)) from e

        cls._autocommit = getattr(_meta, "autocommit", False)
        cls._time_precision = getattr(_meta, "time_precision", None)

        allowed_time_precisions = ["h", "m", "s", "ms", "u", "ns", None]
        if cls._time_precision not in allowed_time_precisions:
            raise AttributeError(
                "In {}, time_precision is set, but invalid use any of {}.".format(
                    cls.__name__, ",".join(allowed_time_precisions)
                )
            )

        cls._retention_policy = getattr(_meta, "retention_policy", None)

        cls._client = getattr(_meta, "client", None)
        if cls._autocommit and not cls._client:
            raise AttributeError("In {0}, autocommit is set to True, but no client is set.".format(cls.__name__))

        try:
            cls._bulk_size = _meta.bulk_size
            if cls._bulk_size < 1 and cls._autocommit:
                warn("Definition of bulk_size in {0} forced to 1, was less than 1.".format(cls.__name__), stacklevel=2)
                cls._bulk_size = 1
        except AttributeError:
            cls._bulk_size = -1
        else:
            if not cls._autocommit:
                warn(
                    "Definition of bulk_size in {0} has no affect because autocommit is false.".format(cls.__name__),
                    stacklevel=2,
                )

        cls._datapoints = defaultdict(list)

        if "time" in cls._fields:
            cls._fields.remove("time")
        cls._type = namedtuple(cls.__name__, ["time"] + cls._tags + cls._fields)
        cls._type.__new__.__defaults__ = (None,) * len(cls._fields)

    return super(SeriesHelper, cls).__new__(cls)
__init__(**kw)

Call to constructor creates a new data point.

Note

Data points written when bulk_size is reached per Helper.

Warning

Data points are immutable (namedtuples).

Source code in influxdb/helper.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(self, **kw):
    """Call to constructor creates a new data point.

    Note:
        Data points written when bulk_size is reached per Helper.

    Warning:
        Data points are immutable (namedtuples).

    """
    cls = self.__class__
    timestamp = kw.pop("time", self._current_timestamp())
    tags = set(cls._tags)
    fields = set(cls._fields)
    keys = set(kw.keys())

    # all tags should be passed, and keys - tags should be a subset of keys
    if not (tags <= keys):
        raise NameError("Expected arguments to contain all tags {0}, instead got {1}.".format(cls._tags, kw.keys()))
    if not (keys - tags <= fields):
        raise NameError("Got arguments not in tags or fields: {0}".format(keys - tags - fields))

    cls._datapoints[cls._series_name.format(**kw)].append(cls._type(time=timestamp, **kw))

    if cls._autocommit and sum(len(series) for series in cls._datapoints.values()) >= cls._bulk_size:
        cls.commit()
commit(client=None) classmethod

Commit everything from datapoints via the client.

Parameters:

Name Type Description Default
client InfluxDBClient

InfluxDBClient instance for writing points to InfluxDB. Any provided client will supersede the class client.

None

Returns:

Name Type Description
bool

result of client.write_points.

Source code in influxdb/helper.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
def commit(cls, client=None):
    """Commit everything from datapoints via the client.

    Args:
        client (InfluxDBClient): InfluxDBClient instance for writing points to InfluxDB.
            Any provided client will supersede the class client.

    Returns:
        bool: result of client.write_points.

    """
    if not client:
        client = cls._client

    rtn = client.write_points(
        cls._json_body_(),
        time_precision=cls._time_precision,
        retention_policy=cls._retention_policy,
    )
    # will be None if not set and will default to ns
    cls._reset_()
    return rtn