知识库 : Spark Streaming的概要与应用

 

 

Edit Document

 

 

 

 

 

 

 

 

 

 

Spark Streaming的概要与应用

 

 

 

作者 费英林

 


目录

1.               系统概要

2.               基本概念

2.1.               Linking

2.2.               初始化 StreamingContext

2.3.               离散数据流 DStream

2.4.               Input DStream Receiver

2.5.               DStream 的转换

2.6.               DStream 的输出操作

2.7.               DataFrame SQL 操作

2.8.               MLlib 操作

2.9.               DataFrame SQL 操作

2.10.               缓存与持久化

2.11.               Checkpointing

3.               实例

3.1.               Spark Streaming 程序

3.2.               Flume 配置

3.3.               打包

3.4.               启动应用

3.5.               启动 Flume Agent

3.6.               状态监控

3.7.               结果确认

 

 

1.     系统概要

Spark Streaming Spark API 的扩展,提供了可扩展的、高吞吐量的、可容错的实时数据处理。数据源可以是 Kafka Flume Twitter ZeroMQ Kinesis TCP sockets 等,这些数据可以通过 map reduce join window 等高级函数进行处理,处理后的数据可以存储至文件系统、数据库和仪表板等。我们甚至可以将机器学习和图处理应用于数据流处理的过程中。

QQ截图20150901104949.jpg

它的内部工作机制如下图所示。 Spark Streaming 接收输入数据流并将接收到的数据划分为多个小的 batc h Spark 引擎依次处理这些 batc h ,生成结果。

Spark Streaming

Spark Streaming 提供了一个高级抽象 离散数据流 DStream ,它代表一个持续的数据流。 DStream 可从输入流中创建,如 Kafka Flume Kinesis ,也可以从其它 DStream 中创建。实际上, DStream 是一个 RDD 的序列。

我们可以使用 Scala Java Python(Spark 1.2 中引入 ) 来实现 Spark Streaming 。其中针对 Python 语言提供的 API 是不完善的,具体细节可参照 API 文档。

2.     基本概念

2.1.   Linking

以下是基于 Maven 的配置。

                            <dependency>

                                          <groupId>org.apache.spark</groupId>

                                          <artifactId>spark-streaming_2.10</artifactId>

                                          <version>1.3.0</version>

                            </dependency>

对于 Kafka Flume Kinesis 等,我们需要添加相应的 artifact   Maven 里,列表如下:

Kafka                             -               spark-streaming-kafka_2.10

Flume                             -               spark-streaming-flume_2.10

Kinesis                             -               spark-streaming-kinesis-asl_2.10 [Amazon Software License]

Twitter                             -               spark-streaming-twitter_2.10

ZeroMQ               -               spark-streaming-zeromq_2.10

MQTT                             -               spark-streaming-mqtt_2.10

2.2.   初始化 StreamingContext

StreamingContext Spark Streaming 的主入口,初始化代码如下:

 

import org.apache.spark.*;

import org.apache.spark.streaming.api.java.*;

 

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));

 

appName 是在 cluster UI 上可看到的应用名称。 Master 可以是 Spark Mesos YARN cluster URL 或者 local[*] (本地模式)。在实际应用中,如果是运行在一个集群上,我们不需要设置 master 参数,可以通过 spark-submit 提交作业。 Batch 区间要依据具体应用的实时需求和集群的资源情况进行设定。

JavaStreamingContext 也可以从一个已有的 JavaSparkContext 中创建,如:

 

import org.apache.spark.streaming.api.java.*;

 

JavaSparkContext sc = ...   //existing JavaSparkContext

JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

 

在定义了上下文之后,我们需要完成以下工作:

         通过 DStream 定义输入源;

         定义流计算过程 DStream 定义转换和输出;

         调用 streamingContext.start() 开始接收、处理数据;

         调用 streamingContext.awaitTermination() 等待处理完成;

         可调用 amingContext.stop() 来手工停止处理过程。

2.3.   离散数据流DStream

DStream 表示一个连续数据流,或者是从数据源获得的输入数据流,或者输入数据流处理后的数据流。 DStream 在内部表示为一个持续的 RDD 序列。

Spark Streaming

DStream 上的操作都翻译为底层 RDD 上的操作。以 wordcount 应用为例,

flatMap   操作是应用于 lines   DStream 中的每个 RDD 的,然后生成对应的 words   DStream 。如下图所示:

Spark Streaming

2.4.   Input DStream和Receiver

Input DStreams 代表从源数据接收到的输入数据流。每个 Input DStream (除了文件流)都与一个 Receiver 对象关联, Receiver 对象从数据源接收数据并存储在 Spark 的内存中供后续使用。 Spark Streaming 提供了两类内置的流数据源:

基本源:可从 StreamingContext API 中直接访问的源,如文件系统, socket 连接,和 Akka 角色。

高级源:如 Kafka Flume Kinesis Twitter 等,可通过附加的工具类获得。

 

2.5.   DStream 的转换

类似于 RDD 上的操作, transformation 可以改变输入流中的数据,具体的 transformation 说明请参阅 API

Spark Streaming 支持窗口操作,即我们可以定义一个滑动的时间窗口,这个时间窗口可以包含一个或多个 batch 区间。

Spark Streaming

2.6.   DStream 的输出操作

DStream 的数据可以输出到外部系统,如文件系统或数据库。 DStream 把持 foreachRDD 操作,类似于对输入流中的 RDD for 循环。

 

2.7.   DataFrame SQL 操作

Spark Streaming 支持 DataFrames SQL 操作。类似的,我们也可以在其它线程(与当前 StreamingContext 异步)里定义的表上运行 SQL 查询。

2.8.   MLlib操作

Spark Streaming 支持 MLlib 的包含的机器学习算法。

 

2.9.   DataFrame SQL 操作

Spark Streaming 支持 DataFrames SQL 操作。类似的,我们也可以在其它线程(与当前 StreamingContext 异步)里定义的表上运行 SQL 查询。

 

2.10.       缓存与持久化

类似于 RDD DStreams 允许开发者在内存中持久化数据。对于需要进行多次运算的数据,这个功能很有用。前面提到的时间窗口操作也依赖于这个功能。

 

2.11.       Checkpointing

Spark Streaming 支持两类数据做 checkpoint

         元数据 checkpointing - 保存流计算的定义到一个可空错存储,如 HDFS 。元数据包括配置、 DStream 操作和未完成的 batch

         数据 checkpointing - 保存产生的 RDD 到一个可靠存储。

具体的启用 Checkpointing 及恢复请参考官方文档。

 

3.     实例

以下实例通过 Flume 接入一个实时增加的文件, Flume 通过事件的形式将增加的文件数据加到 Spark 流处理的接收器,然后对这些数据进行 wordcount 操作,统计结果存储到一个 HBase 的表里面。

3.1.   Spark Streaming程序

相关的 Java 代码如下:

package org.apache.spark.examples.streaming;

 

import java.util.Arrays;

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.streaming.Duration;

import org.apache.spark.streaming.Time;

import org.apache.spark.streaming.api.java.JavaPairDStream;

import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.apache.spark.streaming.flume.FlumeUtils;

import org.apache.spark.streaming.flume.SparkFlumeEvent;

 

import scala.Tuple2;

 

public final class JavaFlumeEventCount2 {

              private JavaFlumeEventCount2() {

              }

 

              public static void main(String[] args) {

                            if (args.length != 2) {

                                          System.err.println("Usage: JavaFlumeEventCount <host> <port>");

                                          System.exit(1);

                            }

 

                            String host = args[0];

                            int port = Integer.parseInt(args[1]);

 

                            Duration batchInterval = new Duration(2000);

                            SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");

                            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,

                                                        batchInterval);

 

                            JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils

                                                        .createStream(ssc, host, port);

 

                            JavaPairDStream<String, Integer> lastCounts = flumeStream

                                                        .flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {

 

                                                                      // @Override

                                                                      public Iterable<String> call(SparkFlumeEvent event)

                                                                                                  throws Exception {

                                                                                    String bodyString = new String(event.event().getBody()

                                                                                                                .array(), "UTF-8");

                                                                                    return Arrays.asList(bodyString.split(" "));

                                                                      }

                                                        }).mapToPair(new PairFunction<String, String, Integer>() {

                                                                      // @Override

                                                                      public Tuple2<String, Integer> call(String s) {

                                                                                    return new Tuple2<String, Integer>(s, 1);

                                                                      }

                                                        }).reduceByKey(new Function2<Integer, Integer, Integer>() {

 

                                                                      // @Override

                                                                      public Integer call(Integer x, Integer y) throws Exception {

                                                                                    // TODO Auto-generated method stub

                                                                                    return x.intValue() + y.intValue();

                                                                      }

                                                        });

 

                            lastCounts

                                                        .foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {

 

                                                                      // @Override

                                                                      public Void call(JavaPairRDD<String, Integer> values,

                                                                                                  Time time) throws Exception {

 

                                                                                    values.foreach(new VoidFunction<Tuple2<String, Integer>>() {

 

                                                                                                  // @Override

                                                                                                  public void call(Tuple2<String, Integer> tuple)

                                                                                                                              throws Exception {

                                                                                                                HBaseCounterIncrementor incrementor = HBaseCounterIncrementor

                                                                                                                                            .getInstance("spark_flume", "cf1");

                                                                                                                incrementor.increment("Counter", tuple._1(),

                                                                                                                                            tuple._2());

                                                                                                                System.out.println("Counter:" + tuple._1()

                                                                                                                                            + "," + tuple._2());

 

                                                                                                  }

                                                                                    });

 

                                                                                    return null;

                                                                      }

                                                        });

 

                            ssc.start();

                            ssc.awaitTermination();

 

              }

}

3.2.   Flume配置

创建一个配置文件: /etc/flume-ng/conf/flume-spark-streaming-tail.conf ,内容如下:

a3.sources = r1

a3.sinks = k1

a3.channels = c1

 

# Describe/configure the source

a3.sources.r1.type = exec

a3.sources.r1.command = tail -F /var/log/hadoop-yarn/yarn/yarn-yarn-nodemanager-udh-yf-dev-12.log

 

# Describe the sink

a3.sinks.k1.type = avro

a3.sinks.k1.hostname = udh-yf-dev-12.yonyou.com

a3.sinks.k1.port = 44123

 

# Use a channel which buffers events in memory

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

 

# Bind the source and sink to the channel

a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1

 

3.3.   打包

需要注意的是,如果我们的应用中使用到了 Spark 之外的包,那么需要将这些包一起打包到应用包中,也就是将依赖的包包含进来。通过在 Maven 中配置 Build 选项实现这一点。

 

3.4.   启动应用

启动命令如下:

spark-submit --class org.apache.spark.examples.streaming.JavaFlumeEventCount2 \

--master yarn-cluster \

--deploy-mode cluster \

--num-executors 3 \

--driver-memory 500m \

--executor-memory 500m \

--executor-cores 1 \

/tmp/SparkTest.jar \

udh-yf-dev-12.yonyou.com \

44123

其中 /tmp/SparkTest.jar 是应用包, udh-yf-dev-12.yonyou.com Flume sink 的目标主机, 44123 Flume sink 的目标端口, org.apache.spark.examples.streaming.JavaFlumeEventCount2 是具体的应用。

3.5.   启动F l ume Agent

启动命令如下:

flume-ng agent -n a3 -c /etc/flume-ng/conf -f /etc/flume-ng/conf/flume-spark-streaming-tail.conf

 

其中阿 a3 是定义的 Agent 名称, /etc/flume-ng/conf/flume-spark-streaming-tail.conf 是前面提到的 Flume 配置文件。

 

启动后, Flume 将传输新增的文件数据至指定的主机和端口,我们的 Spark Streaming 程序将监听那个端口并处理接收到的数据。

3.6.   状态监控

可打开 Yarn UI 查看作业运行状态,我们可以看到有一个 Spark 应用在持续运行。

 

3.7.   结果确认

打开 HBase Shell ,查询指定表,可看到数据在不断变化。

 

 

Attachments:

专题-一种基于UDH Search的HBase二级索引构建方案.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
图4.png (image/png)
图3.png (image/png)
图2.png (image/png)
图1.png (image/png)
专题_基于UDH的一种实时查询系统构建方案.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
基于UDH的一种实时查询系统构建方案 - 知识库 - 数据处理平台知识库.htm (text/html)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)