之前介绍了运维监控系统Prometheus,然后就有同鞋问我关于时序数据库的情况,所以这里总结一下时序数据库,并以InfluxDB为例,介绍时序数据库的功能特性和使用方式,希望能对大家有所帮助。
一、时序数据库概述1.1 什么是时序数据库
时序数据是一组按照时间维度索引的数据。时序数据在日常生活中随处可见,比如每个整点的温度、湿度等天气数据,每分钟的股票价格数据等。我们常用曲线图、柱状图等形式去展现时序数据,也就是我们常常听到的“数据可视化”。
时序数据库是一种非关系型数据库,以时间作为数据主键,专门用来存储时序数据。
1.2 时序数据库的特点
1.3 时序数据库的使用场景
目前比较流行的时序数据库有:InfluxDB、Prometheus、OpenTSDB、TDengine等,其中使用最广泛的当属InfluxDB,行业内应用最广泛。还有就是刚进入业内视野的国产时序数据库TDengine。而Prometheus则是Prometheus监控系统自带的数据库。
二、InfluxDB简介2.1 什么是InfluxDB
InfluxDB 是一个用于存储和分析时间序列数据的开源数据库。由 Golang 语言编写,也是由 Golang 编写的软件中比较著名的一个,在很多 Golang 的沙龙或者文章中可能都会把 InfluxDB 当标杆来介绍,这也间接帮助 InfluxDB 提高了知名度。
2.2 InfluxDB的特性
在最新的 DB-ENGINES 给出的时间序列数据库的排名中,InfluxDB 高居第一位,可以预见,InfluxDB 会越来越得到广泛的使用。
2.3 InfluxDB几个基本概念
时序数据库由于其存储海量时序数据的特性,因此与传统数据库有些许不同,下面先对influxdb中涉及的基本概念作出解释。
influxdb数据库由database、measurement、point等三部分构成。分别对应关系数据库中的数据库、表、数据行。
概念
MySQL
InfluxDB
数据库(同)
database
database
表(不同)
table
measurement(测量; 度量)
列(不同)
column
Point,包括:tag(带索引的,非必须)、field(不带索引)、timestemp(唯一主键)
2.4 Point数据构成
由于database和measurement与传统数据库基本相同,这里不做过多解释,以下针对influxdb中特有的Point进行讲解。
Point是InfluxDB中独有的概念,由时间(time)、数据(field)、标签(tags)三类字段组成。
(1)time:代表每条数据的时间字段,是measurement中的数据主键,因此time字段具有索引属性。一条point只能有一个time。
(2)field:代表各种数据的字段,例如气温、压力、股价等,field字段没有索引属性。一条point可以包括多个field。
(3)tag:代表各类非数据字段,例如设备编码、地区、姓名等,tag字段有索引属性。一条point可以包括多个tag。
例如:监控系统系统中,保存某个服务器的cpu和内存等资源使用情况,使用cpu_usage_total 的表名(measurement)保存数据。以下表示某一个point的样例数据:
其中time为time字段,记录数据产生的时间;cpu_usage和memory_usage分别代表CPU使用率和内存使用率,因此他们是field字段,真正的监控数据;cpu 和host代表CPU的名字和服务器IP,所以,他们是tag字段,用于查询和检索。
在使用和设计InfluxDB数据结构时,需要注意以下几点:
三、InfluxDB安装
InfluxDB安装非常简单,根据操作系统执行对应的安装命令即可。这里以window为例,演示如何安装InfluxDB。
3.1 下载
InfluxDB:https://dl.influxdata.com/influxdb/releases/influxdb-1.7.4_windows_amd64.zip
chronograf :https://dl.influxdata.com/chronograf/releases/chronograf-1.7.8_windows_amd64.zip
chronograf为InfluxDB的Web后台管理端,InfluxDB提供了控制台命令端,如果使用不习惯,可以使用chronograf。
3.2 解压安装包
软件下载成功后,解压,我们可以看到influxDB的数据库文件非常简单。如下图所示:
3.3 修改配置文件
InfluxDB 的数据存储主要有三个目录。默认情况下是 meta, wal 以及 data 三个目录,程序启动后会自动生成。
接下来修改influxdb.conf 配置文件,修改以下部分的路径。
另外,InfluxDB服务默认端口为8086,如果需要更改端口号,则增加以下配置。
3.4 启动InfluxDB服务
配置文件修改完成后,接下来启动InfluxDB服务。直接运行Influxd.exe使用默认配置运行即可。如果需要使用自定义的配置文件,则指定conf文件进行启动,启动命令如下:
#先cmd 进入influxDB目录influxd.exe -config influxdb.conf
看到如下输出,说明InfluxDB启动成功。
四、InfluxDB使用
InfluxQL是一种类似于SQL的查询语言,用于与InfluxDB进行交互。如果你使用过关系数据库及SQL,那么你可以很快速的掌握InfluxQL。但是,InfluxQL又不完全是SQL,缺乏SQL中的一些高级的语法,例如UNION,JOIN,HAVING等。
那么InfluxDB的到底如何操作呢?接下来介绍InfluxQL语言的使用方法。
4.1 连接InfluxDB服务
进入到InfluxDB目录后,在cmd中输入influx命令即可,命令如下:
# 使用Command命令行进入influxdbinflux -port 8086
如果使用的是默认配置,可以不需要加端口,直接influx即可。
4.2 操作InfluxDB
InfluxQL与SQL命令语法类似。接下来我们看一看InfluxQL 是怎么使用的?
4.2.1创建数据库# 创建数据库CREATE DATABASE weiz_tes# 显示所有数据库SHOW DATABASES# 删除数据库DROP DATABASE weiz_test# 使用数据库USE weiz_test
4.2.2 表操作
1.创建表
InfluxDB没有专门的创建表的命令,当插入一条数据point至某A表时,此A表会自动创建,并且表的格式、字段名、字段类型也由此条插入命令决定。
2.修改表
InfluxDB没有修改表的命令,但当插入一条新数据point至表A时,如果此point中的字段多于原A表的字段,会自动修改A表与此条插入数据的格式字段等一致。
注意:此种情况仅限于新插入的数据字段与表A字段的交集即表A的情况,如果新插入数据字段与表A完全不同则会插入失败。
3.查询表
# 显示该数据库中的表SHOW MEASUREMENTS
4.删除表:
DROP MEASUREMENT "measurementName"
5.插入数据
insert host_cpu_usage_total,host_name=host1,cpu_core=core1 cpu_usage=0.26,cpu_idle=0.76
上面,我们新增一条数据,measurement为host_cpu_usage_total, tag为host_name,cpu_core, field为cpu_usage,cpu_idle。
我们简单小结一下插入的语句写法:
6.查询数据
select * from host_cpu_usage_total
查询语句使用select 关键字,格式与mysql 基本一致。
4.2.3 用户管理
InfluxDB 默认管理员账号:admin,密码为空。我们可以新增用户和权限。命令如下:
#显示用户show users#创建用户create user "username" with password 'password'#创建管理员权限用户create user "username" with password 'password' with all privileges#删除用户drop user "username"
以上是对InfluxDB数据库操作的基本总结,其他复杂的用法可以参考官网教程。官网教程地址:https://docs.influxdata.com/influxdb/v1.7/。
五、SpringBoot整合InfluxDB
前面介绍了InfluxDB的基本安装和使用。接下来我们介绍SpringBoot项目如何整合InfluxDB,实现数据的增删改查。这里使用的Spring Boot版本为2.4.1。接下来看看如何实现的。
5.1 添加依赖
首先创建springboot项目spring-boot-starter-influxdb,并添加相关依赖,具体依赖如下:
org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-testtestorg.influxdbinfluxdb-JAVA2.14
5.2 修改Application.properties 配置
接下来修改application.properties 配置文件,增加InfluxDB的相关配置,具体如下:
#influxdb 配置spring.influx.url=http://localhost:8086spring.influx.user=adminspring.influx.password=spring.influx.database=weiz_test
上面配置的是InfluxDB数据库连接配置,默认url为:http://localhost:8086 ,数据库为之前创建的weiz_test数据库。用户名为admin,密码默认为空。
5.3 读取配置文件
创建InfluxDBConfig类,负责读取Influx的数据库连接配置。具体代码如下:
@Configurationpublic class InfluxDBConfig {@Value("${spring.influx.user}")public String userName;@Value("${spring.influx.password}")public String password;@Value("${spring.influx.url}")public String url;//数据库@Value("${spring.influx.database}")public String database;
5.4 数据库操作类
创建数据库操作类InfluxDBService,负责数据库的初始化,增删改查等操作的具体实现,示例代码如下:
@Servicepublic class InfluxDBService {@Autowiredprivate InfluxDBConfig influxDBConfig;@PostConstructpublic void initInfluxDb() {this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;this.influxDB = influxDbBuild();//保留策略private String retentionPolicy;private InfluxDB influxDB;* 设置数据保存策略 defalut 策略名 /database 数据库名/ 30d 数据保存时限30天/ 1 副本个数为1/ 结尾DEFAULT* 表示 设为默认的策略public void createRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY "%s" ON "%s" DURATION %s REPLICATION %s DEFAULT","defalut", influxDBConfig.database, "30d", 1);this.query(command);* 连接时序数据库;获得InfluxDBprivate InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(influxDBConfig.url, influxDBConfig.userName, influxDBConfig.password);influxDB.setDatabase(influxDBConfig.database);return influxDB;* 插入* @param measurement 表* @param tags 标签* @param fields 字段public void insert(String measurement, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);influxDB.write(influxDBConfig.database, "", builder.build());* @desc 插入,带时间time* @date 2021/3/27*@param measurement*@param time*@param tags*@param fields* @return voidpublic void insert(String measurement, long time, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(time, TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);influxDB.write(influxDBConfig.database, "", builder.build());* @desc influxDB开启UDP功能,默认端口:8089,默认数据库:udp,没提供代码传数据库功能接口* @date 2021/3/13*@param measurement*@param time*@param tags*@param fields* @return voidpublic void insertUDP(String measurement, long time, Map tags, Map fields) {influxDbBuild();Point.Builder builder = Point.measurement(measurement);builder.time(time, TimeUnit.MILLISECONDS);builder.tag(tags);builder.fields(fields);int udpPort = 8089;influxDB.write(udpPort, builder.build());* 查询* @param command 查询语句* @returnpublic QueryResult query(String command) {influxDbBuild();return influxDB.query(new Query(command, influxDBConfig.database));* @desc 查询结果处理* @date 2021/5/12*@param queryResultpublic List> queryResultProcess(QueryResult queryResult) {List> mapList = new ArrayList<>();List resultList = queryResult.getResults();//把查询出的结果集转换成对应的实体对象,聚合成listfor(QueryResult.Result query : resultList){List seriesList = query.getSeries();if(seriesList != null && seriesList.size() != 0) {for(QueryResult.Series series : seriesList){List columns = series.getColumns();String[] keys = columns.toArray(new String[columns.size()]);List> values = series.getValues();if(values != null && values.size() != 0) {for(List value : values){Map map = new HashMap(keys.length);for (int i = 0; i < keys.length; i++) {map.put(keys[i], value.get(i));mapList.add(map);return mapList;* @desc InfluxDB 查询 count总条数* @date 2021/4/8public long countResultProcess(QueryResult queryResult) {long count = 0;List> list = queryResultProcess(queryResult);if(list != null && list.size() != 0) {Map map = list.get(0);double num = (Double)map.get("count");count = new Double(num).longValue();return count;* 查询* @param dbName 创建数据库* @returnpublic void createDB(String dbName) {influxDbBuild();influxDB.createDatabase(dbName);* 批量写入测点* @param batchPointspublic void batchInsert(BatchPoints batchPoints) {influxDbBuild();influxDB.write(batchPoints);* 批量写入数据* @param database 数据库* @param retentionPolicy 保存策略* @param consistency 一致性* @param records 要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)public void batchInsert(final String database, final String retentionPolicy,final InfluxDB.ConsistencyLevel consistency, final List records) {influxDbBuild();influxDB.write(database, retentionPolicy, consistency, records);* @desc 批量写入数据* @date 2021/3/19*@param consistency*@param recordspublic void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List records) {influxDbBuild();influxDB.write(influxDBConfig.database, "", consistency, records);
5.5 测试验证
接下来,我们写几个单元测试,验证数据的增删改查等操作是否成功。单元测试代码如下:
@SpringBootTestclass Example01ApplicationTests {@Autowiredprivate InfluxDBService influxDBService;@Testvoid contextLoads() {@Testpublic void testSave(){String measurement = "host_cpu_usage_total";Map tags = new HashMap<>();tags.put("host_name","host2");tags.put("cpu_core","core0");Map fields = new HashMap<>();fields.put("cpu_usage",0.22);fields.put("cpu_idle",0.56);influxDBService.insert(measurement, tags, fields);@Testpublic void testGetdata(){String command = "select * from host_cpu_usage_total";QueryResult queryResult = influxDBService.query(command);List> result = influxDBService.queryResultProcess(queryResult);for (Map map: result) {System.out.println("time:"+ map.get("time")+" host_name:" + map.get("host_name")+" cpu_core:" + map.get("cpu_core")+" cpu_usage:" + map.get("host_name")+" cpu_idle:" + map.get("host_name"));
运行上面的新增和查询等单元测试,单击Run Test或在方法上右击,选择Run 'testSave' ,查看单元测试结果,运行结果如下图所示。
接下来调用数据验证数据查询,运行'testGetData'测试方法,运行结果如下图所示:
保存和查询等功能的单元测试运行成功,说明InfluxDB的增加和查询操作执行成功。
最后
以上,我们就把时序数据库InfluxDB介绍完了,并通过示例介绍了如何在SpringBoot项目中整合InfluxDB。示例代码也会同步上传:https://gitee.com/weizhong1988/spring-boot-starter 。如有疑问,请在下方留言!
InfluxDB在系统监控、物联网等方面的应用越来越多,希望大家能够熟练掌握。