Python读写时序数据库InfluxDb

时间:2025-04-12 19:50:13

InfluxDb 简介

InfluxDb 是高性能的时间序列数据库,能够存取高吞吐量时间序列数据,每秒可达几百万数据点。数据点(或时间序列数据)可能是CPU运行指标、不同服务器的日志信息、传感器数据、股票市场数据等。

  • InfluxDb 使用Go语言开发,无需外部依赖。

  • InfluxDb 提供了类SQL接口查询数据。

  • 自动压缩数据和降低采样率有助于最大限度地减少存储空间。

  • 通过连续查询和数据保留策略,可以让数据库中的旧数据过期。

InfluxDb Schema

  • InfluxDb 存储数据为数据点。
  • 每个数据点以行协议格式存储在数据库中
  • 在InfluxDb的行协议中,没行或记录称为时间序列点,与关系型数据库的行概念对应。
  • 每行以度量名称开头,度量与关系型数据库的表对应
  • 接着度量名称后面的是标签和字段。标签是索引,字段不是索引。
  • 每种度量都存在数据库中,数据库通过InfluxDb控制台创建。
  • InfluxDb的schema是灵活的,字段和标签直接通过数据一起添加,意味着数据点之间字段或标签可能不同,随着数据文件增长,新的字段可能被加在时间序列的数据点上(记录或行)。

行协议语法

weather,location=us-midwest temperature=82 1465839830100400200
  |    -------------------- --------------  |
  |             |             |             |
  |             |             |             |
+-----------+--------+-+---------+-+---------+
|measurement|,tag_set| |field_set| |timestamp|
+-----------+--------+-+---------+-+---------+

特别需要说明的是:使用空白字符分割标签集、字段集和时间戳。多个标签或字段使用逗号分割。

下面示例度量名称为weather,包括两个标签location和season

weather,location=us-midwest,season=summer temperature=82 1465839830100400200

再看一个示例,除了标签,还包括两个字段:temperature和bug_concentration

weather,location=us-midwest temperature=82,bug_concentration=98 1465839830100400200

最后就是时间戳是可选的,缺省则使用服务端的时间戳。

weather,location=us-midwest temperature=82

Python读写InfluxDb

使用Python访问InfluxDb,需要使用Python客户端库:influxdb-python。安装命令为:

$ pip install influxdb

连接数据库需要提供下面信息:

  • 数据库的IP地址或主机名称、端口号
  • 用户名和密码
  • 数据库名称

主要使用InfluxDBClient对象的query和write_points方法读写数据。

建立连接

连接本地数据库:

client = InfluxDBClient(host='localhost', port=8086)

连接远程数据库:

client = InfluxDBClient(host='', port=8086, username='myuser', password='mypass' ssl=True, verify_ssl=True)

创建数据库:

client.create_database('pyexample')

# 查看数据库
client.get_list_database()
# [{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'pyexample'}]
# telegraf 、_internal 是内置数据库

# 切换数据库
client.switch_database('pyexample')

插入数据

# 定义待插入的数据
json_body = [
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-28T8:01:00Z",
        "fields": {
            "duration": 127
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-29T8:04:00Z",
        "fields": {
            "duration": 132
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-30T8:02:00Z",
        "fields": {
            "duration": 129
        }
    }
]

# 执行插入操作,执行成果返回True
client.write_points(json_body)

查询数据

前面已经插入了几条数据,下面我们演示查询。

records = ('SELECT duration FROM brushEvents WHERE time > now() - 4d GROUP BY user')
print(records)