本文节选自CCF大数据教材系列丛书之《大数据处理》,本书由华中科技大学金海教授主编,包括大数据处理基础技术、大数据处理编程与典型应用处理、大数据处理系统与优化三个方面。本教材以大数据处理编程为核心,从基础、编程到优化等多个方面对大数据处理技术进行系统介绍,使得读者能够快速入门,同时体会大数据处理系统的设计理念与优化方法本质。
开源系统及编程模型
基于流计算的基本模型,当前已有各式各样的分布式流处理系统被开发出来。本节将对当前开源分布式流处理系统中三个最典型的代表性的系统:ApacheStorm,SparkStreaming,ApacheFlink以及它们的编程模型进行详细介绍。
ApacheStorm
ApacheStorm是由Twitter公司开源的一个实时分布式流处理系统[2],被广泛应用在实时分析、在线机器学习连续计算、分布式RPC、ETL等场景。Storm支持水平扩展、具有高容错性,保证数据能被处理,而且处理速度很快。Storm支持多种编程语言,易于部署和管理,是目前广泛使用的流处理系统之一。
一、Storm中的数据封装
Storm系统可以从分布式文件系统(如HDFS)或分布式消息队列(如Kafka)中获取源数据,并将每个流数据元组封装称为tuple。一条数据流即是一个无边界的tuple序列,而这些tuple序列可以以分布式的方式创建和处理。在Storm中,数据流中的每个tuple相互独立,彼此间的处理上不存在任何关联。Tuple也是Storm中消息传递的基本单元,其数据结构如图5-3-1所示。
如图5-3-1所示,一个tuple可以包含多个字段(field),每个字段代表对应流数据的一个属性,在Storm的每个操作组件发送向下游发送tuple时,会声明对应tuple每个字段的顺序和代表的含义(如数据的键、值、时间戳等)。
二、Storm中的应用拓扑建立
在Storm中,用户所提交的应用所构建的DAG拓扑被称为Topology。Storm的Topology类似于MapReduce中的一个job,但区别在于这个拓扑会永远运行(或者直到手动结束)。每个Topology中有两个重要组件:spout和bolt。
spout是Topology中数据流的来源,也即对应DAG模型中的起始操作。spout可以从外部源读取数据并将其以封装成tuple的形式发送到图5-3-1tuple的数据结构Topology中。bolt是Topology中对tuple进行处理的主要单元。Storm并不区分中间和终止操作,而是将其统一为bolt来进行实现,也即对结果的输出需要由用户自己来实现。所有对流数据的处理都是在bolt中实现,bolt可以执行各种基础操作,如过滤、聚合、连接等。bolt每处理完一个tuple后,可以按照应用需求发送给0个或多个tuple给下游的bolt。
三、Storm中的并行度指定
Storm中的并行度有三层含义。首先是worker进程数。Storm可以建立在分布式集群上,每台物理节点可以发起一个或多个worker进程。
一个worker对应一个物理的JVM(Java虚拟机)。通常,整个Topology会由一个或者多个worker进程来负责执行。每个worker会在一个JVM中运行一个或多个executor,每个executor对应一个线程,执行某一个spout或者bolt的计算任务。在Storm中,每个spout/bolt都可以实例化生成多个task在集群中运行,一般默认情况下,executor数与task数一一对应,也即每个实例都由一个单独的线程来执行。用户也可以指定task数大于executor数,这时部分task会由同一个线程循环调用来执行。在Storm的Topology建立时,用户可以根据需要依次来设定整体的worker进程数以及每个spout/bolt对应的executor数和task数。
四、Storm中的数据分组和传输
用户可以通过定义分组策略(streaminggrouping)来决定数据流如何在不同的spout/bolt的task中进行分发和传输。分组策略将所有的spout和bolt连接起来构成一个Topology,如图5-3-2所示。除了5.2.4节所介绍的几种基本分组策略外,Storm还支持其他的分组策略。例如localgrouping,这是shufflegrouping的一种变种分组策略。由于Storm划分多个worker进程,shufflegrouping可能导致大量的进程间通信,localgrouping则是将元组优先发往与自己同进程的下游task中,若没有这种下游task,才继续沿用shufflegrouping的方式。
图5-3-2streaminggrouping
图5-3-3Storm系统架构
又如directgrouping,这是一种特殊的分组方式,用户可以直接指定由下游的哪一个task来接收数据。
五、Storm的分布式系统架构
Storm可以运行在分布式集群上。Storm集群结构沿用了主从架构方式,即一个主控节点和多个工作节点。图5-3-3展示了整个Storm的系统架构。
Storm的基本组件分别如下。
Nimbus:运行Nimbus的节点是Storm集群的主控节点,Nimbus类似Hadoop中JobTracker的角色,是用户和Storm系统之间的交互点。Nimbus主要的工作是用于用户提交Topology、进行集群任务的分配调度、进行集群监控和统计等。
Supervisor:每个工作节点上都运行着一个Supervisor,Supervisor用于接收Nimbus分配的任务,并根据分配产生worker进程。同时它还会监视worker的健康状况,在必要的情况下会重启worker进程。
ZooKeeper:Storm系统借用Zookeeper集群来进行Nimbus和Supervisor之间的所有协调工作,包括Nimbus对Supervisor所执行的任务的调配,以及帮助Nimbus监控所有Supervisor和task的运行情况,以便失效时迅速重启。
六、Storm的编程示例
下面,以一个简单的WordCount应用为例,来进行Storm编程模型的讲解。
(1)实现生成数据的spout,封装数据首先构建一个CreateSentenceSpout来进行数据流的生成。为了简化说明,从若干给定的静态句子列表中每次随机抽取一句作为一个tuple来传递给下游bolt进行处理。CreateSentenceSpout的具体实现如代码5-3-1所示。
以上代码中,BaseRichSpout类是Storm提供的一个简单接口,使用它可以默认实现很多方法,使用户只用关心实现应用所需要的代码上去。
在spout中最主要的工作就是数据的封装。Spout的核心代码在nextTuple()方法中实现,即如何产生所需的tuple并进行传输。Spout会循环调用此方法来不断产生新的tuple。在本例中,从open()方法里给定的句子列表中随机抽取一条作为tuple,并通过emit方法将tuple进行传输。
在emit生成tuple时,还需要对tuple中的每个字段进行声明。这是由declareOutputFields()方法来实现。这个方法是Storm中所有spout/bolt都需要实现的一个方法。在本例中,生成的每个句子对应一个tuple,其只具有一个字段,字段的值就是句子本身,因此在declareOutputFields()中声明字段只有一个“sentence”。open()方法是对应组件在进行初始化时执行的方法,其中要注意的是open()方法会接收SpoutOutputCollector对象所提供的后续tuple传输方法作为参数,因此在open()方法的实现中,需要将其引用保存在一个变量当中,以便nextTuple()方法调用。
(2)实现对流数据进行操作处理的bolt
在WordCount应用中,对spout生成的句子,构建两个bolt来进行处理:一个SplitWordBolt来将句子划分为单词,一个CountBolt来对划分好的单词进行累计计数。下面,以SplitWordBolt为例来进行讲解,其实现代码如代码5-3-2所示。
BasicRichBolt类同样是Storm对于bolt类提供的一个简单接口,使用户能够仅集中编写所需的操作逻辑即可。
Bolt的核心是execute()方法。每当接收到一个新的tuple,都会直接对此方法进行调用,然后执行。使用getStringByField()方法可以读取在上游组件生成tuple时声明的对应字段里的值。当完成处理后,如果新产生的tuple需要继续向后传输,可以通过调用emit方法对tuple进行发送。
prepare()方法与spout中的open()方法功能相似。Declare-OutputFields()方法与spout()中相同,这里不再赘述。
(3)构建流应用Topology,并指明并行度和分组策略
实现了对应的spout和bolt功能之后,最后就是将其连接成一个完整的Topology。本例中Topology的代码如代码5-3-3所示。
以上代码中,首先生成了TopologyBuilder的一个实例,然后分别对应生成spout和bolt的各个实例。在setSpout和setBolt方法中,第一个参数为对应的组件注册了ID,第二个参数生成对应组件的实例,而第三个参数为对应组件需要生成的executor个数。
在setBolt方法中,除了对应生成实例外,还需要指定每个bolt需要接收哪个组件发送给自己的数据,以及数据的发送方式,即分组策略。例如本例中,CountBolt需要从SplitWordBolt处接收数据,SplitWordBolt发送的数据以fieldsgrouping(同keygrouping)的方式进行发送,其中用于分组的键值为SplitWordBolt发送tuple的“word”字段的值。
最后,可以自由指定程序的并行度。可以使用setNumWorkers方法来指定用于执行此Topology中worker进程的个数,本例中为整个Topology分配了4个worker进程;可以用setSpout和setBolt方法中的第三个参数指定executor数量,而若需要指定更多的task数,则可以继续使用setNumTasks进行设定。本例为每个spout/bolt都生成了4个executor,而进一步为SplitWordBolt分配了8个task,这使得每2个task由一个executor线程来负责执行。
SparkStreaming
SparkStreaming是SparkAPI核心扩展,提供对实时数据流进行流式处理,具备可扩展、高吞吐和容错等特性。SparkStreaming支持从多种数据源中提取数据,例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些高级的API来表示复杂处理算法,如map、reduce、join、windows等,最后可以将得到的结果存储到分布式文件系统(如HDFS)、数据库或者其他输出,Spark的机器学习和图计算的算法也可以应用于SparkStreaming的数据流中。
一、SparkStreaming中的数据封装
和Storm不同的是,SparkStreaming本质上是一个典型的微批处理系统,其与以元组为单位进行流式处理不同,它将无尽的数据流按时间切分为连续的小批次数据,然后以传统的批处理方法来进行快速连续的处理。
在SparkStreaming中,数据流被抽象成以时间片段分隔开的离散流(discretizedstream)形式。简单而言,就是将所有的流数据按照一定的批大小(如1秒)分割成一段又一段的小批次数据,如图5-3-4所示。SparkStreaming使用Spark引擎,将每一段小批次数据转化成为Spark当中的RDD(弹性分布式数据集)。流数据即以RDD的形式在SparkStreaming系统中进行运算。
图5-3-4SparkStreaming的离散流
二、SparkStreaming中的应用拓扑建立
SparkStreaming同样在系统中构建出DAG的处理模型。不过与Storm不同,SparkStreaming并不使用固定的处理单元来执行单一的操作。实际上,SparkStreaming中的DAG与SparkCore中的DAG相同,只是用DAG的形式将每一个时间分片对应的RDD进行运算的job来进一步划分成任务集stage,以便进行高效的批处理。SparkStreaming沿用了SparkCore对RDD提供的transformation操作,将所有RDD依次进行转换,应用逻辑分别进行转换处理,进而实现对整个离散流的转换。
图5-3-5展示了SparkStreaming的整体计算框架,一方面在线输入的数据流被按照时间切分为若干小批次数据并被转化成为RDD存储在内存中,另一方面,根据流应用逻辑,也即流处理引用抽象出DAG拓扑,制定出相应的RDDtransformation。RDD不断被批量执行transformation操作,直到产生最终的结果。
图5-3-5SparkStreaming计算框架[7]
三、SparkStreaming中的并行度指定
由于SparkStreaming本质上是将数据流的任务划分成为大量的微批数据,对应多个job来执行,所以SparkStreaming的并行度设定与Spark进行批处理时的设定一样,只能设定整体job的并行度,而不能对每个操作单独的并行度进行设置。然而由于批处理的特性,SparkStreaming可以最大化对系统并行能力的利用,也能获得相对更高的系统吞吐率。
四、SparkStreaming中的数据分组和传输
由于使用微批处理技术,SparkStreaming的数据被打包为一个个微批,而每个微批相互独立地进行处理,所以不涉及所提到的数据分组与传输问题。但这也展现出微批处理的一个局限性,其难以灵活处理基于用户自定义的窗口的聚合、计数等操作,也不能进行针对数据流的连续计算,如两个数据流的实时连接等操作。
五、SparkStreaming的系统框架
SparkStreaming建立在Spark系统之上,其系统架构相对于Spark的修改和新增部分如图5-3-6所示。
图5-3-6SparkStreaming基于Spark修改和新增的组件[7]
除开Spark系统本身组件外,SparkStreaming主要组件如下。
master:是SparkStreaming中流应用的入口。根据应用逻辑产生用于转换RDD的task然后进行调度,并对这些task进行追踪。D-Streamlineage包含了离散流间的转换关系,类似流应用的DAG图。
client:SparkStreaming建立了一个client库来将数据传入到系统当中。
worker:是SparkStreaming中流数据的入口以及执行RDD转换的主要组件。相对于Spark,主要新增了inputreceiver对流数据进行独立的接收。流数据可以是从系统外在线地进行读取进来,并转化为离散流的形式,也可以是经过其他execution执行转化后的离散流。
六、SparkStreaming的编程示例
SparkStreaming的编程较为简单,这是由于它本身基于Spark建立,有丰富的API可以调用,可以省去大量无关的编码。下面同样以WordCount应用为例来对SparkStreaming的编程模型进行说明。
(1)离散流的输入和数据封装
在WordCount应用中,假定直接从一个socket来获取源源不断的句子数据流,那么数据流的输入具体实现如代码5-3-4所示。
以上代码中,首先建立了JavaStreamingContext对象,同时需要指定划分离散流的时间间隔。本例中指定了每隔1s就划分一次微批。接着,指定从端口的socket中持续获取数据流。通过以上代码,每个executor获取的数据流就会根据1s的时间间隔不断划分成小批次,并进一步转化为RDD。这一串RDD的组合即是新产生的“lines”离散流。
(2)建立应用拓扑,进行离散流的转化
离散流的转化即根据相应的应用逻辑指定对应的RDD的转化方式。在WordCount应用中,先将句子转化为若干的单词,然后将每个单词变成(单词,计数)的二元对,最后对相同单词的二元对计数进行累加。具体实现如代码5-3-5所示。
以上代码中,利用Spark丰富的transformation方法,将由一个个句子组成的“lines”离散流首先通过flatMap的方式映射为由单词组成的“words”离散流。进一步通过mapToPair的方式映射为(单词,计数)二元对组成的“pairs”离散流,这里每个单词没有累加前,计数值就直接等于1。最后通过reduceByKey的方式,对相同单词的计数进行累加操作。
ApacheFlink
ApacheFlink是一个同时支持分布式数据流处理和数据批处理的大数据处理系统。其特点是完全以流处理的角度出发进行设计,而将批处理看作是有边界的流处理特殊流处理来执行。Flink可以表达和执行许多类别的数据处理应用程序,包括实时数据分析、连续数据管道、历史数据处理(批处理)和迭代算法(机器学习、图表分析等)。
Flink同样是使用单纯流处理方法的典型系统,其计算框架与原理和ApacheStorm比较相似。Flink做了许多上层的优化,也提供了丰富的API供开发者能更轻松地完成编程工作。
一、Flink中的数据封装
Flink能够支撑对多种类型的数据进行处理,例如Flink支撑任意的Java或者Scala类型,这使得Flink使用更加灵活。类似Storm,Flink同样也可以使用多字段的tuple为其基本数据单元。Flink可以支持了多种Flinktuple类型(tuple1至tuple25),每种tuple都是一个固定长度的对象序列。
二、Flink中的应用拓扑建立
Flink中核心概念为数据流(stream)和转换(transformation)。每个转换对应的是一个简单的操作,根据应用逻辑,转换按先后顺序构成了流应用的DAG图,如图5-3-7所示。数据流在转换之间传递,直到完成所有的转换进行输出。Flink应用包含明确的源操作和汇聚操作,用于数据的输入与输出。
Flink内部实现了许多基本的转换操作,比如Map、FlatMap、Reduce、Window等,同时也实现了许多源和汇聚操作,比如writeAsText、writeAsCsv、print等。Flink提供了丰富的API以简化用户对应用拓扑的编写和表达。
三、Flink中的并行度指定
与Storm相似,Flink程序的计算框架本质上也并行分布式的。在系统中,一个流包含一个或多个流分区,而每一个转换操作包含一个或多个子任务实例。操作的子任务间彼此独立,以不同的线程执行,可以运行在不同的机器或容器上。
一个Flink应用同样运行在一个或多个worker进程当中。一个worker中生成一个或多个taskslot。每个taskslot用以承载和执行Flink每个转换操作的一个子任务实例。Flink可以指定全局的taskslot数目作为其最大的并行度。同时若部分转换不需要使用如此多资源,Flink也可以指定每一操作具体的子任务数。每个转换操作对应的子任务默认轮询地分布在分配的taskslot内。
四、Flink中的数据分组与传输
Flink的数据分组方法主要包括一对一(one-to-one)模式或者重分组(redistributing)模式两种。
采用一对一模式时,数据流中元素的分组和顺序会保持不变,也就是说,对于上下游的两个不同的转换操作,下游任一子任务内要处理的元组数据,与上游相同顺序的子任务所处理的元组数据完全一致。
采用重分组模式则会改变数据流所在的分组。重分组后元组的目标子任务根据处理的变换方法不同而发生改变。例如经过keyBy()转化,元组就会根据keyBy()的参数选择对应的字段作为key值,进行哈希计算来重新分组。经过broadcast()转化即相应地进行广播等。
五、Flink的系统框架
图5-3-8显示了ApacheFlink的分布式运行环境架构。
Flink的系统架构中包含以下重要组件。
jobclinet:jobclient是一个独立的程序执行入口。jobclient负责接收用户提交的程序,并将用户提交的程序通过优化器和graphbuilder转换成dataflowgraph(类似流应用的DAG图)。然后将生成的dataflow提交给jobmanager进行job的管理和调度。一旦执行完成,jobclient返回给用户最后的执行结果。
jobmanager:对应一个Flink程序的master进程,负责job的管理和资源的协调。主要包括任务调度、监控任务的执行状态、协调任务的执行、检查点管理和失败恢复等。
图5-3-8ApacheFlink分布式运行环境[9]
taskmanager和taskslot:是Flink中具体负责执行tasks的组件。每个taskmanage对应是运行在节点上的JVM进程,拥有一定的量的资源。比如内存、CPU、网络、磁盘等。每个执行的task运行在其中的一个或多个线程中。taskslot是分布式程序真正执行task的地方。每个taskslot可以包括JVM进程中的一部分内存。
六、Flink的编程示例
Flink的编程核心也就在数据流和转换上。下面,依然以WordCount为例来对Flink的编程模型进行说明。代码5-3-6是Flink中以5分钟为窗口进行一次求和统计的WordCount应用代码。
在以上代码中,定义了一个DataStream实例,并通过socket的方式从端口监听在线获取数据。监听到的句子数据被使用flatmap转化成单词,并直接以(单词,计数)二元对的形式记录下来。当流被转化为二元对后,接着根据当前第0位的字段“word”进行keyBy()的操作,最后以5分钟为窗口大小,对计数值进行累计。
Flink的编程非常简洁和直观,上例中,DataStream从源操作从socket在线读取数据,到各种转换操作,到最后的汇聚求和操作都可以直接表达出来。Flink提供了丰富的API和各种表达上的简化来降低用户的编程难度和编程量。
上例通过使用env.setParallelism来设置流处理程序的整体并行度,即taskslot数量为8。同时,可以进一步为每一个操作设置并行度,如在saveAsText()操作后通过使用setParallelism将这个操作的并行度修改为1。
想要了解更多资讯,请扫描下方