Operating InfluxDB 2.x from Java


This article does not cover installation; it only covers how to operate the database (buckets and measurements) from Java code after installation.

  • 1. Create the database
  • 2. Add a TELEGRAF configuration for the bucket
  • 3. TELEGRAF configuration parameters explained
  • 4. Configure database access permissions (API TOKENS)
  • 5. Operating the database from Java code
    • 5.1 yaml
    • 5.2 pom dependency
    • 5.3 config
    • 5.4 controller
    • 5.5 Query methods and result-set extraction

1. Create the database

InfluxDB 2.x ships with a web management UI. Taking a local installation as an example, open http://127.0.0.1:8086 in a browser and log in; you will see the interface below. Just follow the screenshots in order.

(screenshot)

A bucket here corresponds to a database.

(screenshot)

Choose (configure) the data retention policy for the bucket.

(screenshot)
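If you prefer to script this step instead of clicking through the UI, the Java client also exposes a Buckets API. Below is a minimal sketch, not from the original article; the token and organization ID are placeholders, and it assumes the influxdb-client-java dependency from section 5.2:

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.Bucket;
import com.influxdb.client.domain.BucketRetentionRules;

public class CreateBucketExample {
    public static void main(String[] args) {
        // "my-token" and "my-org-id" are placeholders; use your own values
        InfluxDBClient client = InfluxDBClientFactory.create(
                "http://127.0.0.1:8086", "my-token".toCharArray());
        // Retention rule: keep data for 30 days
        BucketRetentionRules retention = new BucketRetentionRules();
        retention.setEverySeconds(30 * 24 * 3600);
        Bucket bucket = client.getBucketsApi().createBucket("db2", retention, "my-org-id");
        System.out.println("created bucket: " + bucket.getName());
        client.close();
    }
}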

2. Add a TELEGRAF configuration for the bucket

(screenshot)

Select the bucket created in step 1.

(screenshot)

In the Sources list, typing sys filters the entries automatically; just click the match.

(screenshot)

Once a source is selected, the Create button in the lower-right corner lights up; click it to continue the configuration.

(screenshot)

After entering a name for the configuration, click Save and Test; the configuration content is generated automatically, so you do not need to write it yourself.

(screenshot)

When this screen appears after saving, creation is complete. Two pieces of configuration are returned: an export INFLUX_TOKEN command and a telegraf --config command.
Click Finish to close the dialog.
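For reference, the two returned items look roughly like this (the values here are placeholders; yours will differ):

export INFLUX_TOKEN=<the-generated-token>
telegraf --config http://127.0.0.1:8086/api/v2/telegrafs/<telegraf-config-id>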

(screenshot)

Clicking the configuration name opens its contents.

(screenshot)

The configuration content is as follows:

# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## Maximum number of unwritten metrics per output.  Increasing this value
  ## allows for longer periods of output downtime without dropping metrics at the
  ## cost of higher maximum memory usage.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. Maximum flush_interval will be
  ## flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default or when set to "0s", precision will be set to the same
  ## timestamp order as the collection interval, with the maximum being 1s.
  ##   ie, when interval = "10s", precision will be "1s"
  ##       when interval = "250ms", precision will be "1ms"
  ## Precision will NOT be used for service inputs. It is up to each individual
  ## service input to set the timestamp at the appropriate precision.
  ## Valid time units are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Log at debug level.
  # debug = false
  ## Log only error level messages.
  # quiet = false

  ## Log target controls the destination for logs and can be one of "file",
  ## "stderr" or, on Windows, "eventlog".  When set to "file", the output file
  ## is determined by the "logfile" setting.
  # logtarget = "file"

  ## Name of the file to be logged to when using the "file" logtarget.  If set to
  ## the empty string then logs are written to stderr.
  # logfile = ""

  ## The logfile will be rotated after the time interval specified.  When set
  ## to 0 no time based rotation is performed.  Logs are rotated only when
  ## written to, if there is no log activity rotation may be delayed.
  # logfile_rotation_interval = "0d"

  ## The logfile will be rotated when it becomes larger than the specified
  ## size.  When set to 0 no size based rotation is performed.
  # logfile_rotation_max_size = "0MB"

  ## Maximum number of rotated archives to keep, any older logs are deleted.
  ## If set to -1, no archives are removed.
  # logfile_rotation_max_archives = 5

  ## Pick a timezone to use when logging or type 'local' for local time.
  ## Example: America/Chicago
  # log_with_timezone = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false
[[outputs.influxdb_v2]]
  ## The URLs of the InfluxDB cluster nodes.
  ##
  ## Multiple URLs can be specified for a single cluster, only ONE of the
  ## urls will be written to each interval.
  ##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
  urls = ["http://127.0.0.1:8086"]

  ## Token for authentication.
  token = "$INFLUX_TOKEN"

  ## Organization is the name of the organization you wish to write to; must exist.
  organization = "org"

  ## Destination bucket to write into.
  bucket = "db2"

  ## The value of this tag will be used to determine the bucket.  If this
  ## tag is not set the 'bucket' option is used as the default.
  # bucket_tag = ""

  ## If true, the bucket tag will not be added to the metric.
  # exclude_bucket_tag = false

  ## Timeout for HTTP messages.
  # timeout = "5s"

  ## Additional HTTP headers
  # http_headers = {"X-Special-Header" = "Special-Value"}

  ## HTTP Proxy override, if unset values the standard proxy environment
  ## variables are consulted to determine which proxy, if any, should be used.
  # http_proxy = "http://corporate.proxy:3128"

  ## HTTP User-Agent
  # user_agent = "telegraf"

  ## Content-Encoding for write request body, can be set to "gzip" to
  ## compress body or "identity" to apply no encoding.
  # content_encoding = "gzip"

  ## Enable or disable uint support for writing uints influxdb 2.0.
  # influx_uint_support = false

  ## Optional TLS Config for use on HTTP connections.
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false
# Read metrics about system load & uptime
[[inputs.system]]
  # no configuration

3. TELEGRAF configuration parameters explained

(screenshot)

These four keys correspond to the four properties configured in the Java application's yaml: url, token, org, and bucket.
Note: the 2.x version is accessed through these four properties, no longer through a username and password.
The token deserves special mention: its value is the export INFLUX_TOKEN returned in step 2 after the configuration file was created.

So, with these four properties in hand, can you operate the database now???
No, no, no. The material online is messy, and many articles never cover the next step. This is where I hit a pit, so keep reading.

Testing revealed the problem. Note that this token is the Telegraf configuration's token: it can connect to the database and insert data successfully, but it does not carry query permissions. In other words, it can only write, nothing else. Queries fail with: HTTP status code: 404; Message: failed to initialize execute state: could not find bucket "XX"

The application accesses the database through the client library's API, and the error is caused by the missing API access-permission configuration, the crucial step the online material skips. A real trap.

4. Configure database access permissions (API TOKENS)

(screenshot)

Check the buckets that need API access and their configuration files; set the other permissions according to your own situation.

(screenshot)

After clicking Create, the generated API access token pops up; just replace the token in the yaml configuration file with this value.

(screenshot)
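Such a token can also be created in code through the Authorizations API. A hedged sketch, not from the original article, assuming influxdb-client-java 3.x (where PermissionResource still exposes TypeEnum); the admin token and organization ID are placeholders:

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.domain.Authorization;
import com.influxdb.client.domain.Permission;
import com.influxdb.client.domain.PermissionResource;

import java.util.Arrays;

public class CreateTokenExample {
    public static void main(String[] args) {
        // "admin-token" and "my-org-id" are placeholders; use your own values
        InfluxDBClient client = InfluxDBClientFactory.create(
                "http://127.0.0.1:8086", "admin-token".toCharArray());

        // Resource: all buckets of the organization
        PermissionResource resource = new PermissionResource();
        resource.setOrgID("my-org-id");
        resource.setType(PermissionResource.TypeEnum.BUCKETS);

        // Grant both read and write on that resource
        Permission read = new Permission();
        read.setResource(resource);
        read.setAction(Permission.ActionEnum.READ);

        Permission write = new Permission();
        write.setResource(resource);
        write.setAction(Permission.ActionEnum.WRITE);

        Authorization authorization = client.getAuthorizationsApi()
                .createAuthorization("my-org-id", Arrays.asList(read, write));

        // This value goes into the yaml token property
        System.out.println("token: " + authorization.getToken());
        client.close();
    }
}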

5. Operating the database from Java code

5.1 yaml

# InfluxDB configuration
influx2:
  url: http://127.0.0.1:8086
  token: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX==  # use your own token here
  org: org
  bucket: db2

5.2 pom dependency

I did not choose the higher-version dependency here because it conflicts with other dependencies in my project; the higher versions provide a compatibility API for 2.x and above.
Both the higher and lower versions of the dependency can operate a 2.x server, so decide according to your actual situation.

        <!--InfluxDB-->
        <dependency>
            <groupId>com.influxdb</groupId>
            <artifactId>influxdb-client-java</artifactId>
            <!--<version>6.9.0</version>-->
            <version>3.0.1</version>
        </dependency>

5.3 config

java">package net.influx.com.config;


import com.influxdb.LogLevel;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author luo zhuo tao
 * @create 2023/8/29
 */
@Configuration
@ConfigurationProperties(prefix = "influx2")
public class InfluxdbConfig {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbConfig.class);

    private String url;
    private String token;
    private String org;
    private String bucket;

    @Bean
    public InfluxDBClient influxDBClient(){
        InfluxDBClient influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
        // Optional: client-side request logging
        influxDBClient.setLogLevel(LogLevel.BASIC);
        // Note: a Boolean-returning ping() exists in recent client versions;
        // older clients (such as 3.x) expose health() instead
        if (influxDBClient.ping()){
            logger.info("InfluxDB 2.x time-series database --------------------------------------------- connected!");
        }else {
            logger.info("InfluxDB 2.x time-series database --------------------------------------------- connection failed!");
        }
        return influxDBClient;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public void setOrg(String org) {
        this.org = org;
    }

    public void setBucket(String bucket) {
        this.bucket = bucket;
    }
}

5.4 controller

java">package net.influx.com.controller;

import com.alibaba.fastjson.JSON;
import com.influxdb.client.*;
import com.influxdb.client.domain.InfluxQLQuery;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxTable;
import com.influxdb.query.InfluxQLQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;

/**
 * @author luo zhuo tao
 * @create 2023/8/29
 */

@RestController
@RequestMapping("influxdb")
public class InfluxdbController {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbController.class);

    @Resource
    private InfluxDBClient influxDBClient;

    @Value("${influx2.org:''}")
    private String org;

    @Value("${influx2.bucket:''}")
    private String bucket;

    private String table = "test1";

    @GetMapping("test")
    public String test() {
        /**
         * Write: WriteApiBlocking is the synchronous write API, WriteApi the asynchronous one
         */
        if (false) { // toggle to true to test writing
            WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking();
            Point point = Point
                    .measurement(table)
                    .addField(String.valueOf(System.currentTimeMillis()), UUID.randomUUID().toString())
                    .time(Instant.now(), WritePrecision.NS);
            writeApiBlocking.writePoint(point);
        }

        /**
         * Query: QueryApi is the synchronous Flux query API, InfluxQLQueryApi the InfluxQL (SQL-like) query API
         */
        if (false){ // toggle to true to test querying
            InfluxQLQueryApi influxQLQueryApi = influxDBClient.getInfluxQLQueryApi();
            InfluxQLQuery influxQLQuery = new InfluxQLQuery("SELECT * FROM test1", bucket);
            InfluxQLQueryResult query = influxQLQueryApi.query(influxQLQuery);
            logger.info("query:{}", JSON.toJSONString(query));
            findAll();
        }

        /**
         * Delete
         */
        DeleteApi deleteApi = influxDBClient.getDeleteApi();
        // Note: identical start and stop times delete an empty range; widen the window to actually remove data
        deleteApi.delete(OffsetDateTime.now(), OffsetDateTime.now(), "", bucket, org);
        return "query succeeded";
    }




    /**
     * @param measurement table (measurement) name
     */
    public void save(String measurement) {
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            Point point = Point
                    .measurement(measurement)
                    .addField("MMSI".concat(UUID.randomUUID().toString()), UUID.randomUUID().toString())
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }


    public List<FluxTable> findAll() {
        // Note: the bucket name is hard-coded here; make sure it matches the bucket you actually created
        String flux = "from(bucket: \"db3\")\n" +
                "  |> range(start:0)\n" +
                "  |> filter(fn: (r) => r[\"_measurement\"] == \"test1\")\n" +
                "  |> yield(name: \"mean\")";
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        logger.info("tables:{}", JSON.toJSONString(tables));
        return tables;
    }
}

5.5 Query methods and result-set extraction

Two query styles are used here: one queries directly by field key, the other by time range. For the details, study Flux syntax yourself; apart from the short sketch below, it is not covered here.
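As a taste of Flux, here is a minimal sketch (bucket and measurement names are examples) that keeps only the newest point per series with last(), instead of picking the last record in Java as the service code below does:

from(bucket: "db2")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "test1")
  |> last()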

java">package net.superlucy.departure.monitor.app.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.QueryApi;
import com.influxdb.client.WriteApi;
import com.influxdb.client.WriteOptions;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import net.superlucy.departure.monitor.app.service.InfluxdbService;
import net.superlucy.departure.monitor.app.util.CommonUtil;
import net.superlucy.departure.monitor.dto.enums.InfluxdbEnum;
import net.superlucy.departure.monitor.dto.model.DepartureShipPosition;
import org.apache.commons.compress.utils.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author luo zhuo tao
 * @create 2023/9/4
 */
@Service
public class InfluxdbServiceImpl implements InfluxdbService {

    private static final Logger logger = LoggerFactory.getLogger(InfluxdbServiceImpl.class);

    /**
     * Query by MMSI number: returns a single record
     */
    private String queryValueFluxOne = "from(bucket: \"%s\") " +
            "|> range(start: %s) " +
            "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"%s\")";

    /**
     * Query by time range: returns multiple records
     */
    private String queryValueFluxTwo = "from(bucket: \"%s\") " +
            "|> range(start: %s) " +
            "|> filter(fn: (r) => r._measurement == \"%s\")";

    @Resource
    private InfluxDBClient influxDBClient;

    @Value("${influx2.org:''}")
    private String org;

    @Value("${influx2.bucket:''}")
    private String bucket;


    @Override
    public Map<String, Object> findOne(InfluxdbEnum influxdbEnum, String mmsi) {
        String flux = String.format(queryValueFluxOne, bucket, 0, influxdbEnum.getValue(), mmsi);
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return qryVal(tables);
    }

    public Map<String, Object> qryVal(List<FluxTable> tables) {
        Map<String, Object> map = new HashMap<>();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                for (FluxRecord fluxRecord : records) {
                    map.put("value", fluxRecord.getValue().toString());
                    map.put("field", fluxRecord.getField());
                    map.put("valueTime", Date.from(fluxRecord.getTime()));
                }
            }
        }
        return map;
    }

    @Override
    public List<Map<String, Object>> findList(InfluxdbEnum influxdbEnum, String date) {
        String flux = String.format(queryValueFluxTwo, bucket, date, influxdbEnum.getValue());
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return qryValList(tables);
    }

    @Override
    public Map<String, DepartureShipPosition> getDynamicList(InfluxdbEnum influxdbEnum, String date) {
        String flux = String.format(queryValueFluxTwo, bucket, date, influxdbEnum.getValue());
        QueryApi queryApi = influxDBClient.getQueryApi();
        List<FluxTable> tables = queryApi.query(flux, org);
        return dynamicList(tables);
    }

    /**
     * Query the latest position of every ship
     * @param tables Flux query result tables
     * @return map of MMSI to the ship's latest position
     */
    private Map<String, DepartureShipPosition> dynamicList(List<FluxTable> tables) {
        Map<String, DepartureShipPosition> map = new HashMap<>();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                // Querying by time range returns multiple rows for the same field. Only the latest
                // is needed here, and results are ordered from oldest to newest, so take the last record.
                FluxRecord fluxRecord = records.get(records.size() - 1);
                DepartureShipPosition position = new DepartureShipPosition();
                String mmsi = fluxRecord.getField();
                String value = fluxRecord.getValue().toString();
                /**
                 * The dynamic-format conversion below is a method from my own business code; ignore it.
                 * String mmsi = fluxRecord.getField();
                 * String value = fluxRecord.getValue().toString();
                 * These two get methods already retrieve the stored data;
                 * further processing depends on your own business needs.
                 */
                // dynamic format conversion
                DepartureShipPosition dynamic = CommonUtil.dynamic(position, value);
                map.put(mmsi, dynamic);
            }
        }
        return map;
    }


    /**
     * @param tables Flux query result tables
     * @return one map per field containing its value, field name, and timestamp
     */
    public List<Map<String, Object>> qryValList(List<FluxTable> tables) {
        List<Map<String, Object>> mapList = Lists.newArrayList();
        if (CollectionUtil.isNotEmpty(tables)) {
            for (FluxTable table : tables) {
                List<FluxRecord> records = table.getRecords();
                // Querying by time range returns multiple rows for the same field; only the latest is
                // needed, and results are ordered from oldest to newest, so take the last record
                FluxRecord fluxRecord = records.get(records.size() - 1);
                Map<String, Object> map = new HashMap<>(3);
                map.put("value", fluxRecord.getValue().toString());
                map.put("field", fluxRecord.getField());
                map.put("valueTime", Date.from(fluxRecord.getTime()));
                mapList.add(map);
            }
        }
        return mapList;
    }


    /**
     * @param measurement table (measurement) name
     * @param k           MMSI number
     * @param v           AIS data
     */
    @Override
    public void save(String measurement, String k, String v) {
        WriteOptions writeOptions = WriteOptions.builder()
                .batchSize(5000)
                .flushInterval(1000)
                .bufferLimit(10000)
                .jitterInterval(1000)
                .retryInterval(5000)
                .build();
        try (WriteApi writeApi = influxDBClient.getWriteApi(writeOptions)) {
            Point point = Point
                    .measurement(measurement)
                    .addField(k, v)
                    .time(Instant.now(), WritePrecision.NS);
            writeApi.writePoint(bucket, org, point);
        }
    }
}
