NodeJS->Kafka->Spark开发测试(3)发送/接收/解析/发送

很多东西,仅仅只是为了用而用。像蜻蜓一样,点一下水,了解一下这个水波纹,真的要去用的时候,我们再去潜水,虽然是个简单的demo,但是一趟hello,world下来,我们依然会很有收获。

在前面一章中,看到数据的来源及其格式,以及通过NodeJS来发送数据消息。接下来,我们来看如何构建一个http消息接收接收、解析和转发的Node服务。

数据流是:汽车TBOX(2G/3G短信)——->应用网关——->Node Web服务器——->HBase\Hive & Kafka(ZooKeeper)——->订阅后的处理(本章不讲解)

1/ 数据分发的策略

目前根据国家的政策,从车辆采集数据的渠道必须是由车辆的整车制造企业来负责完成,这其实也意味着,车辆运行的所有的数据的来源渠道只有整车厂,其他的合作伙伴需要拿到车辆的运行数据,必须通过整车厂这道关才能拿到。

整车厂的数据分发策略(假想)

1.1 整车厂内部先存储数据(待完善)

整车厂在拿到数据之后,直接将数据写入自己的数据中心的HBase,后面再批处理计算后再写入Hive,供Spark作业计算后,输出结果放入到MySQL,让用业务用户通过自开发应用对这些数据进行分析,或者使用可视化的工具进行分析。

车辆车况、环境数据、运行数据可以作为售后的产品质量分析和新车型改进的一个数据源,结合生产过程的供应商来料数据可以来看产品的质量。

1.2 对外分发数据

整车厂几乎掌握有这个车辆的所有数据:

  • 车主是谁,身份证号码
  • 车主的贷款、手机、经济状况(有无贷款)、从征信机构拿到信用卡信息、从车险公司拿到整体的保险购买情况
  • 发动机号+车架号是多少—->汽车车牌号是多少—>有无违章
  • 车辆在哪儿,今天去了哪儿,经常去哪儿,一周和一天的车辆行驶轨迹
  • 车辆上经常坐几个人、每天跑多少公里、油耗
  • 喜欢急刹吗?急加速?——>驾驶员行为是危险驾驶?稳重驾驶员?
  • 喜欢车震?通过(车辆所处位置、开关机、空调使用、乘坐人员数量、车辆悬挂晃动频率指数等等)

乍一看,有些数据是十分隐私的,特别是GPS信息,所以对外分发数据的时候,需要过滤掉全部的GPS信息(但是可以通过GPS的位置信息计算出所在城市的信息),车辆的所在城市对于车辆出了故障之后的维修和保养,异常检测来说,可以分配到当地的售后服务网点。

对于4S店,分发:

  • 车辆的运行数据、保养数据、车况
  • 车辆内的所有的错误代码(胎压不足、故障灯记录等等)车辆的里程、油耗等

对关键零部件供应商,分发:

  • 环境数据、动力、能耗、里程、瞬间速度、转速、时速、加速度等
  • 动力电池的运行数据、电压、电流、温度、充放电数据、输出电流、欠压、平均温度、平均电压、故障代码等
  • 发动机(电动机)的能耗使用、运行数据等
  • 车辆目前所在城市信息

对保险公司,分发:

  • 环境数据(车温、车灯开启)、动力、能耗、里程、瞬间速度、加速度、瞬时油耗/能耗
  • 所在城市信息
  • 碰撞感应、时间、门开启状态、安全带、副驾驶

1.3 传输数据和订阅定义的策略

安全:走公网、还是VPN

性能:数据传输量

经济型:费用和架构,对现有系统的改造

可行方案:最简洁的方案、自己可控的方案,使用公有云的消息服务吗? 还是自己搭建Kafka的集群对外进行数据分发服务。

2/ 利用NodeJS快速构建处理消息的http服务

2.1 NodeJS来干嘛来了

在Web应用服务器层面为什么选择NodeJS,而不是Java,其实这是由很多原因决定的,主要原因如下:

  • 实现起来简单,够快
    实现原型应用的速度快、代码少、适合做这样的实验
  • 运行起来够快
  • 丰富的社区和功能NPM包
  • 流行、互联网和移动应用的必选
  • 业务流程和需求决定
    这个Demo的业务流程就是构建一个足够强大的对外的http服务,没有什么复杂的业务逻辑,就是接受请求,解析,然后写数据库
  • 面对极其高的并发http请求

除了Node之外,在后面可能还会使用其他的一些编程语言、例如Scala、Java等。

2.2 利用NodeJS快速构建一个http服务

首先从NodeJS开始,前面还没有好好的了解这些代码的说明,这段代码是Kafka-node在Github上的示例程序。

以上的业务逻辑是

  • 启动服务后,接受来自http的POST请求
  • 返回响应200(OK),并且content-type为text/html格式
  • 将传入的数据原封不动的直接作为Kafka 消息的内容发送出去

那我们开始测试一下这个数据链路是否畅通?并发性能如何?

3/ 测试一把看看

  • 启动Zookeeper Server
    zkserver
  • 启动Kafka Server

    kafka-server-start.bat C:\bigdata\kafka_2.11-0.11.0.0\config\server.properties

     

  • 启动kafka console consumer来接收消息
    kafka-console-consumer.bat –zookeeper localhost:2181 –topic test3 –from-beginning
  • 检查有那些topic可以使用,暂时先用test3这个topic
    kafka-topics –list –zookeeper localhost:2181
  • 启动server.js服务,在项目文件夹目录下直接执行命令
    node server.js
    然后启动浏览器:http://localhost:8125/upload


  • 启动postman,发送测试数据数据

  • 观察server.js的console输出
    在本例中,从http的POST请求发来的数据,在server.js服务中都会直接输出到console中,主要是为了调试和测试。同时,发送到Kafka Broker中。
  • 观察Kafka-Console-Consumer是否拿到消息
    如果它能拿到消息,这意味着我们通过其他的方式也能订阅到这些消息。

到这里为止,我们看到消息已经从Postman发到NodeJS,然后生成消息发送给Kafka了。这说明,消息已经成功传递。

3/ 数据的规模

目前,我们只是模拟一个Postman客户端来发送数据,如果有1万辆车、30万辆车发送数据呢?那这个服务器集群处理的数据量将是海量的请求。还好基于Nginx+Node的架构可以帮我们解决来自前端海量的百万级并发。所以,并发请求将不是问题,只是我们需要了解一下这个情况。

假设:1辆车

  • 每隔10秒发送一次高优先级数据(只发送这个时间戳点的数据,不是把10秒钟的数据打包发送)
    一次0.5K(500字节,应该可以覆盖100个以上的数据点)
  • 每隔30秒发送一次低优先级数据(同上)
    一次1k(1024,可以覆盖300以上的数据点)
  • 合计:每分钟5kb的数据量
  • 每天开车8小时(8小时在路上,8小时在停车,8小时晚上睡觉)

1辆车1天 = 8 * 60 * 5k = 2.3MB数据
1辆车1天 = 8 * 60 * 8 =  3840条消息

车辆每天处理消息(亿条)每天(GB)每周(GB)每月(GB)每年(TB)
5万辆1.9112784336039
10万辆3.82241568672078
20万辆7.6448313613440156
40万辆15.2896627226880312

一辆车每隔10秒就发送一条消息,每次0.5K(Kafka处理5KB以内的数据是比较合适的),按照40万辆的规模的,那么:

每隔10秒钟,就会有40万条的短消息报文发给通过2G/3G网络发给应用网关,但是实际上的情况不是这样,毕竟每隔车辆的时钟都有一点偏移,所以我们可以想象成为40万辆车在10秒钟发出了40万条消息,实际这些车辆发消息的分布差不多是:每隔3秒,就发送13万条消息。

那么应用网关的收到的数据就是4万条/秒 = 19Mb/每秒数据量。

到这里为止,我们对于将要处理的数据量和性能要求,已经心里有底了。这些数字对于对于传统数据库和数据处理手段来说,还真是一个不小的麻烦?

是吗?不见得,来自互联网公司的这些开源“大数据”技术产品应该不会让我们失望,最多是不稳定,不好用了。

接下来的章节中,我们将学习如何处理这些海量的数据。

Leave a Reply

Your email address will not be published.