Spark源码学习(10)——Spark Streaming

本文要解决的问题:

从源码级别对Spark Streaming进行简单学习。


Summarize

Spark Streaming实现了对实时流数据的高吞吐量、低容错的数据处理API。它的数据来源有很多种:Kafka、Flume、Twitter、ZeroMQ、TCP Scoket等。架构图如下:

这里写图片描述

Streaming接收实时流输入的数据,将其按批划分,然后交给Spark Enigne分批处理。如下图所示:

这里写图片描述


StreamingContext

和SparkContext相似。要使用Spark的流处理就必须创建StreamingContext对象。


DStream

DStream是Spark Streaming的是一个抽象类,离散流,它表示一个连续的流。是Spark的一个不可变的分布式数据抽象。

DStream上都用的到任何操作都会转换成底层的RDDs操作。而这些底层RDDs转换是由Spark Engine计算的。

DStream Transformation

离散流转换。DStream支持多种变换的基本SparkRDD使用。

UpdateStateByKey 有状态操作。

UpdateStateByKey在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步:

1)定义状态:可以是任意数据类型

2)定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。

对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。

转换操作 无状态

对于无状态的操作,每一次操作都只是计算当前时间切片的内容,例如每次只计算1s的时间所产生的RDD数据

Window操作

Window操作是针对特定时间并以特定时间间隔为单位进行滑动的操作。比如在1s为时间切片的情况下,统计最近10min的SparkStreaming产生的数据。并且没2min更新一次。



NetworkWordCount

NetworkWordCount是一个单词统计的测试类位于:org.apache.spark.examples.streaming下。

object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    //每一行以空格划分单词
    val words = lines.flatMap(_.split(" "))
    //统计数量
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

下面开始深入分析这段代码:

创建TCP Socket

在上述源码中ssc.socketTextStream(…)创建了TCP Scoket。里面有3个参数:

1)主机名

2)端口

3)stoageLevel,这是个存储对象,默认是放内存和磁盘并且是2份。这个可以自己设置。

继续跟踪socketTextStream中的socketStream方法,发现里面new了一个SocketInputDStream。

  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

SocketInputDStream就是一个DStream了。从源码中可以看出:

SocketInputDStream extendsReceiverInputDStream。而ReceiverInputDStream又extends DStream。

Ok,继续说SocketInputDStream。SocketInputDStream重写了ReceiverInputDStream中的getReceiver方法:

def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }

这里是new了一个SocketReceiver。

SocketReceiver中的onStart调用了receive这个方法:

def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

下面看receive的具体实现:

def receive() {
    try {
    //用bytesToObjects把InputStream转换成一行行的字符串
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}

这里有个关键的方法store(iterator.next),OK,跟踪进去。

def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }

Executor是ReciverSupervisor类型。这个pushSingle就是push数据列表到backend数据存储中。


启动StreamingContext

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

可以发现lines的类型就是SocketInputDStream,然后对他进行一些转换操作(flatMap、map)。这些转换操作都是SocketInputDStream特有的。

最后一步操作就是reduceByKey(+ )。这里的reduceByKey(+ )和RDD的一样都是调用了combineByKey方法。那不一样的地方就是它调用了ShuffledDStream。源码如下:

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  }

Ok,继续跟踪ShuffledDStream。ShuffledDStream继承了DStream并且实现了compute方法。

 override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }

这个方法根据validTime获取RDD进行reduceByKey。再次回到NetworkWordCount。


启动StreamingContext

再次回到NetworkWordCount。面前分源码分析,数据切割动作转换做看完了。现在开始启动StreamingContext。ssc.start()。跟踪源码如下:

 def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(
                sparkContext.localProperties.get()).asInstanceOf[Properties])
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

这里重点要看的是scheduler.start()这行。scheduler是JobScheduler实例化变量。继续进入start方法。

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      ssc.sparkContext,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

下面看一下三个start方法

StreamingListenerBus这是个事件监听器,比较简单。

启动ReceiverTracker

ReceiverTracker的start源码如下:

def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

if(!receiverInputStreams.isEmpty).。这里要判断receiverInputStreams。receiverInputStreams是在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候在DStreamGraph里面添加了InputStrem。

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
1.前言 HBase是云计算环境下最重要的NOSQL数据库,提供了基于Hadoop的数据存储、索引、查询,其最大的优点就是可以通过硬件的扩展从而几乎无限的扩展其存储和检索能力。但是HBase与传统的基于SQL语言的关系数据库无论从理念还是使用方式上都相去甚远,以至于要将基于SQL的项目移植到HBase时往往需要重写整个项目。 为了解决这个问题,很多开源项目提供了HBase的类SQL中间件,意即提供一种在HBase上使用的类SQL语言,使得程序员能够像使用关系数据库一样使用HBase, Apache Pho
为什么要用VXLAN 随着云计算数据中心的大规模建设与运营,传统的依赖VLAN技术的二层网络技术面临着越来越多的问题: vlan的数量限制   4096个vlan远不能满足大规模云计算数据中心的需求 物理网络基础设施的限制    基于IP子网的区域划分限制了需要二层网络连通性的应用负载的部署 TOR交换机MAC表耗尽     虚拟化以及东西向流量导致更多的MAC表项 多租户场景 租户可以自定义网络,且无需考虑与其他租户IP地址的重叠。 目前解决这些问题的主要方案是基于overlay的大二层网络技术。典型的
本节我们讨论 volume 的 Backup 操作。 Backup 是将 volume 备份到别的地方(备份设备),将来可以通过 restore 操作恢复。 Backup VS Snapshot 初看 backup 功能好像与 snapshot 很相似,都可以保存 volume 的当前状态,以备以后恢复。但二者在用途和实现上还是有区别的,具体表现在: Snapshot 依赖于源 volume,不能独立存在;而 backup 不依赖源 volume,即便源 volume 不存在了,也可以 restore。

LibSVM在Java中的简单应用 - 2016-07-14 18:07:36

首先,在这里首先感谢台湾林智仁先生的开源工具包libsvm。使SVM算法更加普及。大家可以到下面的libsvm官网去了解相关的信息。 Libsvm官方网站- https://www.csie.ntu.edu.tw/~cjlin/libsvm/ 其次,我在使用过程中发现,先生svm_scale文件中无法将经过规约的文件输出到本地txt文件中,只能在控制台重定向,而我并不想在程序运行中打开控制台进行较为繁琐的操作。 所以我改造了svm_scale文件,实现了文件的写入,在这里可以和大家分享一下。 改造后新增参
最新消息 Docker在上周的DockerCon技术大会上发布了1.12版核心产品Docker Engine,最大的新特性是Docker Swarm已经被整合到了Docker Engine里面而不再是一个单独的工具了,这样就可以更容易的把多个Docker主机组合成一整个规模更大可靠性更高的逻辑单元。Docker的掌舵者 Adrian Mouat相信这种新的集群模式可以大大增强Docker在相关领域的竞争力。 把Docker Swarm整合进Docker Engine是一个重大改进,但它也只是一个附加功能,
本文记录在3台物理机上搭建Hadoop 2.6.0的详细步骤及碰到的问题解决。默认使用root账号操作,实际中建议使用专用的hadoop用户账号。 1. 环境 机器: 物理机3台,ip分别为192.168.1.130、192.168.1.132、192.168.1.134 操作系统: CentOS 6.6 Java: 1.7 Hadoop: 2.6.0 请确保JDK已安装,使用 java -version 确认。 hosts配置 配置主机hosts文件: vim /etc/hosts 192.168.1.

Docker的安装配置及使用详解 - 2016-07-14 14:07:12

基本概念 Docker 包括三个基本概念 镜像(Image) 容器(Container) 仓库(Repository) 先理解了这三个概念,就理解了 Docker 的整个生命周期。 1、docker安装与启动 yum install -y epel-releaseyum install docker-io # 安装docker # 配置文件 /etc/sysconfig/docker chkconfig docker on # 加入开机启动 service docker start # 启动docker服
mahout之推荐系统源码笔记(4) —总结与优化 花了几天的时间阅读分析了mahout推荐系统中基于java单机和基于hadoop的分布式mapreduce源码。根据其推荐系统hadoop程序的job划分写了笔记1、2、3。在这里,基于笔记1,2,3做一个总结。 我们先从相似度开始。 什么是相似度,就是我们在构建推荐系统时,基于user或者基于item都需要计算出相应的候选item或者是user。那么在mahout的hadoop程序中,他运用的是基于item的推荐系统,同样的,也需要计算相似度。 计算相
分享主要分为以下五个部分: HAWQ基本介绍; HAWQ架构以及各重要组件的基本原理; HAWQ的中短期规划; 如何贡献到HAWQ和成为Apache Committer; Q A。 一、HAWQ基本介绍 HAWQ是一个Hadoop原生大规模并行SQL分析引擎,针对的是分析性应用。和其他关系型数据库类似,接受SQL,返回结果集。但它具有大规模并行处理很多传统数据库以及其他数据库没有的特性及功能。主要如下: 对标准的完善支持:ANSI SQL标准,OLAP扩展,标准JDBC/ODBC支持,比其他Hadoop
Hadoop 的 HDFS 集群在使用一段时间后,各个 DataNode 节点的磁盘使用率肯定会出现不平衡的情况,也就是数据量层面的数据倾斜,如图: 引起这种情况的方式很多: 1.        添加新的 Datanode 节点 2.        人为干预将数据的副本数降低或者增加   我们都知道当 HDFS 出现数据不平衡的时候,就会造成 MapReduce 或 Spark 等应用程序无法很好的利用本地计算的优势,而且 Datanode 节点之间也没有更好的网络带宽利用率,某些 Datanode 节点