Node->Kafka开发测试(2)物联网大数据长什么样

很多东西,仅仅为了用而用,不要去纠结为什么。

之前一章中,在Windows环境下搭建一个Spark/Kafka开发测试环境,但是在实际情况中,都是使用SUSE/CentOS Linux这些操作系统会多一些。
当然,我们还需要更多的组件。例如:
Nginx:被用作反向代理(映射企业内网IP和外网IP的映射等),负载均衡(将http请求按照轮询和定义的规则发给不同的应用服务器来处理),减少单一服务的压力。
Node:用来接收和处理高并发的http简单请求,解析后,按照Topic(在Kafka中按照数据类型、区域等不同编码规则)不同创建出消息,然后发送给Kafka。

其实,Node的功能也可以使用Java来替代,但是Node对于处理高并发的简单数据请求业务更为高效,所以这里使用Node来做。而Nginx其实也可以不使用。因为Node里面提供一个http模块,基本上是从Nginx那边源码借鉴设计的,其实只用Node也够了。在超大量的并发请求中,Nginx可以帮助NodeJS服务器进行分流和,还是能提供一些帮助的。

本章节,其实和Windows没有任何关系了,这其实不重要,重要的是我们能够把这个流程从端到端的跑通,完成Hello,World的学习,这才是最重要的。其他的难度、高级的功能,无非是在这些之上的扩展,慢慢延伸出去的。

1/ 业务背景介绍

前一个章节中,我们已经介绍了在Window下如何搭建一个Spark、Kafka的环境,接下来继续学习:

  • 我们要发送什么数据,这些数据是干嘛用的?
  • 从哪里产生的?长成什么样?
  • 如何利用这些数据?最终数据要流向何方?
  • 如果对业务流程及其真实的最终作用都不清楚,算这么多数据,算来算去,有何意义?

物联网数据流的示意图 (1)- 数据产生部分

假设的业务背景是什么呢?

客户是某个汽车整车生产厂商,在整车厂把汽车销售出去之后,他就没什么事情了,最多对4S点做个回访,按客户需求卖点零件给4S店用于客户保养车辆的服务,以及获得从4S店的新车销售订单。

数据:研发&生产的过程数据
车辆在研发(试乘车)是研发阶段的运行测试数据(不同机动车条件路况环境,以及测试结果,其他的我说不来了)做更多数据挖掘工作。以及批量生产过程中的生产线后的整体测试数据进行收集。

数据:售后的数据
在互联网保养车的概念冲击下,这种传统4S店的服务越来越被动,客户希望能否从“大”数据中得到有用信息,更好的主动服务客户,保证消费者行车用车安全,并且将从获得的有价值信息提供给4S店(保养、增值、更换零件、客户关怀服务)、保险公司(危险驾驶、驾驶员信用打分、驾驶习惯、保险推荐等等、等等、其他的鬼话我说不来了)等。

具体的想法有很多:

1)获得车辆的数据(主数据本来就有),属性,行为(速度,加速度,刹车,急刹车,油耗),车况(里程、电池电压电流,温度骤变等、胎压),环境(室内外温度、冷冻液)等各种信息。

2)将这些实时获得的信息加工整理计算后得到一个实际的告警、或者是预测推断出来的结论,提供给4S店(或者是直接短信发给客户,通过App推送给客户)让其提供增值服务(您好,我是某奸商4S店的XXX,你是XXX吧,您的车电池温度,在最近5分钟内上升了多少度,赶紧去附近的进行检查,否则,再开下去就要要起火爆炸啦)

2/ 数据哪来的,需要什么数据

在我们假设的例子中,我们要在Kafka中要处理,并且发送给Spark Streaming处理的数据来自两个地方:

2.1 生产汽车的MES、测试系统

工业生产过程中、或者产品测试系统都有很完善的测试软件和系统,数据对接对接,其实就是发生在企业局域内中的事情,这个比较方便,有非常成熟的技术可以来实现数据的传输。

2.2 车载的智能终端、各种盒子、前端App等

这部分数据不是来自厂内、也不是汽车的零部件供应商提供的,毕竟大部分的整车车厂都不可能自己做某些关键部件,而且有些数据是需要发送给自己的供应商去分析的。

1)发动机、变速箱运行数据 (发给设备制造商,转速、油耗、功率、模式、路况、速度等等,用于跟踪和改进后续新型车辆的设计和质量跟踪

2)电动汽车的动力电池的运行数据(新能源汽车的最关键设备,因为电池会爆炸起火,所以电池很重要,发给动力制造的供应商来判断电动车的电池信息,售后支持和产品质量跟踪、改进、安全支持
例如:比亚迪、沃特玛、CATL、比克、力神、波士顿、环宇等等

3)车辆环境和传感器、胎压、内部告警、综合判断、故障、异常数(发给4S店提供客户关怀、售后支持、维修和用户提醒工作

4)监管、质保、安全运营(对于运营的电动大巴、政府有政策需要车企、以及动力电池的供应商、维修售后的单位共同保障安全

因此,数据是整车厂来收集、但是这些数据也不是整车厂全部自己来使用,有些数据它是有义务、有责任、协同的发送给其他的合作伙伴。对于共用的新能源电动汽车的运营车辆的信息,甚至需要提交给政府交通部门作为的安全报告、

3/ 数据怎么编码

这里以车辆的运行数据为例,假设这是从车辆发送出来的数据,具体每个车厂的数据和编码规则都不相同,这就是为什么前端数据集成整合这么难做成一个统一标准的原因(还好,目前国内正在有企业做这个工作,国外的话像Telit)。

车架号消息ID时间戳消息内容说明(消息中不包含)
XXX (车架号)BSC0012017030413203970
 XXX (车架号)BSP00120170304132039116.404,39.915
 XXX (车架号)EES0012017030413203940E – 环境
E – 电子电器
S – 传感器
001 – 当前单体温度
002 – 最高温度
003 – 最低温度
004 – 最高电压
005 – 单体最低电压
006 – 箱体电压(一组数据)

消息ID:我们国内的彩虹无限的数据编码规则例子(类似一个树状结构、差不多覆盖一辆汽车的400+数据点,每一个消息ID都代表一个数据点,例如EES001~EES200 之间的消息,是动力电池、电动机、电子设备的运行状态数据)。

这些数据如果做成一个宽表的话,不就是一个400列的信息记录嘛,只是因为不同时间段发出来的消息,所以可能无法合并成一行记录列,需要按照不同的消息分类来预处理这些消息,然后再进行计算)。

BSC001
类型代码
B: 行为类
E:环境类
分类代码
S: 速度
C: 操控
E: 电子电器
数据来源:
S: 传感器
C: CAN中线
P: 定位芯片
消息编码
001:当前速度

使用编码的目的:

  • 减少数据传输的量,有的消息内容
  • 统一数据使用规范
  • 结构化数据和提升数据质量

4/ 接受数据、解析数据

一辆车由很多的零部件组成,不同的信息经过处理后还会发送给不同的合作伙伴。这里以电池数据为例。看看我们收到数据到底是什么样的:

希望得到的数据结构:

车架号消息ID时间戳消息内容
SVGXXXEES0012017030413203940

实际收到的数据结构:

车架号消息ID时间戳消息内容
SVGXXXEES00620170304132039F067CC19204E5F64

它需要被解析成为一个可以直接让Spark Streaming收到后就可以进行处理的数据。那么什么格式比较方便呢?Kafka目前支持的三种数据格式:

  • String,可以做成CSV格式的,但是后续需要Spark Streaming自己来进行解析和分割字符串。
  • JSON,很容易解析,在Node中构建这样的JSON数组很容易
  • Avro,比JSON格式更加高效(数据传输少一半,速度提升2倍)的一种方式,但是不知道Spark Streaming处理起来如何

这里,我们肯定选择JSON格式啦!

解析后的数据格式

车架号消息ID时间戳单体
最高
电压
单体
最低
电压
单体
最低
电压
位置
单体
最高
温度
单体最低温度单体最高温度位置
SVGXXXEES00620170304132039300250183263278

在NodeJS中进行转换之后,发送给在Kafka中的Topic的消息内容为:

备注:EES006和其他的消息ID不一样,代表着电动车的电池箱的当前某个时间点的5个数据点,这些数据是TBOX从BSM里面拿到后封装了发送出来了。我们需要对他进行解析、而这个解析的过程需要在发送给Kafka之前就解析出来。

所以过程就是:

4.1 NodeJS的http服务(Windows环境下)

感觉Window环境下做Node其实满作孽的,建议尽快的切换到Linux、MacOS上去,要不是公司电脑是Windows,我才不在Windows下这样的测试。不过还好,再过几个月就可以换成MacOS了,幸福 – 假装很技术宅的样子。

  • 安装Node
  • 安装Microsoft .NET Framework 2.0 软件开发工具包 (SDK),以及Visual Studio C++,确保不要出现VCbuild.exe找不到这样的错误消息。这个是Window下安装Node-kafka的必备的百度或者Google搜索一下。
    或者是
    安装NodeJS的Windows编译工具包 (打开cmd的命令行run as administrator ),这种方式比较方便,不需要你安装VCBuild这些工具。

  • 安装NodeJS的Kafka的npm包、以及其他的包(Jquery、express等),取决于你的Node程序写的有多复杂。

Kafka-node是node里面一个实现Kafka的Producer、Consumer、Client、Consumer Group、offSet Management的包。我们会用它来创建消息、发送消息给Kafka(ZooKeeper)。

Avsc是Apache Avro项目的纯Javascript实现,目前看来比JSON格式,解析速度比JSON快2倍、并且数据传输量只有JSON的2分之一。在对于有大量的消息传输时候,其实可以可以考虑使用。本例中,为了简化,暂时不使用,先挖个坑。

备注:这里没有npm install kafka-node -g的模式来安装,主要是因为我们要开发的这个node的服务,是直接在本地Windows环境的WebStorm中来运行的,没有将其作为一个服务放在Node服务器端来进行。所以只是在WebStorm的项目工程下,执行npm install kafka-node,使其成为项目的本地node_modules了。

4.2 开发NodeJS的服务来接收数据

目前情况:

  • Zookeeper server运行在本地:2181端口
  • Kafka运行本地,因为只用了broker,所以是9092端口
  • Nodejs的一个服务运行在本地的8125端口(下面的代码)

使用WebStorm来创建一个项目工程,然后创建一个server.js的文件,这个js的文件node server.js启动以后,专门用来接收和解析(以下代码没有做解码的动作)来自应用网关的http请求数据。

测试这个程序:

直接在WebStorm中run这个server.js的话,服务就会启动。

然后,在Chorme浏览器中启用postman工具,然后选择POST协议,BODY部分选择Application/JSON格式,然后在Body中填入需要传入JSON格式数据,就可以把数据发送给Kafka了。

下一章中,我们将学习使用Postman、JMeter、NodeJS、Kafka-Node来发送消息。

Leave a Reply

Your email address will not be published.