0%

influxdb

influxdb

InfluxDB是一个用于存储和分析时间序列数据的开源数据库。也是使用最多的一个时序数据库。

什么是时序数据库?时序数据库是近几年一个新的概念,与传统的Mysql关系型数据库相比,它的最大的特点是:数据按照时间顺序存储。

举例来说,日志数据,是以时间顺序存储的,所以用时序数据库存储是一种很好的选择。使用Mysql在存储的过程中,不是对这种基于时间的数据进行优化的,所以,在查询、插入上有一些瓶颈。

InfluxDB 的特点

InfluxDB有很多特点,如下:

  • 内置HTTP接口,使用方便

  • 数据可以打标记,这样查询可以很灵活

  • 类SQL的查询语句

  • 安装管理很简单,并且读写数据很高效

  • 能够实时查询,数据在写入时被索引后就能够被立即查出

对常见关系型数据库(MySQL)的基础概念对比

概念 MySQL InfluxDB
数据库(同) database database
表(不同) table measurement(测量; 度量)
列(不同) column tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)

Influxdb相关概念

1
2
3
4
database:数据库;
measurement:数据库中的表;
points:表里面的一行数据。
influxDB中独有的一些概念:Point由时间戳(time)、数据(field)和标签(tags)组成。

point

Point相当于传统数据库里的一行数据,如下表所示:

Point属性 传统数据库中的概念
timestamp(时间戳) 每个数据记录时间,是数据库中的主索引(会自动生成)。 每行记录都有一列time,主索引,记录时间戳,单位纳秒,时区UTC(东八区减8小时)
fields(字段、数据) 各种记录值(没有索引的属性)也就是记录的值:温度, 湿度。 普通列,key-value结构,value数据类型支持型(float、integer、string、boolean)
tags(标签) 各种有索引的属性:地区,海拔 索引列,key-value结构,value数据类型只支持string

image-20230316133947847

1
首先时间戳是必须的,查询的主要条件基本也是以时间作为单位(本身就是时序数据库),所以这里时间对应了mysql表的唯一主键。其次tag主要是索引,fields就是普通字段。

fields数据类型

类型 备注
float influxdb的fields默认是float浮点型
integer 整型,insert语句如需写入field是整型,需在数值后面加个i
string 字符串,insert语句如需写入field是字符串,需英文双引号包含数值
boolean 布尔型,真可以用 t , T , true , True , TRUE表示;假可以用 f , F , false , False 或者 FALSE表示

Influxdb常用命令

常用sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 查看所有的数据库
show databases;
-- 使用特定的数据库
use database_name;
-- 查看所有的measurement
show measurements;
-- 查询10条数据
select * from measurement_name limit 10;
-- 数据中的时间字段默认显示的是一个纳秒时间戳,改成可读格式
precision rfc3339; -- 之后再查询,时间就是rfc3339标准格式
-- 或可以在连接数据库的时候,直接带该参数
influx -precision rfc3339
-- 查看一个measurement中所有的tag key
show tag keys
-- 查看一个measurement中所有的field key
show field keys
-- 查看一个measurement中所有的保存策略(可以有多个,一个标识为default)
show retention policies;

登录

1
2
3
4
5
$influx -h
$influx
Connected to http://localhost:8086 version 1.8.0
InfluxDB shell version: 1.8.0
>

用户创建

1
2
3
4
5
6
> CREATE USER knight WITH PASSWORD 'knight' WITH ALL PRIVILEGES
> show users
user admin
---- -----
knight true
>

权限管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 创建用户
create user xxx with password 'pwd'

# 重设密码
set password for xxx='newpwd'

# 删除用户
drop user xxx

# 删除库
drop database db_hw_ces

# 给所有用户权限
GRANT ALL PRIVILEGES TO <username>

# 给库的权限
GRANT [READ,WRITE,ALL] privileges ON <database_name> TO <username>
GRANT all privileges ON db_hw_ces TO devops_rw

##
GRANT READ ON tsdb_tbox_dev TO hwy_tbox_rw
GRANT WRITE ON tsdb_tbox_dev TO hwy_tbox_rw


# 权限回收
REVOKE ALL PRIVILEGES FROM <username>
REVOKE [READ,WRITE,ALL] ON <database_name> FROM <username>

# 查询权限
show grants for <username>

HTTP API

1
2
3
4
5
6
7
8
9
10
11
12
# 创建数据库
curl -i -XPOST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE test01"

## 写入数据
### 写入单条数据
curl -i -XPOST http://localhost:8086/write?db=test01 --data-binary "cpu,host=me03 load=0.1,usage=0.33"

### 写入多条数据
curl -i -XPOST http://localhost:8086/write?db=test01 --data-binary "cpu,host=me03 load=0.1,usage=0.22 166"

## 查询数据
curl -G http://localhost:8086/query?db=test01 --data-urlencode "q=SELECT * FROM \"cpu\""

其他数据操作命令基本与mysql一致(sql)。

influxdb数据保留策略

InfluxDB 数据保留策略说明

InfluxDB的数据保留策略(RP)用来定义数据在InfluxDB中存放的时间,或者定义保存某个期间的数据。
一个数据库可以有多个保留策略, 但每个策略必须是独一无二的。

InfluxDB数据保留策略目的

InfluxDB本身不提供数据的删除操作, 因此用来控制数据量的方式就是定义数据保留策略。
因此定义数据保留策略的目的是让InfluxDB能够知道可以丢弃哪些数据, 节省数据存储空间,避免数据冗余的情况。

查看数据库保留策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
show retention policies on 数据库名

# 选择使用telegraf数据库
> use telegraf;
Using database telegraf
>
>
# 查询数据保留策略
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 true


name 策略名称:默认autogen
duration 持续时间: 0s 代表无限制
shardGroupDuration shardGroup数据存储时间:shardGroup是InfluxDB的一个基本存储结构, 大于这个时间的数据在查询效率上应该有所降低。
replicaN 副本个数:1 代表只有一个副本
default 是否默认策略:true 代表设置为该数据库的默认策略

默认是168h0m0s 的降低查询效率策略且永不删除。

新建数据保留策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 新建一个策略
CREATE RETENTION POLICY "策略名称" ON 数据库名 DURATION 时长 REPLICATION 副本个数;

# 新建一个策略并且直接设置为默认策略
CREATE RETENTION POLICY "策略名称" ON 数据库名 DURATION 时长 REPLICATION 副本个数 DEFAULT;
————————————————

# 创建新的默认策略之前的策略
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 true
>
>
# 创建新的默认策略role_01保留数据时长1小时
> CREATE RETENTION POLICY "role_01" ON telegraf DURATION 1h REPLICATION 1 DEFAULT;
>
# 查看策略的变化
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 false
role_01 1h0m0s 1h0m0s 1 true

————————————————

修改数据保留策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ALTER RETENTION POLICY "策略名称" ON "数据库名" DURATION 时长
ALTER RETENTION POLICY "策略名称" ON "数据库名" DURATION 时长 DEFAULT
————————————————
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 false
role_01 1h0m0s 1h0m0s 1 true
>
>
# 执行修改时长为2小时
> ALTER RETENTION POLICY "role_01" ON "telegraf" DURATION 2h
>
# 可以看到role_01的duration为2h
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 false
role_01 2h0m0s 1h0m0s 1 true
————————————————

删除数据保留策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
drop retention POLICY "策略名" ON "数据库名"
————————————————
# 查看当前的数据保留策略
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 false
role_01 2h0m0s 1h0m0s 1 true
>
# 删除role_01的策略
> drop retention POLICY "role_01" ON "telegraf"
>
# 查看删除后的策略,可以看到剩余的策略autogen并不会自动设置为默认default策略
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 false
>
# 修改autogen策略为default策略
> ALTER RETENTION POLICY "autogen" ON "telegraf" DEFAULT
>
> show retention policies on telegraf
name duration shardGroupDuration replicaN default
---- -------- ------------------ -------- -------
autogen 0s 168h0m0s 1 true
>

部署

infludb官网:https://docs.influxdata.com/

因公司只用到了kubernetes,二进制部署这里就不详细说明了,influxdb和mysql一样是标准的c/s架构服务,部署好influxdb用influxclient连接使用即可。

helm部署influxb(K8S)

1
2
3
4
5
6
7
8
9
10
11
12
helm repo add stable  http://mirror.azure.cn/kubernetes/charts
helm pull bitnami/influxdb
tar -xvf influxdb-4.3.2.tgz #解压helm包
cd influxdb
vi values.yaml
----
persistence:
enabled: true
storageClass: "nfs" #主要是要配置pv/pvc能正常创建
---
helm install influxdb . -n 部署的名称空间
如果这里需要外部访问还需要改一下svc配置

备份迁移

influxdb常规迁移方式

influxdb数据库默认只启用了 8086 的数据库访问端口服务,而如果要使用 备份 与 还原 的功能,则需要单独另外启用一个端口服务。配置如下:

influxdb.conf

1
2
3
4
5
在influxdb的配置文件中,取消注释,这里如果是远程备份恢复,就要改一下ip。
#bind-address = "127.0.0.1:8088"
bind-address = "0.0.0.0:8088"
---
配置好了之后,重启一下 influxdb 数据库。

备份

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
备份单个库:
influxd backup -portable -database mydatabase -host <remote-node-IP>:8088 /tmp/mysnapshot
---
mydatabase: 为数据库的名称
<remote-node-IP>:为刚刚在备份数据库设置的备份服务器IP,如果是本机就127.0.0.1
/tmp/mysnapshot: 备份数据库的文件夹

备份所有的数据库:
influxd backup -portable /path/to/backup-directory -host <remote-node-IP>:8088

备份特定的时间范围:

influxd backup -portable -start xxxx-01-01T00:00:00Z -stop xxxx-02-01T00:00:00Z /path/to/backup-directory
-host <remote-node-IP>:8088
备份特定的保留策略:
influxd backup -portable -db example-db -rp example-retention-policy /path/to/backup-directory -host <remote-node-IP>:8088
备份特定的shard:
influxd backup -portable -rp example-retention-policy -shard 123 /path/to/backup-directory -host <remote-node-IP>:8088

恢复

1
2
3
4
5
将所有数据恢复到远程InfluxDB实例:
influxd restore -portable -host 203.0.113.0:8088 /path/to/backup-directory -host <remote-node-IP>:8088

恢复一个特定的数据库:
influxd restore -portable -db example-db /path/to/backup-directory -host <remote-node-IP>:8088

这里备份恢复如果是本地的话可以不带-host :8088参数

influxdb非常规迁移方式

问题

如果influxdb备份服务器端本身无法监听或开启8088端口,那就无法再使用influxd backup 方式备份。

解决方法

可以通过select查询库中的表,导出为指定的格式,然后处理数据再导入到新的库中,这里的处理指的是influxdb通过查询导出的数据格式是无法直接就导入到influxdb的库中的,并且这种方式需要逐个表导出导入,很麻烦,属于是没有办法的办法。

实现

1
2
3
4
5
这里一般导出可以导出为json或者csv格式,但格式不一样数据处理方式和导入方式也不一样
json导出:
influx -ssl -host influxdbhost -port 8086 -username admin -password admin -database 'env_pro' -execute "select * from transformer" -format json > transormer.json
csv导出:
influx -ssl -host influxdbhost -port 8086 -username admin -password admin -database 'env_pro' -execute "select * from transformer" -format csv > transormer.csv
json处理并导入

这里使用python导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# #transformer单个json
import json
from influxdb import InfluxDBClient
import datetime
# 连接 InfluxDB
client = InfluxDBClient(host='xxxx', port=8086, username='admin', password='xxx', database='数据库名')
# 从 JSON 文件中读取数据并写入 InfluxDB
with open(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\transormer.json', encoding='utf-8') as f:
data = json.load(f)
for result in data['results']:
for series in result['series']:
measurement = series['name']
columns = series['columns']
values = series['values']
points = []
for value in values:
point = {'measurement': measurement} #表名
tags = {}
fields = {}
datatime = {}
for i, col in enumerate(columns):
if col == 'id': #TAGS
tags[col] = value[i]
elif col == 'time':
# 处理时间戳
timestamp = int(value[i] / 1e9) # 将纳秒级别的Unix时间戳转换为秒级别的时间戳
dt_object = datetime.datetime.utcfromtimestamp(timestamp)
time_string = dt_object.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
point['time'] = time_string
elif col == 'current': #这里写普通字段类型转换(非tag字段)
fields[col] = float(value[i]) # 将 current 字段转换为 float 类型
else:
fields[col] = value[i]
point['tags'] = tags
point['fields'] = fields
point['timestamp'] = timestamp
points.append(point)
client.write_points(points) #写入到influxdb
---
备注: 1.这里如果tag字段都放在if col == 'id':中,这里表示id是tags,如果有多个字段例如id、name都是tags,那么应该写成 if col == 'id' or 'name':
2. elif col == 'time'下面是处理时间戳的代码
3.elif col == 'current':这里写需要转换字段类型的非tags字段,如果tags需要转换就卸载第一条下面
4.其他(默认为字符串型)字段都写在else下面
csv处理并导入

csv可以转换成文本格式并导入,这个速度远快于json,但缺点是需要将数据转换成txt文本格式并通过influx客户端命令导入,而python可以直接通过influxdb模块导入json格式,这里要多一步。还有一个缺点是文本格式无法传NULL空字符串。

首先导出的csv数据格式大概如下:表名、时间戳、字段

image-20230420180310020

转换成可直接导入的文件文件格式大致如下

1
2
3
4
5
6
7
# DDL
CREATE DATABASE 数据库名

# DML
# CONTEXT-DATABASE: highway
temperature,id=408 current=1.907 1665384025
temperature,id=725 current=32.832 1665384024

格式解析: 导入的字段数据主要格式为 “表名,tag字段 普通字段 时间戳”,如果有多个字段则可以写成 “”表名,tags字段=value,tags字段2=value 普通字段=value,普通字段2=value 时间戳”

这个也是默认为字符串(string)格式,如果要使用其他格式,这里需要在字段value后面加上冒号:和对应字段类型的符号。例如

1
2
3
temperature,location=office value=72.6:f 1556896326000000000
temperature,location=kitchen value=68.3:f 1556896428000000000
这里:f就是定义字段类型
1
2
3
4
5
字段类型:
i:整数类型
f:浮点类型
u:无符号整数类型
b:布尔类型

然后就是influxdb导入txt文本格式的命令

1
2
influx -import -path=/path/to/myfile.txt -precision=s -database=my_database  -username <username> -password <password> 
这里需要注意的是-precision可以有两种值:s和ns ,代码是秒还是纳秒,如果本身是纳秒级数据却导入秒可能造成数据合并丢失,这个需要注意。

最后是将csv转换为txt的python脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import pandas as pd
import time
import os
import csv
import math
tablename = 'demo_transformer'
databasename = 'env_pro'
t = time.time()
df = pd.read_csv(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\{tablename}.csv')
# df = df[['name','time','id','current',]]
feiguanjianziduan=['']
with open(f'C:\\Users\\Finsiot\\Desktop\\临时文件夹\\{tablename}.txt', 'w') as f1:
f1.write('# DDL\n')
f1.write(f'CREATE DATABASE {databasename}\n')
f1.write('\n# DML')
f1.write(f'\n# CONTEXT-DATABASE: {databasename}')
for i in range(1,2):
for index, row in df.iterrows():
f1.write(f'\n{row["name"]}') # 表名
f1.write(',id=' + f"{row['id']}") # tag名称
f1.write(' current=' + f"{row['current']}")
f1.write(' ' + f"{int(pd.Timestamp(row['time']).timestamp())}") # 时间戳,时间字段
f1.write('\n')