一、引言 1. 背景介绍 1.1 EMQX 是一个高性能的MQTT消息服务器,在当前场景用于处理大规模物联网设备的连接和上报的数据。
1.2 Telegraf 是一个收集、处理数据的组件,在当前场景用于收集物联网设备上报到EMQX服务器的数据,并且转发存储到InfluxDB数据库中。
1.3 InfluxDB 是一个时序数据库,专门用于存储和查询由时间索引的大量数据,在当前场景用于存储物联网设备上报的数据。
(三者的关系如下图,图中箭头指向为数据的流向)
2. 目标 通过IOT设备上报报文携带的租户信息,实现IOT设备数据在InfluxDB中的分表(分measurement)存储。
3. 实现方式 利用Telegraf的Processor插件来处理Input插件收集到的租户信息,根据租户信息将数据存储到指定measurement中。(此方法适用于EMQX开源版本,如果使用EMQX企业版可以不需要Telegraf组件直接将数据写入到InfluxDB中,具体方法没有去探究!)
二、准备工作 1. 安装(参考官方文档便可)
2. 配置(Telegraf的配置文件中需要使用)
添加EMQX连接的账号密码
添加InfluxDB的连接Token、创建organization、创建默认的bucket。
三、Telegraf配置详解(重点!) 1. 处理流程(重点中的重点!上图!)
这是telegraf官网搞来的流程图,大家只需要关注中间telegraf这块就行了,简而言之就是外包公司,左手收集金童玉女(数据),右手外包到指定的公司(存储服务器),我恨阿!
telegraf神通广大他可以从boss招(MQTT收集)、也可以从拉勾招(CPU收集),这些招聘路径相当于上图中的Input插件了,你爱从哪里收集就选用什么插件。毕竟telegraf是有良心的包包,对boy and girl们还得细细培训(Process)、对他们掌握的技能聚合统计(Aggregate)一番,才能给他们分配到合适的岗位,不然牛头不对马嘴人家都不爱跟他合作了。
整个过程就是:数据输入(input) -> 数据处理(Process) -> 数据聚合统计(Aggregate)-> 数据输出(output)
(这个过程千万!千万!千万要记住!掌握这个过程才能知道自己该如何处理收集到的数据)
2. Telegraf基础配置 接下来我们就是要配置Telegraf了,仅需小小一个telegraf.conf就能够把telegraf掌握在手掌中。
说来说去就这几点… 具体大家可以看看这个链接
我们此次主要用到的是它的input、Process、Output插件。
我们按照数据处理的流程来配置一下telegraf(仅展示了主题相关的字段),
首先是input插件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 ############################################################################### # INPUT PLUGINS # ############################################################################### # boy [[inputs.mqtt_consumer]] alias = "boy" servers = ["tcp://host.docker.internal:1883"] # 监听的emqx主题 topics = ["/boy/+"] username = "emqx_username" password = "emqx_password" # 输入数据的格式 data_format = "json" # 配置该input的名字,官方文档中是写“the base name of the measurement”,此measurement不仅仅 # 可以指InfluxDB中的measurement,还可以指代该input的名字,可以在Process插件中指定该名字进行处理 name_override = "boy" #字段配置(通过该字段形成measurement名,例如boy_java、boy_go..) json_string_fields = ["language"] tags = {tag1="language"} # girl [[inputs.mqtt_consumer]] alias = "girl" servers = ["tcp://host.docker.internal:1883"] # 监听的emqx主题 topics = ["/girl/+"] username = "emqx_username" password = "emqx_password" # 输入数据的格式 data_format = "json" # 配置该input的名字,官方文档中是写“the base name of the measurement”,此measurement不仅仅 # 可以指InfluxDB中的measurement,还可以指代该input的名字,可以在Process插件中指定该名字进行处理 name_override = "girl" #字段配置(通过该字段形成measurement名,例如girl_java、girl_go..) json_string_fields = ["language"] tags = {tag1="language"}
数据收集到了,接着处理一下数据吧,配置Process插件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 ############################################################################### # PROCESSOR PLUGINS # ############################################################################### # 调试时可以打开查看input插件收集到的数据 # [[processors.printer]] [[processors.template]] # 这里的作用是只处理名为boy的input插件收集的数据 namepass=["boy"] # 这里的作用是新增一个key为real_measurement_name的tag,并且赋值为template tag = "real_measurement_name" # 提取input收集的数据中key为language的value,并且和boy_进行拼接 template = 'boy_{{ .Tag "language" }}' [[processors.template]] # 这里的作用是只处理名为girl的input插件收集的数据 namepass=["girl"] tag = "real_measurement_name" # 提取input收集的数据中key为language的value,并且和girl_进行拼接 template = 'girl_{{ .Tag "language" }}' # 该插件的作用是将所有数据girl、 boy这两个插件的measurement改成key为real_measurement_name的值, # 也就是girl_java、girl_go、boy_java、boy_go.. [[processors.converter]] namepass=["girl", "boy"] [processors.converter.tags] measurement = ["real_measurement_name"] # 调试时可以打开查看经过Process插件处理过后的数据 #[[processors.printer]]
配好处理,接下来就要配置数据输出到influxDB了,也就是配置output插件:
1 2 3 4 5 6 7 [[outputs.influxdb_v2]] urls = ["http://host.docker.internal:8087"] token = "apiToken" organization = "organization" bucket = "bucketname" # 当前分measurement是在同一个bucket中,如果需要分bucket的话,可以研究一下bucket_tag这个配置 # bucket_tag = ""
配置好基础参数和上面的内容后,就可以打开telegraf的日志查看它的处理数据的情况了,或许你看不懂,那不要怕,我们再来了解一下数据在telegraf插件之间流转时的Line Protocol格式,了解完以后就看得懂日志了。
4. 数据传输过程的Line Protocol协议 面对大量的金童玉女,telegraf如果一个个去给他们填简历,那可忙不过来,因此他想了一个Line Protocol协议 。协议如下:
男 名字=大鼻涕,身高=188 爱好=挖鼻屎 骗进来的时间20240606,
怕大家看不懂中文下面翻译一个英文版的:
boy(measurement), name(tag)=big_bi_ti body_length(field)=188 pi_hao(field)=pick_shit 20240606(timestamp)
简而言之,这个Line Protocol协议的结构如下
测量(Measurement) :数据的名称,用于对不同类型的数据进行分类和标识。
标签(Tags) :键值对,可选但强烈推荐使用。标签在数据库中被索引,查询速度较快。
字段(Fields) :键值对,必填项。字段包含实际的数据值,与标签不同,字段不会被索引。
时间戳(Timestamp) :可选项,指定数据点记录的时间。如果未指定,InfluxDB 使用服务器的当前时间。
知道了进来的数据都遵守这个协议就好办了,我们…
=.= 接下来等再更吧,打这么多字累了..
(上面只是我最近使用时,翻阅官方文档和搜索所构建的知识体系,内容大都来自官方文档,尽量做到有据可依,如描述有误或者不准确,欢迎指正!不胜感激!!)