Spark-Streaming

Spark Streaming 介绍

Spark Streaming 概述

什么是 Spark Streaming

在这里插入图片描述

     Spark Streaming 类似于 Apache Storm,用于流式数据的处理。根据官方文档介绍,Spark Streaming 有高吞吐量和容错能力强等特点。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象操作如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如 HDFS , 数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。

在这里插入图片描述

为什么要学习 Spark Streaming

  1. 易用
    在这里插入图片描述
  2. 容错
    在这里插入图片描述
  3. 易整合到 Spark 体系
    在这里插入图片描述

SparkStreaming 与 Storm 对比

SparkStreaming Storm
在这里插入图片描述 在这里插入图片描述
开发语言:Scala 开发语言:Clojure
编程模型:DStream 编程模型:Spoot/Bolt

Spark Streaming 原理

原理

     Spark Streaming 是基于 Spark 的流式批处理引擎,其基本原理是把==输入的数据以某一时间间隔批量的处理==,当批处理间隔缩短到==秒级==时,便可以用于处理实时数据流。

Spark Streaming 计算流程

     Spark Streaming 是将流式计算分解成一系列短小的批处理作业。这里的批处理引擎是 Spark Core,也就是把 Spark Streaming 的输入数据按照 batch size (如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成 Spark 中的 RDD (Resilient Distributed Dataset),然后将 Spark Streaming 中对 DStream 的 Transformtion 操作变为针对 Spark 中对 RDD 的。

     Transformation 操作,将 RDD 经过操作编程中间结果保存在内存中,整个流式计算根据业务的需求可以对中间的结果进行缓存或者存储到外部设备。下图为 Spark Streaming 的整个流程:

在这里插入图片描述

Spark Streaming 容错性

     对于流失计算来说,容错性至关重要。首先我们要明确一下 Spark 中 RDD 的容错机制。每一个 RDD 都是不可变的分布式可重算的数据集,其记录着确定性操作继承关系(lineage),所以只要输入数据是可容错的,那么任意一个 RDD 的分区(Partition)出错或不可用,都是可以利用原始输入数据通过转换操作而重新算出的。

     对于 Spark Streaming 来说,其 RDD 的传承关系如下图所示:

在这里插入图片描述

     途中的每一个椭圆表示一个 RDD ,椭圆形中的每个圆形代表一个 RDD 中的一个 Partition ,图中的每一列的多个 RDD 表示一个 DStream(图中有三个 DStream),而每一行最后一个 RDD 则表示每一个 Batch Size 所产生的中间结果 RDD。我们可以看到图中的每一个 RDD 都是通过 lineage 相连接的,由于 Spark Streaming 输入数据可以来自于磁盘,例如 HDFS(多分拷贝)或是来自于网络的数据流(Spark Streaming 会将网络输入数据的每一个数据流拷贝两份到其他机器)都能保证容错性,所以 RDD 中任意的 Partition 出错,都可以并行地在其他机器上将缺失的 Partition 计算出来。这个容错恢复方式比连续计算模型(如 Storm)的效率更高

Spark Streaming 实时性

     对于实时性的讨论,会牵涉到流式处理框架的应用场景。Spark Streaming 将流式计算分解成多个 Spark Job,对于每一段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5 ~ 2秒之间(Storm 目前最小的延迟是 100ms左右),所以 Spark Streaming 能满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。

DStream

什么是 DStream

     Discretized Stream 是 Spark Streaming 的基础抽象,代表连续性的数据流和经过各种 Spark 算子操作后的结果数据流。在内部是线上。DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据,如下图

在这里插入图片描述

     对数据的操作也是按照 RDD 为单位来进行的

在这里插入图片描述

     Spark Streaming 使用数据源产生的数据流创建 DStream,也可以在已有的 DStream 上使用一些操作来创建新的 DStream。

     它的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给 Spark Engine 处理最后生成该批次的结果。

在这里插入图片描述

DStream 相关操作

     DStream 上的操作与 RDD 类似,分为 Transformations (转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的操作,如:updateStateByKey()、transform()以及各种 Window 相关的操作。

Transformations on DStreams

Transformations Meaning
map(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream
flatMap(func) 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项
filter(func) 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度
repartition(numPartitions) 增加或减少DStream中的分区数,从而改变DStream的并行度
union(otherStream) 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.
count() 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream
reduce(func) 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.
countByValue() 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数
reduceByKey(func, [numTasks]) 利用func函数对源DStream中的key进行聚
合操作,然后返回新的(K,V)对构成的DStream
join(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStream
cogroup(otherStream, [numTasks]) 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream
transform(func) 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD
updateStateByKey(func) 根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DStream

特殊的Transformations

  • UpdateStateByKey Operation
    UpdateStateByKey用于记录历史记录,保存上次的状态

  • Window Operations(开窗函数,滑动窗口转换操作:)

          滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间), 并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),然后就可以让窗口按照指定时间间隔在源 DStream 上滑动,每次窗口停放的位置上,都会有一部分 DStream 被框入窗口内,形成一个小段 DStream ,这是,就可以启动对这个小段 DStream 的计算。

    在这里插入图片描述

    1. 红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。

    2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

    3. 所以基于窗口的操作,需要指定2个参数:
      window length - The duration of the window (3 in the figure)

      slide interval - The interval at which the window-based operation is performed (2 in the figure).
      窗口大小:一段时间内数据的容器。
      滑动间隔:每隔多久计算一次。

Output Operations on DStreams

     Output Operations 可以将 DStream 的数据输出到外部的数据库或文件系统,当某个 Output Operations 被调用时(与 RDD 的 Action 相同),spark streaming 程序才会真正的计算过程。

Output Operation Meaning
print() 打印到控制台
saveAsTextFiles(prefix, [suffix]) 保存流的内容为文本文件,文件名为"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) 保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”.
saveAsHadoopFiles(prefix, [suffix]) 保存流的内容为hadoop文件,文件名为"prefix-TIME_IN_MS[.suffix]".
foreachRDD(func) 对Dstream里面的每个RDD执行func

DStream 操作实战

SparkStreaming 接收 socket 数据,实现单词技术 WordCound

架构图

在这里插入图片描述

实现流程

  1. 安装并启动生产者

     首先在 linux 服务器上用 Yum 安装 nc 工具,命令为 netcat 命令的简称,都是用来设置路由器。我们可以利用它像某个端口发送数据。

     yun install -y nc

  1. 通过 netcat 工具像指定的端口发送数据

     nc -lk 9999

  1. 编写 Spark Streaming 程序
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * sparkStreming流式处理接受socket数据,实现单词统计
      */
    object  SparkStreamingTCP {
    
      def main(args: Array[String]): Unit = {
        //配置sparkConf参数
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCP").setMaster("local[2]")
        //构建sparkContext对象
        val sc: SparkContext = new SparkContext(sparkConf)
        //设置日志输出级别
        sc.setLogLevel("WARN")
        //构建StreamingContext对象,每个批处理的时间间隔
        val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
        //注册一个监听的IP地址和端口  用来收集数据
        val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160",9999)
        //切分每一行记录
        val words: DStream[String] = lines.flatMap(_.split(" "))
        //每个单词记为1
        val wordAndOne: DStream[(String, Int)] = words.map((_,1))
        //分组聚合
        val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
        //打印数据
        result.print()
        scc.start()
        scc.awaitTermination()
      }
    }
    

         由于使用的是本地模式 local[2] 所以可以在本地运行该程序。

         注意:要指定并行度,如在本地运行设置 setMaster("local[2]"),相当于启动两个线程,一个给 received,一个给 computer。如果是在集群中运行,必须要求集群中可的 core 数大于1。

执行查看效果

  1. 先执行 nc -lk 9999
    在这里插入图片描述
  2. 在执行代码
    在这里插入图片描述
  3. 不断的在 1. 中输入不同的单词,观察 IDEA 控制台的输出
    在这里插入图片描述
    在这里插入图片描述

     现象:SparkStreaming 每隔 5s 计算一次当前 5s 内的数据,然后将每个批次的数据输出。

SparkStreaming 接受 socket 数据,实现单词计数累加

      在上面的案例中存在这样一个问题:每个批次的单词都被正确的统计出来,但是结果不能累加!如果需要累加需要使用 updateStateByKey(func)来更新状态。

架构图

在这里插入图片描述

实现流程

  1. 安装并启动生成者

     首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。

     yum install -y nc

  1. 启动一个服务端并监听9999端口

         nc -lk 9999

         向指定的端口发送数据

  2. 编写 SparkStreaming 程序

    	import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加
      */
    object SparkStreamingTCPTotal {
    
      //newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
      //runningCount 历史的所有相同key的value总和
      def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount =runningCount.getOrElse(0)+newValues.sum
        Some(newCount)
      }
    
    
      def main(args: Array[String]): Unit = {
    
        //配置sparkConf参数
        val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCPTotal").setMaster("local[2]")
        //构建sparkContext对象
        val sc: SparkContext = new SparkContext(sparkConf)
    
        sc.setLogLevel("WARN")
        //构建StreamingContext对象,每个批处理的时间间隔
        val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
    
        scc.checkpoint("./")
        //注册一个监听的IP地址和端口  用来收集数据
        val lines: ReceiverInputDStream[String] = scc.socketTextStream("bw01", 9999)
        //切分每一行记录
        val words: DStream[String] = lines.flatMap(_.split(" "))
        //每个单词记为1
        val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
        //累计统计单词出现的次数
        val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
        result.print()
        scc.start()
        scc.awaitTermination()
      }
    }
    
    

         通过函数 updateStateByKey 实现。根据于 key 的前置状态和 key 的新值,对 key 进行更新,返回一个新状态的 DStream。

执行查看效果

  1. 执行 nc -lk 9999
    在这里插入图片描述
  2. 然后执行以上代码
    在这里插入图片描述
  3. 不断的在 1. 中输入不同的单词,观察 IDEA 控制台输出
    在这里插入图片描述
    在这里插入图片描述

     现象:sparkStreaming每隔5s计算一次当前5s内的数据,然后将每个批次的结果数据累加输出。

SparkStreaming开窗函数reduceByKeyAndWindow,实现单词计数

架构图

在这里插入图片描述

实现流程

  1. 安装并启动生产者

         首先在 linux 服务器上用 YUM 安装 nc 工具,nc 命令是 netcat 命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。

         yum install -y nc

  2. 启动一个服务器 并监听 9999 端口

         nc -lk 9999 ,向指定的端口发送数据。

  3. 编写 Spark Streaming 程序
	import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
	import org.apache.spark.streaming.{Seconds, StreamingContext}
	import org.apache.spark.{SparkConf, SparkContext}
	
	/**
	  * sparkStreming开窗函数---统计一定时间内单词出现的次数
	  */
	object SparkStreamingTCPWindow {
	
	  def main(args: Array[String]): Unit = {
	    //配置sparkConf参数
	    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCPWindow").setMaster("local[2]")
	    //构建sparkContext对象
	    val sc: SparkContext = new SparkContext(sparkConf)
	    sc.setLogLevel("WARN")
	    //构建StreamingContext对象,每个批处理的时间间隔
	    val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
	    //注册一个监听的IP地址和端口  用来收集数据
	    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160",9999)
	    //切分每一行记录
	    val words: DStream[String] = lines.flatMap(_.split(" "))
	    //每个单词记为1
	    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
	    //reduceByKeyAndWindow函数参数意义:
	    // windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
	    //slideDuration:  表示window滑动的时间长度,即每隔多久执行本计算
	    val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))
	    result.print()
	    scc.start()
	    scc.awaitTermination()
	  }
	}

执行查看效果

  1. 先执行 nc -lk 9999
    在这里插入图片描述
  2. 然后在执行以上代码
    在这里插入图片描述
  3. 断的在(1)中输入不同的单词,观察IDEA控制台输出
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

     现象: Spark Streaming 每个5s 计算一次当前在窗口大小为 10s 内的数据,然后将结果数据输出。

SparkStreaming开窗函数统计一定时间内的热门词汇

架构图

在这里插入图片描述

实现流程

  1. 安装并启动生产者

         首先在 linux 服务器上用 YUM 安装 nc 工具,nc 命令是 netcat 命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。

         yum install -y nc

  2. 启动一个服务器并监听 9999 端口

         nc -lk 9999

  3. 编写 Spark Streaming 程序

	package cn.test.spark
	
	import org.apache.spark.rdd.RDD
	import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
	import org.apache.spark.streaming.{Seconds, StreamingContext}
	import org.apache.spark.{SparkConf, SparkContext}
	
	/**
	  * sparkStreming开窗函数应用----统计一定时间内的热门词汇
	  */
	object SparkStreamingTCPWindowHotWords {
	
	  def main(args: Array[String]): Unit = {
	    //配置sparkConf参数
	    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingTCPWindowHotWords").setMaster("local[2]")
	    //构建sparkContext对象
	    val sc: SparkContext = new SparkContext(sparkConf)
	    sc.setLogLevel("WARN")
	    //构建StreamingContext对象,每个批处理的时间间隔
	    val scc: StreamingContext = new StreamingContext(sc,Seconds(5))
	    //注册一个监听的IP地址和端口  用来收集数据
	    val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.200.160",9999)
	    //切分每一行记录
	    val words: DStream[String] = lines.flatMap(_.split(" "))
	    //每个单词记为1
	    val wordAndOne: DStream[(String, Int)] = words.map((_,1))
	    //reduceByKeyAndWindow函数参数意义:
	    // windowDuration:表示window框住的时间长度,如本例5秒切分一次RDD,框10秒,就会保留最近2次切分的RDD
	    //slideDuration:  表示window滑动的时间长度,即每隔多久执行本计算
	    val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(5),Seconds(5))
	    val data=result.transform(rdd=>{
	      //降序处理后,取前3位
	      val dataRDD: RDD[(String, Int)] = rdd.sortBy(t=>t._2,false)
	      val sortResult: Array[(String, Int)] = dataRDD.take(3)
	      println("--------------print top 3 begin--------------")
	      sortResult.foreach(println)
	      println("--------------print top 3 end--------------")
	      dataRDD
	    })
	    data.print()
	    scc.start()
	    scc.awaitTermination()
	  }
	}

执行查看效果

  1. 先执行 nc -lk 9999
    在这里插入图片描述
  2. 在执行以上代码
    在这里插入图片描述
  3. 不断的在 1. 中输入不同的单词,观察 IDEA 控制台输出
    在这里插入图片描述
    在这里插入图片描述

     现象:sparkStreaming每隔5s计算一次当前在窗口大小为10s内的数据,然后将单词出现次数最多的前3位进行输出打印。/p>

Spark Streaming整合flume实战

     flume作为日志实时采集的框架,可以与SparkStreaming实时处理框进行对接,flume实时产生数据,sparkStreaming做实时处理。

     Spark Streaming 对接 FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中 Poll 拉取数据。

Poll 方式

  1. 安装flume1.6以上
  2. 下载依赖包

         spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下。/p>

  3. 写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
  4. 编写flume-poll.conf配置文件
	a1.sources = r1
	a1.sinks = k1
	a1.channels = c1
	#source
	a1.sources.r1.channels = c1
	a1.sources.r1.type = spooldir
	a1.sources.r1.spoolDir = /root/data
	a1.sources.r1.fileHeader = true
	#channel
	a1.channels.c1.type =memory
	a1.channels.c1.capacity = 20000
	a1.channels.c1.transactionCapacity=5000
	#sinks
	a1.sinks.k1.channel = c1
	a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
	a1.sinks.k1.hostname=hdp-node-01
	a1.sinks.k1.port = 8888
	a1.sinks.k1.batchSize= 2000                           

启动flume:

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console

     服务器上的 /root/data目录下准备数据文件data.txt

在这里插入图片描述
5. 启动 spark-streaming 应用程序,去 flume 所在机器拉取数据
6. 代码实现

     需要添加pom依赖

	<properties>
   	 	<scala.version>2.11.8</scala.version>
    	<hadoop.version>2.7.4</hadoop.version>
    	<spark.version>2.0.2</spark.version>
    	<maven.compiler.source>1.8</maven.compiler.source>
    	<maven.compiler.target>1.8</maven.compiler.target>
	</properties>
	<dependencies>
		<dependency>
    		<groupId>org.apache.spark</groupId>
    		<artifactId>spark-sql_2.11</artifactId>
    		<version>2.0.2</version>
		</dependency>
    	<dependency>
       	 	<groupId>org.apache.spark</groupId>
        	<artifactId>spark-streaming_2.11</artifactId>
        	<version>${spark.version}</version>
    	</dependency>
    		<!--引入spark-streaming-flume的包-->
    	<dependency>
        	<groupId>org.apache.spark</groupId>
        	<artifactId>spark-streaming-flume_2.10</artifactId>
        	<version>2.0.2</version>
    	</dependency>
	</dependencies>

具体代码如下:

 import java.net.InetSocketAddress
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
 import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 object FlumeDir {
   def main(args: Array[String]): Unit = {
     val conf: SparkConf = new SparkConf().setAppName("FlumeDir").setMaster("local[2]")
     val sc = new SparkContext(conf)
     sc.setLogLevel("WARN")
     val scc = new StreamingContext(sc,Seconds(5))
     scc.checkpoint("./spark_flume_dir")
 
     //设置flume地址
     val addresses: Seq[InetSocketAddress] = Seq (new InetSocketAddress("192.168.44.127",8888))
     val flumeds: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,addresses,StorageLevel.MEMORY_AND_DISK)
     val lineds: DStream[String] = flumeds.map(x=>new String(x.event.getBody.array()))
     lineds.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
     scc.start()
     scc.awaitTermination()
 
   }
 }

在这里插入图片描述

Push方式

(1)编写flume-push.conf配置文件

#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000                        

      注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口

  1. 代码实现如下:
	package cn.test.spark
	
	import java.net.InetSocketAddress
	
	import org.apache.spark.storage.StorageLevel
	import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
	import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
	import org.apache.spark.streaming.{Seconds, StreamingContext}
	import org.apache.spark.{SparkConf, SparkContext}
	
	/**
	  * sparkStreaming整合flume  推模式Push
	  */
	object SparkStreaming_Flume_Push {
	  //newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
	  //runningCount 历史的所有相同key的value总和
	  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
	    val newCount =runningCount.getOrElse(0)+newValues.sum
	    Some(newCount)
	  }
	
	
	  def main(args: Array[String]): Unit = {
	    //配置sparkConf参数
	    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")
	    //构建sparkContext对象
	    val sc: SparkContext = new SparkContext(sparkConf)
	    //构建StreamingContext对象,每个批处理的时间间隔
	    val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
	    //设置日志输出级别
	    sc.setLogLevel("WARN")
	    //设置检查点目录
	    scc.checkpoint("./")
	    //flume推数据过来
	    // 当前应用程序部署的服务器ip地址,跟flume配置文件保持一致
	    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",8888,StorageLevel.MEMORY_AND_DISK)
	
	    //获取flume中数据,数据存在event的body中,转化为String
	    val lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))
	    //实现单词汇总
	   val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)
	
	    result.print()
	    scc.start()
	    scc.awaitTermination()
	  }
	
	}
	}
  1. 启动执行
    先执行spark代码
    在这里插入图片描述
    然后在执行flume配置文件
    在这里插入图片描述

  2. 观察 IDEA 控制台输出
    在这里插入图片描述

6. Spark Streaming整合kafka实战

     kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。

     在spark1.3版本后,kafkaUtils里面提供了两个创建dstream的方法,一种为KafkaUtils.createDstream,另一种为KafkaUtils.createDirectStream。

KafkaUtils.createDstream方式

     构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,它同步将接受到数据保存到分布式文件系统上比如HDFS。 所以数据在出错的情况下可以恢复出来 。

![在这里插入图片描述](https://img-blog.csdnimg.cn/20190813213050229.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3lvdUFyZVJpZGljdWxvdXM=,size_16,color_FFFFFF,t_70)

     创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主消费的线程数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量 。

     对于不同的group和topic可以使用多个receivers创建不同的DStream 。

     如果启用了WAL (spark.streaming.receiver.writeAheadLog.enable=true) 同时需要设置存储级别(默认StorageLevel.MEMORY_AND_DISK_SER_2), 即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER) 。

KafkaUtils.createDstream实战

  1. 添加 kafka 的 pom 依赖

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.0.2</version>
        </dependency>
    </dependencies>
    
    
    1. 启动 zookeeper 集群
      zkServer.sh start

    2. 启动 kafka 集群
      kafka-server-start.sh /export/servers/kafka/config/server.properties

    3. 创建 topic
      kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 1 --partitions 3 --topic kafka_spark

    4. 向 topic 中生产数据
      通过 shell 命令向 topic 发送消息
      kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic kafka_spark
      在这里插入图片描述

    5. 编写 Spark Streaming

    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.immutable
    
    //todo:利用sparkStreaming接受kafka中的数据实现单词计数----采用receivers
    object SparkStreamingKafka_Receiver_checkpoint {
    
      def updateFunc(a:Seq[Int], b:Option[Int]) :Option[Int] ={
        Some(a.sum+b.getOrElse(0))
      }
      def main(args: Array[String]): Unit = {
        val checkpointPath = "./kafka-receiver"
    
        val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
          createFunc(checkpointPath)
        })
        ssc.start()
        ssc.awaitTermination()
      }
      def createFunc(checkpointPath:String): StreamingContext = {
    
        //todo:1、创建sparkConf
         val sparkConf: SparkConf = new SparkConf()
                                    .setAppName("SparkStreamingKafka_Receiver_checkpoint")
                                    .setMaster("local[4]")
                                    //todo:开启wal预写日志
                                    .set("spark.streaming.receiver.writeAheadLog.enable","true")
        //todo:2、创建sparkContext
        val sc = new SparkContext(sparkConf)
    
        sc.setLogLevel("WARN")
    
        //todo:3、创建StreamingContext
        val ssc = new StreamingContext(sc,Seconds(5))
        ssc.checkpoint(checkpointPath)
        //todo:4、指定zkServer
        val zkServer="node1:2181,node2:2181,node3:2181"
    
        //todo:5、指定groupId
        val groupId="spark-kafka-receiver01"
    
        //todo:6、指定topics 这个可以利用一个消费者组来消费多个topic,
        //(topic_name -> numPartitions)  指定topic消费的线程数
        val topics=Map("kafka_spark"->1)
    
        //todo:7、并行运行更多的接收器读取kafak topic中的数据,这里设置3个
        val resultDStream: immutable.IndexedSeq[DStream[String]] = (1 to 3).map(x => {
          //todo:8、通过使用KafkaUtils的createStream接受kafka topic中的数据,生成DStream
          val kafkaDataDStream: DStream[String] = KafkaUtils.createStream(ssc, zkServer, groupId, topics).map(x => x._2)
          kafkaDataDStream
        }
        )
        //todo:利用StreamContext将所有的DStream组合在一起
        val kafkaDStream: DStream[String] = ssc.union(resultDStream)
    
        //todo:8、获取kafka中topic的内容
    
        //todo:9、切分每一行。每个单词记为1
        val wordAndOne: DStream[(String, Int)] = kafkaDStream.flatMap(_.split(" ")).map((_,1))
    
        //todo:10、相同单词出现的次数累加
        val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
    
        //todo:打印
        result.print()
        ssc
    
      }
    
    }
    
    1. 运行代码,查看控制台数据

    在这里插入图片描述

         通过这种方式实现,刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。官方现在也已经不推荐这种整合方式,官网相关地址下面我们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。

KafkaUtils.createDirectStream方式

     不同于Receiver接收数据,这种方式定期地从kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,Spark通过调用kafka简单的消费者Api读取一定范围的数据。

在这里插入图片描述
相比基于Receiver方式有几个优点:

简化并行:

     不需要创建多个kafka输入流,然后union它们,sparkStreaming将会创建和kafka分区一种的rdd的分区数,而且会从kafka中并行读取数据,spark中RDD的分区数和kafka中的分区数据是一一对应的关系。

高效:

     第一种实现数据的零丢失是将数据预先保存在WAL中,会复制一遍数据,会导致数据被拷贝两次,第一次是被kafka复制,另一次是写到WAL中。而没有receiver的这种方式消除了这个问题。

恰好一次语义(Exactly-once-semantics):

     Receiver读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,虽然这种方法可以通过数据保存在WAL中保证数据不丢失,但是可能会因为sparkStreaming和ZK中保存的偏移量不一致而导致数据被消费了多次。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

  1. 依赖到上面的案例即可,其余步骤也相同,接下来代码实现:
	import kafka.serializer.StringDecoder
	import org.apache.spark.streaming.dstream.InputDStream
	import org.apache.spark.streaming.kafka.KafkaUtils
	import org.apache.spark.streaming.{Seconds, StreamingContext}
	import org.apache.spark.{SparkConf, SparkContext}
	
	object KafkaTest2 {
	
	  def updateFunction(olds:Seq[Int],news:Option[Int]) : Option[Int] = {
	    Some(olds.sum + news.getOrElse(0))
	  }
	
	  def main(args: Array[String]): Unit = {
	      val conf: SparkConf = new SparkConf().setAppName("testkafka2").setMaster("local[2]")
	      val sc = new SparkContext(conf)
	      sc.setLogLevel("ERROR")
	      val scc = new StreamingContext(sc,Seconds(5))
	      scc.checkpoint("./spark_kafka")
	      val brokers = "node2:9092,node3:9092,node4:9092"
	      val topicSet = "mdj".split(",").toSet
	      val kafkaParam = Map("metadata.broker.list"->brokers)
	
	      val kafkads: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](scc,kafkaParam,topicSet)
	      kafkads.map(_._2).map(_.split(" ")).map((_,1)).updateStateByKey(updateFunction).print()
	
	      scc.start()
	      scc.awaitTermination()
	
	  }
	}
  1. 查看效果
    向 topic 中添加数据
    在这里插入图片描述
    查看控制台的输出:在这里插入图片描述

更多精彩内容