influxdb InfluxDB是一个用于存储和分析时间序列数据的开源数据库。也是使用最多的一个时序数据库。
什么是时序数据库?时序数据库是近几年一个新的概念,与传统的Mysql关系型数据库相比,它的最大的特点是:数据按照时间顺序存储。
举例来说,日志数据,是以时间顺序存储的,所以用时序数据库存储是一种很好的选择。使用Mysql在存储的过程中,不是对这种基于时间的数据进行优化的,所以,在查询、插入上有一些瓶颈。
InfluxDB 的特点 InfluxDB有很多特点,如下:
对常见关系型数据库(MySQL)的基础概念对比
概念
MySQL
InfluxDB
数据库(同)
database
database
表(不同)
table
measurement(测量; 度量)
列(不同)
column
tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)
Influxdb相关概念 1 2 3 4 database:数据库; measurement:数据库中的表; points:表里面的一行数据。 influxDB中独有的一些概念:Point由时间戳(time)、数据(field)和标签(tags)组成。
point Point相当于传统数据库里的一行数据,如下表所示:
Point属性
传统数据库中的概念
timestamp(时间戳)
每个数据记录时间,是数据库中的主索引(会自动生成)。 每行记录都有一列time,主索引,记录时间戳,单位纳秒,时区UTC(东八区减8小时)
fields(字段、数据)
各种记录值(没有索引的属性)也就是记录的值 :温度, 湿度。 普通列,key-value结构,value数据类型支持型(float、integer、string、boolean)
tags(标签)
各种有索引的属性 :地区,海拔 索引列,key-value结构,value数据类型只支持string
1 首先时间戳是必须的,查询的主要条件基本也是以时间作为单位(本身就是时序数据库),所以这里时间对应了mysql表的唯一主键。其次tag主要是索引,fields就是普通字段。
fields数据类型
类型
备注
float
influxdb的fields默认是float浮点型
integer
整型,insert语句如需写入field是整型,需在数值后面加个i
string
字符串,insert语句如需写入field是字符串,需英文双引号包含数值
boolean
布尔型,真可以用 t , T , true , True , TRUE表示;假可以用 f , F , false , False 或者 FALSE表示
Influxdb常用命令 常用sql 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 -- 查看所有的数据库 show databases; -- 使用特定的数据库 use database_name; -- 查看所有的measurement show measurements; -- 查询10条数据 select * from measurement_name limit 10; -- 数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式 precision rfc3339; -- 之后再查询,时间就是rfc3339标准格式 -- 或可以在连接数据库的时候,直接带该参数 influx -precision rfc3339 -- 查看一个measurement中所有的tag key show tag keys -- 查看一个measurement中所有的field key show field keys -- 查看一个measurement中所有的保存策略(可以有多个,一个标识为default) show retention policies;
登录 1 2 3 4 5 $influx -h $influx Connected to http://localhost:8086 version 1.8.0 InfluxDB shell version: 1.8.0 >
用户创建 1 2 3 4 5 6 > CREATE USER knight WITH PASSWORD 'knight' WITH ALL PRIVILEGES > show users user admin ---- ----- knight true >
权限管理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # 创建用户 create user xxx with password 'pwd' # 重设密码 set password for xxx='newpwd' # 删除用户 drop user xxx # 删除库 drop database db_hw_ces # 给所有用户权限 GRANT ALL PRIVILEGES TO <username> # 给库的权限 GRANT [READ,WRITE,ALL] privileges ON <database_name> TO <username> GRANT all privileges ON db_hw_ces TO devops_rw ## GRANT READ ON tsdb_tbox_dev TO hwy_tbox_rw GRANT WRITE ON tsdb_tbox_dev TO hwy_tbox_rw # 权限回收 REVOKE ALL PRIVILEGES FROM <username> REVOKE [READ,WRITE,ALL] ON <database_name> FROM <username> # 查询权限 show grants for <username>
HTTP API 1 2 3 4 5 6 7 8 9 10 11 12 # 创建数据库 curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE test01" ## 写入数据 ### 写入单条数据 curl -i -XPOST http://localhost:8086/write?db=test01 --data-binary "cpu,host=me03 load=0.1,usage=0.33" ### 写入多条数据 curl -i -XPOST http://localhost:8086/write?db=test01 --data-binary "cpu,host=me03 load=0.1,usage=0.22 166" ## 查询数据 curl -G http://localhost:8086/query?db=test01 --data-urlencode "q=SELECT * FROM \"cpu\""
其他数据操作命令基本与mysql一致(sql)。
influxdb数据保留策略 InfluxDB 数据保留策略说明 InfluxDB的数据保留策略(RP)用来定义数据在InfluxDB中存放的时间,或者定义保存某个期间的数据。 一个数据库可以有多个保留策略, 但每个策略必须是独一无二的。
InfluxDB数据保留策略目的 InfluxDB本身不提供数据的删除操作, 因此用来控制数据量的方式就是定义数据保留策略。 因此定义数据保留策略的目的是让InfluxDB能够知道可以丢弃哪些数据, 节省数据存储空间,避免数据冗余的情况。
查看数据库保留策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 show retention policies on 数据库名 # 选择使用telegraf数据库 > use telegraf; Using database telegraf > > # 查询数据保留策略 > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 true name 策略名称:默认autogen duration 持续时间: 0s 代表无限制 shardGroupDuration shardGroup数据存储时间:shardGroup是InfluxDB的一个基本存储结构, 大于这个时间的数据在查询效率上应该有所降低。 replicaN 副本个数:1 代表只有一个副本 default 是否默认策略:true 代表设置为该数据库的默认策略
默认是168h0m0s 的降低查询效率策略且永不删除。
新建数据保留策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 # 新建一个策略 CREATE RETENTION POLICY "策略名称" ON 数据库名 DURATION 时长 REPLICATION 副本个数; # 新建一个策略并且直接设置为默认策略 CREATE RETENTION POLICY "策略名称" ON 数据库名 DURATION 时长 REPLICATION 副本个数 DEFAULT; ———————————————— # 创建新的默认策略之前的策略 > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 true > > # 创建新的默认策略role_01保留数据时长1小时 > CREATE RETENTION POLICY "role_01" ON telegraf DURATION 1h REPLICATION 1 DEFAULT; > # 查看策略的变化 > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false role_01 1h0m0s 1h0m0s 1 true ————————————————
修改数据保留策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 ALTER RETENTION POLICY "策略名称" ON "数据库名" DURATION 时长 ALTER RETENTION POLICY "策略名称" ON "数据库名" DURATION 时长 DEFAULT ———————————————— > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false role_01 1h0m0s 1h0m0s 1 true > > # 执行修改时长为2小时 > ALTER RETENTION POLICY "role_01" ON "telegraf" DURATION 2h > # 可以看到role_01的duration为2h > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false role_01 2h0m0s 1h0m0s 1 true ————————————————
删除数据保留策略 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 drop retention POLICY "策略名" ON "数据库名" ———————————————— # 查看当前的数据保留策略 > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false role_01 2h0m0s 1h0m0s 1 true > # 删除role_01的策略 > drop retention POLICY "role_01" ON "telegraf" > # 查看删除后的策略,可以看到剩余的策略autogen并不会自动设置为默认default策略 > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 false > # 修改autogen策略为default策略 > ALTER RETENTION POLICY "autogen" ON "telegraf" DEFAULT > > show retention policies on telegraf name duration shardGroupDuration replicaN default ---- -------- ------------------ -------- ------- autogen 0s 168h0m0s 1 true >
部署 infludb官网:https://docs.influxdata.com/
因公司只用到了kubernetes,二进制部署这里就不详细说明了,influxdb和mysql一样是标准的c/s架构服务,部署好influxdb用influxclient连接使用即可。
helm部署influxb(K8S) 1 2 3 4 5 6 7 8 9 10 11 12 helm repo add stable http://mirror.azure.cn/kubernetes/charts helm pull bitnami/influxdb tar -xvf influxdb-4.3.2.tgz #解压helm包 cd influxdb vi values.yaml ---- persistence: enabled: true storageClass: "nfs" #主要是要配置pv/pvc能正常创建 --- helm install influxdb . -n 部署的名称空间 如果这里需要外部访问还需要改一下svc配置
备份迁移 influxdb常规迁移方式 influxdb数据库默认只启用了 8086 的数据库访问端口服务,而如果要使用 备份 与 还原 的功能,则需要单独另外启用一个端口服务。配置如下:
influxdb.conf
1 2 3 4 5 在influxdb的配置文件中,取消注释,这里如果是远程备份恢复,就要改一下ip。 #bind-address = "127.0.0.1:8088" bind-address = "0.0.0.0:8088" --- 配置好了之后,重启一下 influxdb 数据库。
备份 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 备份单个库: influxd backup -portable -database mydatabase -host <remote-node-IP>:8088 /tmp/mysnapshot --- mydatabase: 为数据库的名称 <remote-node-IP>:为刚刚在备份数据库设置的备份服务器IP,如果是本机就127.0.0.1 /tmp/mysnapshot: 备份数据库的文件夹 备份所有的数据库: influxd backup -portable /path/to/backup-directory -host <remote-node-IP>:8088 备份特定的时间范围: influxd backup -portable -start xxxx-01-01T00:00:00Z -stop xxxx-02-01T00:00:00Z /path/to/backup-directory -host <remote-node-IP>:8088 备份特定的保留策略: influxd backup -portable -db example-db -rp example-retention-policy /path/to/backup-directory -host <remote-node-IP>:8088 备份特定的shard: influxd backup -portable -rp example-retention-policy -shard 123 /path/to/backup-directory -host <remote-node-IP>:8088
恢复 1 2 3 4 5 将所有数据恢复到远程InfluxDB实例: influxd restore -portable -host 203.0.113.0:8088 /path/to/backup-directory -host <remote-node-IP>:8088 恢复一个特定的数据库: influxd restore -portable -db example-db /path/to/backup-directory -host <remote-node-IP>:8088
这里备份恢复如果是本地的话可以不带-host :8088参数
influxdb非常规迁移方式 问题 如果influxdb备份服务器端本身无法监听或开启8088端口,那就无法再使用influxd backup 方式备份。
解决方法 可以通过select查询库中的表,导出为指定的格式,然后处理数据再导入到新的库中,这里的处理指的是influxdb通过查询导出的数据格式是无法直接就导入到influxdb的库中的,并且这种方式需要逐个表导出导入,很麻烦,属于是没有办法的办法。
实现 1 2 3 4 5 这里一般导出可以导出为json或者csv格式,但格式不一样数据处理方式和导入方式也不一样 json导出: influx -ssl -host influxdbhost -port 8086 -username admin -password admin -database 'env_pro' -execute "select * from transformer" -format json > transormer.json csv导出: influx -ssl -host influxdbhost -port 8086 -username admin -password admin -database 'env_pro' -execute "select * from transformer" -format csv > transormer.csv
json处理并导入 这里使用python导入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 # #transformer单个json import json from influxdb import InfluxDBClient import datetime # 连接 InfluxDB client = InfluxDBClient(host='xxxx', port=8086, username='admin', password='xxx', database='数据库名') # 从 JSON 文件中读取数据并写入 InfluxDB with open(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\transormer.json', encoding='utf-8') as f: data = json.load(f) for result in data['results']: for series in result['series']: measurement = series['name'] columns = series['columns'] values = series['values'] points = [] for value in values: point = {'measurement': measurement} #表名 tags = {} fields = {} datatime = {} for i, col in enumerate(columns): if col == 'id': #TAGS tags[col] = value[i] elif col == 'time': # 处理时间戳 timestamp = int(value[i] / 1e9) # 将纳秒级别的Unix时间戳转换为秒级别的时间戳 dt_object = datetime.datetime.utcfromtimestamp(timestamp) time_string = dt_object.strftime('%Y-%m-%dT%H:%M:%S.%fZ') point['time'] = time_string elif col == 'current': #这里写普通字段类型转换(非tag字段) fields[col] = float(value[i]) # 将 current 字段转换为 float 类型 else: fields[col] = value[i] point['tags'] = tags point['fields'] = fields point['timestamp'] = timestamp points.append(point) client.write_points(points) #写入到influxdb --- 备注: 1.这里如果tag字段都放在if col == 'id':中,这里表示id是tags,如果有多个字段例如id、name都是tags,那么应该写成 if col == 'id' or 'name': 2. elif col == 'time'下面是处理时间戳的代码 3.elif col == 'current':这里写需要转换字段类型的非tags字段,如果tags需要转换就卸载第一条下面 4.其他(默认为字符串型)字段都写在else下面
csv处理并导入 csv可以转换成文本格式并导入,这个速度远快于json,但缺点是需要将数据转换成txt文本格式并通过influx客户端命令导入,而python可以直接通过influxdb模块导入json格式,这里要多一步。还有一个缺点是文本格式无法传NULL空字符串。
首先导出的csv数据格式大概如下:表名、时间戳、字段
转换成可直接导入的文件文件格式大致如下
1 2 3 4 5 6 7 # DDL CREATE DATABASE 数据库名 # DML # CONTEXT-DATABASE: highway temperature,id=408 current=1.907 1665384025 temperature,id=725 current=32.832 1665384024
格式解析: 导入的字段数据主要格式为 “表名,tag字段 普通字段 时间戳”,如果有多个字段则可以写成 “”表名,tags字段=value,tags字段2=value 普通字段=value,普通字段2=value 时间戳”
这个也是默认为字符串(string)格式,如果要使用其他格式,这里需要在字段value后面加上冒号:和对应字段类型的符号。例如
1 2 3 temperature,location=office value=72.6:f 1556896326000000000 temperature,location=kitchen value=68.3:f 1556896428000000000 这里:f就是定义字段类型
1 2 3 4 5 字段类型: i:整数类型 f:浮点类型 u:无符号整数类型 b:布尔类型
然后就是influxdb导入txt文本格式的命令
1 2 influx -import -path=/path/to/myfile.txt -precision=s -database=my_database -username <username> -password <password> 这里需要注意的是-precision可以有两种值:s和ns ,代码是秒还是纳秒,如果本身是纳秒级数据却导入秒可能造成数据合并丢失,这个需要注意。
最后是将csv转换为txt的python脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import pandas as pd import time import os import csv import math tablename = 'demo_transformer' databasename = 'env_pro' t = time.time() df = pd.read_csv(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\{tablename}.csv') # df = df[['name','time','id','current',]] feiguanjianziduan=[''] with open(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\{tablename}.txt', 'w') as f1: f1.write('# DDL\n') f1.write(f'CREATE DATABASE {databasename}\n') f1.write('\n# DML') f1.write(f'\n# CONTEXT-DATABASE: {databasename}') for i in range(1,2): for index, row in df.iterrows(): f1.write(f'\n{row["name"]}') # 表名 f1.write(',id=' + f"{row['id']}") # tag名称 f1.write(' current=' + f"{row['current']}") f1.write(' ' + f"{int(pd.Timestamp(row['time']).timestamp())}") # 时间戳,时间字段 f1.write('\n')