时序数据库之InfluxDB的安装与使用

news/2024/7/4 8:00:37 标签: 时序数据库, 数据库, database, java

InfluxDB数据库>时序数据库的使用

  • InfluxDB
    • InfluxDB概述
    • 相关概念
  • 安装InfluxDB
  • InfluxDB的操作
  • Spring Boot集成InfluxDB
    • 添加依赖
    • 创建数据库
    • 配置InfluxDB
    • 编码
    • 执行测试

InfluxDB

InfluxDB概述

InfluxDB 是一个从头开始构建的时间序列数据库,用于处理高写入和查询负载。InfluxDB 旨在用作任何涉及大量时间戳数据的用例的后备存储,包括 DevOps 监控、应用程序指标、物联网传感器数据和实时分析。

什么是时间序列的数据?从定义上来说,就是一串按时间维度索引的数据。

时间序列数据库(TSDB)特点:

  • 持续高并发写入、不用更新
  • 数据压缩存储
  • 低延时查询

常见的时间序列数据库除了InfluxDB之外还有:opentsdb、timeScaladb、Druid等。

官网: https://www.influxdata.com/

GitHub:https://github.com/influxdata/influxdb

中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html

相关概念

nfuxDB传统数据库
database数据库
measurement数据库中的表
point表中的行

Point是由时间戳(time)、标签(tags)、数据(fields)三部分组成

point属性含义
time数据记录的时间,主索引,默认自动生成,相当于每行数据都具备的列
tags相当于有索引的列。tag中存储的值的类型是字符串类型
fieldsvalue值,没有索引的列。field中存储的值得类型:字符串、浮点数(Double)、整数、布尔型。一个field value总是和一个timestamp相关联

安装InfluxDB

拉取influxdb

java">docker pull influxdb:1.8

启动influxdb

java">docker run -d --name influxdb -p 8086:8086  influxdb:1.8

进入influxdb容器

java">docker exec -it influxdb /bin/bash

在命令行里输入influx,进入InfluxDB数据库环境

java">root@70783fdf1b06:/# influx
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> 

InfluxDB的操作

用户管理

创建用户

java">> create user "root" with password 'root' with all privileges
> show users
user admin
---- -----
root true
> 

删除用户

java">> drop user root
> 
> show users
user admin
---- -----
> 

启用认证

配置influxdb.conf文件启用认证

java">docker cp influxdb:/etc/influxdb/influxdb.conf ./

docker cp ./influxdb.conf influxdb:/etc/influxdb/

docker restart influxdb

在influxdb.conf配置文件中添加如下内容

java[meta]">[http]
auth-enabled= true

使用账号密码登录

java">root@70783fdf1b06:/#  influx -username 'root' -password 'root'
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> show users
user admin
---- -----
root true
> 

错误密码认证

java">root@70783fdf1b06:/#  influx -username 'root' -password 'root123'
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> show users
ERR: authorization failed
Warning: It is possible this error is due to not setting a database.
Please set a database with the command "use <database>".
> 

数据库操作

显示数据库show databases

java">> show databases
name: databases
name
----
_internal
> 

新建数据库create database test

java">> create database test
> show databases
name: databases
name
----
_internal
test
> 

删除数据库命令:drop database test

java">> drop database test
> show databases
name: databases
name
----
_internal
> 

使用某个数据库,必须先创建create database test ,然后再使用use test

java">> create database test
> use test
Using database test
> 

数据库表操作

显示所有表:SHOW MEASUREMENTS

java">> SHOW MEASUREMENTS
> 

新建表:insert

InfluxDB中没有显式的新建表的语句,只能通过insert数据的方式来建立新表,或者在客户端程序代码中插入数据的时候就会自动建表。

以下命令表示:一个measurement为cpu,tag是host和region,value值为0.64的数据点被写入到InfluxDB中

java">> INSERT cpu,host=serverA,region=us_west value=0.64
> SHOW MEASUREMENTS
name: measurements
name
----
cpu
> 

删除表:drop measurement cpu

java">> drop measurement cpu
> SHOW MEASUREMENTS
> 

查询操作

InfluxQL是一种类似SQL的查询语言,用于与InfluxDB中的数据进行交互。

表数据查询:SELECT * FROM tb

java">>  SELECT "host", "region", "value" FROM "cpu"
name: cpu
time                host    region  value
----                ----    ------  -----
1642915045925737934 serverA us_west 0.64
> 

InfluxDB中的模糊搜索和mysql中的模糊搜索一样

查询包含给定字段数据

java">select fieldName from measurementName where fieldName=~/条件值/

查询以给定字段开始的数据

java">select fieldName from measurementName where fieldName=~/^条件值/

查询以给定字段结束的数据

java">select fieldName from measurementName where fieldName=~/条件值$/

统计记录个数

java">select count(value) from measurementName where time>='2021-01-01' and time<='2022-12-31' 

分页

java">select * from measurementName where time>='2021-01-01' and time<='2022-12-31' order by desc limit 10 offset 0

Spring Boot集成InfluxDB

添加依赖

java"><dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.21</version>
</dependency>

创建数据库

java">create database demo

配置InfluxDB

java">spring: 
  influx:
    user: root
    password: root
    url: http://IP:8086
    db: demo

编码

java">/**
 * 封装统计记录数
 **/
@Data
@Measurement(name = "bbu")
public class BbuCount {
    @Column(name = "count")
    private Long count;
}
java">/**
 * 在类上添加@Measurement注解用来标注最终存储的表名,这样在插入数据时会自动创建该Measurement,名字为bbu
 * <p>
 * 对应InfluxDB中tag的映射,在定义字段上加`@Column`注解,且用`name`来设置对应的tag名,并且设置`tag`为`true`证明是tag
 */
@Data
@Measurement(name = "bbu")
public class BbuInfo {

    /**
     * 设备编号
     */
    @Column(name = "deviceId", tag = true)
    private String deviceId;
    /**
     * 指标名称
     */
    @Column(name = "indicatorName", tag = true)
    private String indicatorName;
    /**
     * 是否告警  0:不告警  1:告警
     */
    @Column(name = "alarm", tag = true)
    private String alarm;
    /**
     * 告警级别
     */
    @Column(name = "level", tag = true)
    private String level;
    /**
     * 指标数值
     */
    @Column(name = "value")
    private Double value;
    /**
     * 时间序列
     */
    @Column(name = "time")
    private String time;
}

java">import cn.ybzy.demo.model.BbuCount;
import cn.ybzy.demo.model.BbuInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBResultMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class BbuInflux {

    @Autowired
    private InfluxDB influxDB;

    @Value("${spring.influx.db}")
    private String dbName;

    /**
     * 添加数据
     *
     * @param object
     */
    public void add(Object object) {

        Point.Builder pointBuilder = Point.measurementByPOJO(object.getClass());

        // 将对象中的所有属性转换为tag添加到point中
        Point point = pointBuilder.addFieldsFromPOJO(object)
                // 调用time方法设置当前时间,InfluxDB自动生成的time时间列为UTC时间,所以在存储时单独设置
                .time(LocalDateTime.now().plusHours(8).toInstant(ZoneOffset.of("+8")).toEpochMilli(), TimeUnit.MILLISECONDS)
                .build();

        // 设置要存储的数据库名称
        influxDB.setDatabase(dbName);
        // 将数据插入到表(Measurement)中
        influxDB.write(point);
        influxDB.close();
    }

    /**
     * 通用查询封装
     *
     * @param sql
     * @param clazz
     * @param <T>
     * @return
     */
    public <T> List<T> query(String sql, Class<T> clazz) {
        QueryResult queryResult = influxDB.query(new Query(sql, dbName));
        influxDB.close();
        InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
        return resultMapper.toPOJO(queryResult, clazz);
    }

    /**
     * 分页查询
     *
     * @param page
     * @param pageSize
     * @param start
     * @param end
     * @param deviceId
     * @return
     */
    public HashMap<String, Object> queryBbu(Long page, Long pageSize, String start, String end, String deviceId) {
        StringBuilder sql = new StringBuilder("select * from bbu  ");

        // 构建where条件
        sql.append(" where ");
        if (StringUtils.isNoneBlank(start)) {
            sql.append(" time>='" + start + "' ");
        }
        if (StringUtils.isNoneBlank(end)) {
            sql.append("and time<='" + end + "' ");
        }
        if (StringUtils.isNoneBlank(deviceId)) {
            sql.append("and deviceId=~/" + deviceId + "/ ");
        }

        // 分页
        sql.append("order by desc limit " + pageSize + " offset " + (page - 1) * pageSize);

        // 查询记录语句
        List<BbuInfo> quotaList = this.query(sql.toString(), BbuInfo.class);

        // 统计语句
        String countSql = "select count(value) from bbu ";
        List<BbuCount> bbuCount = this.query(countSql, BbuCount.class);

        if (bbuCount == null || bbuCount.size() <= 0) {
            return null;
        }

        Long totalCount = bbuCount.get(0).getCount();
        HashMap<String, Object> map = new HashMap<>();
        map.put("total", totalCount);
        map.put("data", quotaList);
        return map;
    }

    /**
     * 查询最新数据
     *
     * @param deviceId
     * @return
     */
    public HashMap<String, Object> queryLast(String deviceId) {
        String sql = " select last(value),* from bbu where deviceId='" + deviceId + "' group by deviceId";
        List<BbuInfo> quotaList = this.query(sql, BbuInfo.class);

        HashMap<String, Object> map = new HashMap<>();
        map.put("data", quotaList);
        return map;
    }
}

执行测试

java">@SpringBootTest
class SpringbootApplicationTests {

    @Autowired
    private BbuInflux bbuInflux;

    @Test
    public void testSave() {
        BbuInfo bbuInfo = new BbuInfo();
        bbuInfo.setDeviceId("0001");
        bbuInfo.setIndicatorName("温度");
        bbuInfo.setAlarm("0");
        bbuInfo.setLevel("正常");
        bbuInfo.setValue(65.55D);
        bbuInflux.add(bbuInfo);
    }

    @Test
    public void testQuery() {
        HashMap<String, Object> map = bbuInflux.queryBbu(1L, 10L, "2020-01-01", "2022-12-31", "");
        System.out.println("Total:" + map.get("total"));
        String toJSONString = JSON.toJSONString(map.get("data"));
        List<BbuInfo> bbuInfoList = JSON.parseArray(toJSONString, BbuInfo.class);
        for (BbuInfo bbuInfo : bbuInfoList) {
            // 时间格式处理
            LocalDateTime dateTime = LocalDateTime.parse(bbuInfo.getTime(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            String time = dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
            bbuInfo.setTime(time);
            System.out.println("bbuInfo:" + bbuInfo);
        }
    }

    @Test
    public void testQueryLast() {
        HashMap<String, Object> map = bbuInflux.queryLast("0001");
        String toJSONString = JSON.toJSONString(map.get("data"));
        List<BbuInfo> bbuInfoList = JSON.parseArray(toJSONString, BbuInfo.class);
        for (BbuInfo bbuInfo : bbuInfoList) {
            // 时间格式处理
            LocalDateTime dateTime = LocalDateTime.parse(bbuInfo.getTime(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
            String time = dateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"));
            bbuInfo.setTime(time);
            System.out.println("bbuInfo:" + bbuInfo);
        }
    }
}

java">> use demo
Using database demo
> select * from bbu
name: bbu
time                alarm deviceId indicatorName level  value
----                ----- -------- ------------- ----- ------
1642948114035000000 0     0001     温度           正常   56.5
> 

java">> select * from bbu   where  time>='2020-01-01' and time<='2022-12-31' order by desc limit 10 offset 0
name: bbu
time                alarm deviceId indicatorName level  value
----                ----- -------- ------------- -----  -----
1642950703148000000 0     0001     温度            正常    65.55
1642949227228000000 0     0001     温度            正常    89.56
1642948114035000000 0     0002     温度            正常    56.5

java">> select last(value),* from bbu where deviceId='0001' group by deviceId
name: bbu
tags: deviceId=0001
time                last  alarm indicatorName level  value
----                ----  ----- ------------- -----  -----
1642950703148000000 65.55 0     温度            正常   65.55
> 

http://www.niftyadmin.cn/n/876169.html

相关文章

FFmpeg+Video.js+Videojs-contrib-hls实现视频点播解决方案

FFmpegVideo.jsVideojs-contrib-hls实现视频点播解决方案视频点播解决方案HLSFFmpeg简单使用生成m3u8/ts文件码率的设置video.js下载video.js与videojs-contrib-hls常用属性与事件属性与事件的使用视频点播方案的实现页面HTML配置Nginx准备测试流媒体相关概念流媒体特征流式传输…

使用Web Uploader文件上传组件辅助Java实现断点续传

使用Web Uploader文件上传组件辅助Java实现断点续传一、断点续传断点续传概述文件分块文件合并二、Web UploaderWeb Uploader概述特点钩子方法事件三、服务端实现功能FileUploadControllerIFileUploadServiceFileUploadServiceImpl四、Web Uploader的使用引入webuploader初始化…

MySQL之慢查询和慢日志与mysqldumpslow与pt-query-digest慢查询日志分析工具的使用

MySQL之慢查询和慢日志与mysqldumpslow与pt-query-digest慢查询日志分析工具的使用慢查询与慢日志慢日志的查询与开启慢日志位置的查询及设置慢日志判断标准及修改判断标准分析慢查询日志&#xff08;核心)慢日志格式说明慢查询日志分析工具mysqldumpslow命令注意事项查看详细用…

Tomcat的性能优化点

Tomcat的性能优化点开启Tomcat管理界面禁用AJP协议启用线程池修改运行模式调整Tomcat相关JVM参数压缩属性设置远程jmx监控查TomcatJMeter压力测试开启Tomcat管理界面 修改tomcat\conf\tomcat-users.xml添加用户 <role rolename"manager"/> <role rolename…

Vue前端JavaScript实现PDF预览与图片预览

Vue前端JavaScript实现PDF预览与图片预览说明PDF.JS代码实现预览测试说明 图片预览&#xff1a;使用Blob创建一个指向类型化数组的URL PDF预览&#xff1a;借助PDF.JS实现 World预览&#xff1a;先Wolrd转PDF&#xff0c;再借助PDF.JS实现 PDF.JS pdf.js是 一个通用的、基…

Linux服务器搭建Hadoop单节点伪分布式

Linux服务器搭建Hadoop单节点伪分布式 官网&#xff1a;https://hadoop.apache.org/ 安装Hadoop 下载地址&#xff1a;https://archive.apache.org/dist/hadoop/core/ wget http://archive.apache.org/dist/hadoop/core/hadoop-3.3.2/hadoop-3.3.2.tar.gz解压且重命名 tar…

使用Kubeadm搭建Kubernetes集群

使用Kubeadm搭建Kubernetes集群kubeadmkubeadm概述搭建过程初始化准备关闭防火墙关闭Selinux关闭Swap添加主机名与IP对应关系配置内核参数配置时间同步搭建Kubernetes集群安装Docker设置Yum软件源安装kubeadm、kubelet、kubectl部署Kubernetes MasterNode节点加入K8S集群安装Po…

Minikube搭建Kubernetes集群

Minikube搭建Kubernetes集群Minikubekubectl下载Minikube启动MinikubeMinikube命令验证Minikube插件的使用管理集群额外启动参数测试Minikube minikube是一个工具&#xff0c; 能让你在本地运行一个单节点的Kubernetes集群&#xff0c;以便你来尝试Kubernetes或者开展每天的开发…