Spark Streaming的概要与应用
作者 : 费英林
目录
1. 系统概要
Spark Streaming 是 Spark API 的扩展,提供了可扩展的、高吞吐量的、可容错的实时数据处理。数据源可以是 Kafka 、 Flume 、 Twitter 、 ZeroMQ 、 Kinesis 和 TCP sockets 等,这些数据可以通过 map 、 reduce 、 join 和 window 等高级函数进行处理,处理后的数据可以存储至文件系统、数据库和仪表板等。我们甚至可以将机器学习和图处理应用于数据流处理的过程中。
它的内部工作机制如下图所示。 Spark Streaming 接收输入数据流并将接收到的数据划分为多个小的 batc h , Spark 引擎依次处理这些 batc h ,生成结果。
Spark Streaming 提供了一个高级抽象 – 离散数据流 DStream ,它代表一个持续的数据流。 DStream 可从输入流中创建,如 Kafka 、 Flume 和 Kinesis ,也可以从其它 DStream 中创建。实际上, DStream 是一个 RDD 的序列。
我们可以使用 Scala 、 Java 或 Python(Spark 1.2 中引入 ) 来实现 Spark Streaming 。其中针对 Python 语言提供的 API 是不完善的,具体细节可参照 API 文档。
2. 基本概念
2.1. Linking
以下是基于 Maven 的配置。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.3.0</version>
</dependency>
对于 Kafka 、 Flume 和 Kinesis 等,我们需要添加相应的 artifact 到 Maven 里,列表如下:
Kafka - spark-streaming-kafka_2.10
Flume - spark-streaming-flume_2.10
Kinesis - spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter - spark-streaming-twitter_2.10
ZeroMQ - spark-streaming-zeromq_2.10
MQTT - spark-streaming-mqtt_2.10
2.2. 初始化 StreamingContext
StreamingContext 是 Spark Streaming 的主入口,初始化代码如下:
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, Duration(1000));
appName 是在 cluster UI 上可看到的应用名称。 Master 可以是 Spark 、 Mesos 、 YARN cluster URL 或者 local[*] (本地模式)。在实际应用中,如果是运行在一个集群上,我们不需要设置 master 参数,可以通过 spark-submit 提交作业。 Batch 区间要依据具体应用的实时需求和集群的资源情况进行设定。
JavaStreamingContext 也可以从一个已有的 JavaSparkContext 中创建,如:
import org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
在定义了上下文之后,我们需要完成以下工作:
通过 DStream 定义输入源;
定义流计算过程 – 在 DStream 定义转换和输出;
调用 streamingContext.start() 开始接收、处理数据;
调用 streamingContext.awaitTermination() 等待处理完成;
可调用 amingContext.stop() 来手工停止处理过程。
2.3. 离散数据流DStream
DStream 表示一个连续数据流,或者是从数据源获得的输入数据流,或者输入数据流处理后的数据流。 DStream 在内部表示为一个持续的 RDD 序列。
在 DStream 上的操作都翻译为底层 RDD 上的操作。以 wordcount 应用为例,
flatMap 操作是应用于 lines DStream 中的每个 RDD 的,然后生成对应的 words DStream 。如下图所示:
2.4. Input DStream和Receiver
Input DStreams 代表从源数据接收到的输入数据流。每个 Input DStream (除了文件流)都与一个 Receiver 对象关联, Receiver 对象从数据源接收数据并存储在 Spark 的内存中供后续使用。 Spark Streaming 提供了两类内置的流数据源:
基本源:可从 StreamingContext API 中直接访问的源,如文件系统, socket 连接,和 Akka 角色。
高级源:如 Kafka 、 Flume 、 Kinesis 和 Twitter 等,可通过附加的工具类获得。
2.5. DStream 的转换
类似于 RDD 上的操作, transformation 可以改变输入流中的数据,具体的 transformation 说明请参阅 API 。
Spark Streaming 支持窗口操作,即我们可以定义一个滑动的时间窗口,这个时间窗口可以包含一个或多个 batch 区间。
2.6. DStream 的输出操作
DStream 的数据可以输出到外部系统,如文件系统或数据库。 DStream 把持 foreachRDD 操作,类似于对输入流中的 RDD 做 for 循环。
2.7. DataFrame 和 SQL 操作
Spark Streaming 支持 DataFrames 和 SQL 操作。类似的,我们也可以在其它线程(与当前 StreamingContext 异步)里定义的表上运行 SQL 查询。
2.8. MLlib操作
Spark Streaming 支持 MLlib 的包含的机器学习算法。
2.9. DataFrame 和 SQL 操作
Spark Streaming 支持 DataFrames 和 SQL 操作。类似的,我们也可以在其它线程(与当前 StreamingContext 异步)里定义的表上运行 SQL 查询。
2.10. 缓存与持久化
类似于 RDD , DStreams 允许开发者在内存中持久化数据。对于需要进行多次运算的数据,这个功能很有用。前面提到的时间窗口操作也依赖于这个功能。
2.11. Checkpointing
Spark Streaming 支持两类数据做 checkpoint :
元数据 checkpointing - 保存流计算的定义到一个可空错存储,如 HDFS 。元数据包括配置、 DStream 操作和未完成的 batch 。
数据 checkpointing - 保存产生的 RDD 到一个可靠存储。
具体的启用 Checkpointing 及恢复请参考官方文档。
3. 实例
以下实例通过 Flume 接入一个实时增加的文件, Flume 通过事件的形式将增加的文件数据加到 Spark 流处理的接收器,然后对这些数据进行 wordcount 操作,统计结果存储到一个 HBase 的表里面。
3.1. Spark Streaming程序
相关的 Java 代码如下:
package org.apache.spark.examples.streaming;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;
public final class JavaFlumeEventCount2 {
private JavaFlumeEventCount2() {
}
public static void main(String[] args) {
if (args.length != 2) {
System.err.println("Usage: JavaFlumeEventCount <host> <port>");
System.exit(1);
}
String host = args[0];
int port = Integer.parseInt(args[1]);
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils
.createStream(ssc, host, port);
JavaPairDStream<String, Integer> lastCounts = flumeStream
.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
// @Override
public Iterable<String> call(SparkFlumeEvent event)
throws Exception {
String bodyString = new String(event.event().getBody()
.array(), "UTF-8");
return Arrays.asList(bodyString.split(" "));
}
}).mapToPair(new PairFunction<String, String, Integer>() {
// @Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
// @Override
public Integer call(Integer x, Integer y) throws Exception {
// TODO Auto-generated method stub
return x.intValue() + y.intValue();
}
});
lastCounts
.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
// @Override
public Void call(JavaPairRDD<String, Integer> values,
Time time) throws Exception {
values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
// @Override
public void call(Tuple2<String, Integer> tuple)
throws Exception {
HBaseCounterIncrementor incrementor = HBaseCounterIncrementor
.getInstance("spark_flume", "cf1");
incrementor.increment("Counter", tuple._1(),
tuple._2());
System.out.println("Counter:" + tuple._1()
+ "," + tuple._2());
}
});
return null;
}
});
ssc.start();
ssc.awaitTermination();
}
}
3.2. Flume配置
创建一个配置文件: /etc/flume-ng/conf/flume-spark-streaming-tail.conf ,内容如下:
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = exec
a3.sources.r1.command = tail -F /var/log/hadoop-yarn/yarn/yarn-yarn-nodemanager-udh-yf-dev-12.log
# Describe the sink
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = udh-yf-dev-12.yonyou.com
a3.sinks.k1.port = 44123
# Use a channel which buffers events in memory
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3.3. 打包
需要注意的是,如果我们的应用中使用到了 Spark 之外的包,那么需要将这些包一起打包到应用包中,也就是将依赖的包包含进来。通过在 Maven 中配置 Build 选项实现这一点。
3.4. 启动应用
启动命令如下:
spark-submit --class org.apache.spark.examples.streaming.JavaFlumeEventCount2 \
--master yarn-cluster \
--deploy-mode cluster \
--num-executors 3 \
--driver-memory 500m \
--executor-memory 500m \
--executor-cores 1 \
/tmp/SparkTest.jar \
udh-yf-dev-12.yonyou.com \
44123
其中 /tmp/SparkTest.jar 是应用包, udh-yf-dev-12.yonyou.com 是 Flume sink 的目标主机, 44123 是 Flume sink 的目标端口, org.apache.spark.examples.streaming.JavaFlumeEventCount2 是具体的应用。
3.5. 启动F l ume Agent
启动命令如下:
flume-ng agent -n a3 -c /etc/flume-ng/conf -f /etc/flume-ng/conf/flume-spark-streaming-tail.conf
其中阿 a3 是定义的 Agent 名称, /etc/flume-ng/conf/flume-spark-streaming-tail.conf 是前面提到的 Flume 配置文件。
启动后, Flume 将传输新增的文件数据至指定的主机和端口,我们的 Spark Streaming 程序将监听那个端口并处理接收到的数据。
3.6. 状态监控
可打开 Yarn 的 UI 查看作业运行状态,我们可以看到有一个 Spark 应用在持续运行。
3.7. 结果确认
打开 HBase Shell ,查询指定表,可看到数据在不断变化。
Attachments:
图4.png (image/png)
图3.png (image/png)
图2.png (image/png)
图1.png (image/png)
专题_基于UDH的一种实时查询系统构建方案.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
基于UDH的一种实时查询系统构建方案 - 知识库 - 数据处理平台知识库.htm (text/html)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
专题 - Spark Streaming的概要与应用.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)