Spark Streaming 基于拉方式处理 Flume-ng 数据源( Pull-based Approach using a Custom Sink )
目录
Spark Streaming 基于拉方式处理 Flume-ng 数据源 (Pull-based Approach using a Custom Sink)
Flume-ng 聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释怎样配置 Flume -ng 和 Spa rk Streaming 来从 Flume 获取数据。这里 介绍第二种方法 。
1. 使用自定义 Sink 的拉方式
不是 Flume 直接推送数据到 SparkStreaming ,这种方法运行了一个如下所示的 Flume Sink 。
1. Flume 将数据推送到 Sink 中,然后数据在此处缓存。
2. Spark Streaming 使用一个可靠的 Flume 接收器和操作从 Sink 中拉取数据。只有在 Spark Streaming 接收到数据并且把数据复制后才认为操作成功。
这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置 Flume 运行一个自定义 Sink 。下面是配置步骤。
1.1 一般需求
选择一台在 Flume 代理中运行自定义 Sink 的机器。 Flume 其余的管道被配置为向那个代理发送数据。 Spark 集群中的机器都能连接到运行自定义 Sink 的那台机器上。
1.2 配置 Flume
在选定的机器上配置 Flume 需要如下的两步。
A . 添加如下的 JAR 包到要运行自定义 Sink 的机器中的 Flume 的 classpath 中 (这里我把如下 jar 放在 /usr/lib/flume-ng/lib/ 目录下)
spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar
commons-lang3 -3.3.2.jar
B. 配置文件:在那台机器上,通过下面的配置文件配置 Flume 代理发送数据到一个 Avro Sink 中。
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = 41444
agent.sinks.spark.channel = memoryChannel
2. 配置 Spark Streaming 程序
2.1 编程:在流处理程序的代码中,引入 FlumeUtils 并如下创建一个输入 DStream 流。 这里给出 Spark Java 程序例子
import java.util.Arrays ;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction ;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.storage.StorageLevel;
public class SparkStreamingFlume2 {
public SparkStreamingFlume2() {
}
public static void main(String[] args) {
if (args. length != 2) {
System. err .println( "Usage: JavaFlumeEventCount1 <host> <port>" );
System. exit (1);
}
StreamingExamples. setStreamingLogLevels ();
String host = args[0];
int port = Integer. parseInt (args[1]);
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName( "JavaFlumeEventCount" );
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
StorageLevel storagelevel = StorageLevel. MEMORY_ONLY ();
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils. createPollingStream (ssc, host, port,storagelevel);
flumeStream.count();
flumeStream.count().map( new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events." ;
}
}).print();
ssc.start();
ssc.awaitTermination();
}
2.2 启动 Flume 这里主要需要添加 spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar 和 commons-lang3-3.3.2.jar 到 $FLUME_HOME/lib 目录下
F lume-ng agent – c /etc/flume-ng/conf – f /etc/flume-ng/conf/flume.conf – Dflume.root.logger=DEBUG,console – n agent02
2.3 提交 Spark ,这里需要注意的添加必要的 jar 包,可以在提交的时候加上 --jars 来指定相关的 jar 包,也可以在 sc 中调用 addJar() 添加
spark-submit --master spark://udh-spark-test-04:7077 --class SparkStreamingFlume2 --jars /root/spark-streaming-flume-sink_2.10-1.3.0-cdh5.4.3.jar,/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-examples-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar,/usr/lib/spark/lib/spark-assembly-1.3.0-cdh5.4.3-hadoop2.6.0-cdh5.4.3.jar /root/flume-test02.jar udh-spark-test-03 41444