宇智波带土 2017-09-26
数据处理分为三大类:
结合前文讲述的数据源特点、分类、采集方式、存储选型、数据分析、数据处理,我在这里给出一个总体的大数据平台的架构。值得注意的是,架构图中去掉了监控、资源协调、安全日志等。
左侧是数据源,有实时流的数据(可能是结构化、非结构化,但其特点是实时的),有离线数据,离线数据一般采用的多为ETL的工具,常见的做法是在大数据平台里使用Sqoop或Flume去同步数据,或调一些NIO的框架去读取加载,然后写到HDFS里面,当然也有一些特别的技术存储的类型,比如HAWQ就是一个支持分布式、支持事务一致性的开源数据库。
从业务场景来看,如果我们做统计分析,就可以使用SQL或MapReduce或streaming或Spark。如果做查询检索,同步写到HDFS的同时还要考虑写到ES里。如果做数据分析,可以建一个Cube,然后再进入OLAP的场景。
在具体介绍本文内容之前,先给大家看一下Hadoop业务的整体开发流程:
从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步,从而引出我们本文的主角—Flume。本文将围绕Flume的架构、Flume的应用(日志采集)进行详细的介绍。
flume的特点:
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。
flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
先看一下我们数据处理的主要步骤,首先是我们SDK采集数据,采集数据之后,首先把它扔到我们的消息队列里做一个基础的持久化,之后我们会有两部分,一部分是实时统计,一部分是离线统计,这两部分统计完之后会把统计结果存下来,然后提供给我们的查询服务,最后是我们外部展示界面。我们的数据平台主要基于中间的四个绿色的部分。
关于要求,对消息队列来说肯定是吞吐量一定要大,要非常好的扩展性,如果有一个消息的波峰的话要随时能够扩展,因为所有的东西都是分布式的,所以要保证节点故障不会影响我们正常的业务。
我们的实时计算目前采用的是分钟级别的实时,没有精确到秒级,离线计算需要计算速度非常快,这两部分我们当初在考虑的时候就选用了Spark,因为Spark本身既支持实时,又支持离线,而且相对于其他的实时的方案来说,像Flink或者是Storm和Samza来说,我们不需要到秒级的这种实时,我们需要的是吞吐量,所以我们选择Spark。实时部分用的是Spark streaming,离线部分用的是Spark offline的方案。
查询方案因为我们要支持多个维度的组合排序,所以我们希望支持sql,这样的话各种组合排序就可以转化成sql的group和order操作。
消息队列我们选择的是Kafka,因为在我们看来,Kafka目前是最成熟的分布式消息队列方案,而且它的性能、扩展性也非常好,而且支持容错方案,你可以通过设置冗余来保证数据的完整性。 Kafka目前得到了所有主流流式计算框架的支持,像Spark, Flink, Storm, Samza等等;另外一个就是我们公司的几个创始人都来自于LinkedIn,他们之前在LinkedIn的时候就已经用过Kafka,对Kafka非常熟,所以我们选择了Kafka。
但选定Kafka之后我们发现了一个问题就是消息时序的问题。首先我们的数据采集 程中,因为不同的用户网络带宽不一样,数据可能是有延迟的,晚到的消息反而可能更早发生,而且Kafka不同的partition之间是不保证时序的。
但是我们所有的离线统计程序都是需要按时间统计的,所以我们就需要一个支持时序的数据库帮我们把数据排好序,这里我们选了HBase。我们用消息产生的时间加上我们生成消息的ID做成它唯一的row key,进行排序和索引。
对于sql的方案来说,我们选择的是Phoenix。选Phoenix是因为我们考虑了目前几个SQL On HBase的方案,我们发现Phoenix的效率非常好,是因为它充分的利用了HBase coprocessor的特性,在server端进行了大量的计算,所以大量减轻了client的数据压力还有计算压力。
还有就是它支持HBase的Column Family概念,比如说我们要支持40个纬度的时候我们会有一张大宽表,如果我们把所有的列都设置一个列族的话,在查询任意一个列的时候都需要把40列的数据都读出来,这样是得不偿失的,所以Phoenix支持Column Family的话,我们就可以把不同的列根据它们的相关性分成几个列族,查询的时候可能只会命中一个到两个列族,这样大大减少了读取量。
Phoenix还支持Spark的DataSource API,支持列剪枝和行过滤的功能,而且支持数据写入。什么是Spark的DataSource API呢, Spark在1.2的时候提供了DataSource API,它主要是给Spark框架提供一种快速读取外界数据的能力,这个API可以方便的把不同的数据格式通过DataSource API注册成Spark的表,然后通过Spark SQL直接读取。它可以充分利用Spark分布式的优点进行并发读取,而且Spark本身有一个很好的优化引擎,能够极大的加快Spark SQL的执行。
因为Spark最近非常的火,所以它的社区资源非常的多,基本上所有主流的框架,像我们常见的Phoenix,Cassandra, MongoDB都有Spark DataSource相关的实现。还有一个就是它提供了一个统一的数据类型,把所有的外部表都统一转化成Spark的数据类型,这样的话不同的外部表能够相互的关联和操作。
在经过上述的思考之后,我们选择了这样的一个数据框架。
首先我们最下面是三个SDK,JS、安卓和iOS,采集完数据之后会发到我们的负载均衡器,我们的负载均衡器用的是AWS,它会自动把我们这些数据发到我们的server端,server在收集完数据之后会进行一个初步的清洗,把那些不规律的数据给清洗掉,然后再把那些数据发到Kafka里,后面就进入到我们的实时和离线过程。
最终我们的数据会统计到HBase里面,对外暴露的是一个sql的接口,可以通过各种sql的组合去查询所需要的统计数据。目前我们用的主要版本,Spark用的还是1.5.1,我们自己根据我们自己的业务需求打了一些定制的patch,Hadoop用的还是2.5.2,HBase是0.98,Phoenix是4.7.0,我们修复了一些小的bug,以及加了一些自己的特性,打了自己的patch。
Lambda架构的主要思想是将大数据系统架构为多层个层次,分别为批处理层(batchlayer)、实时处理层(speedlayer)、服务层(servinglayer)如图(C)。
理想状态下,任何数据访问都可以从表达式Query= function(alldata)开始,但是,若数据达到相当大的一个级别(例如PB),且还需要支持实时查询时,就需要耗费非常庞大的资源。一个解决方式是预运算查询函数(precomputedquery funciton)。书中将这种预运算查询函数称之为Batch View(A),这样当需要执行查询时,可以从BatchView中读取结果。这样一个预先运算好的View是可以建立索引的,因而可以支持随机读取(B)。于是系统就变成:
(A)batchview = function(all data);
(B)query =function(batch view)。
图(C)
Ceph 是一个开源、多管齐下的操作系统,因为其高性能并行文件系统的特性,有人甚至认为它是基于Hadoop环境下的HDFS的接班人,因为自2010年就有研究者在寻找这个特性。
Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。
这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。
Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:
为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。
此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。
Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。
Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。
另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。
Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。
Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。
Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。
在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。
Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。
目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署。
Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。
大数据系统可使用多种处理技术。
对于仅需要批处理的工作负载,如果对时间不敏感,比其他解决方案实现成本更低的Hadoop将会是一个好选择。
对于仅需要流处理的工作负载,Storm可支持更广泛的语言并实现极低延迟的处理,但默认配置可能产生重复结果并且无法保证顺序。Samza与YARN和Kafka紧密集成可提供更大灵活性,更易用的多团队使用,以及更简单的复制和状态管理。
对于混合型工作负载,Spark可提供高速批处理和微批处理模式的流处理。该技术的支持更完善,具备各种集成库和工具,可实现灵活的集成。Flink提供了真正的流处理并具备批处理能力,通过深度优化可运行针对其他平台编写的任务,提供低延迟的处理,但实际应用方面还为时过早。
最适合的解决方案主要取决于待处理数据的状态,对处理所需时间的需求,以及希望得到的结果。具体是使用全功能解决方案或主要侧重于某种项目的解决方案,这个问题需要慎重权衡。随着逐渐成熟并被广泛接受,在评估任何新出现的创新型解决方案时都需要考虑类似的问题。