Spark的广播和累加器的使用

一、广播变量和累加器

1.1 广播变量:

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

1.2 累加器:

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

二.Java和Scala版本的实战演示

2.1 Java版本:

package com.Streaming;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import scala.Tuple2;

import java.util.*;

/**
 * 利用广播进行黑名单过滤!
 *
 * 无论是计数器还是广播!都不是想象的那么简单!
 * 联合使用非常强大!!!绝对是高端应用!
 *
 * 如果 联合使用扩展的话,该怎么做!!!
 *
 * ?
 */
public class BroadcastAccumulator {

    /**
     * 肯定要创建一个广播List
     *
     * 在上下文中实例化!
     */
    private static volatile Broadcast<List<String>> broadcastList = null;

    /**
     * 计数器!
     * 在上下文中实例化!
     */
    private static volatile Accumulator<Integer> accumulator = null;

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local[2]").
                setAppName("WordCountOnlieBroadcast");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));


        /**
         * 没有action的话,广播并不会发出去!
         *
         * 使用broadcast广播黑名单到每个Executor中!
         */
        broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));

        /**
         * 全局计数器!用于统计在线过滤了多少个黑名单!
         */
        accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");


        JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);


        /**
         * 这里省去flatmap因为名单是一个个的!
         */
        JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) {
                return v1 + v2;
            }
        });

        /**
         * Funtion里面 前几个参数是 入参。
         * 后面的出参。
         * 体现在call方法里面!
         *
         * 这里直接基于RDD进行操作了!
         */
        wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
                rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
                        if (broadcastList.value().contains(wordPair._1)) {

                            /**
                             * accumulator不应该仅仅用来计数。
                             * 可以同时写进数据库或者redis中!
                             */
                            accumulator.add(wordPair._2);
                            return false;
                        }else {
                            return true;
                        }
                    };
                    /**
                     * 这里真的希望 广播和计数器执行的话。要进行一个action操作!
                     */
                }).collect();

                System.out.println("广播器里面的值"+broadcastList.value());
                System.out.println("计时器里面的值"+accumulator.value());
                return null;
            }
        });


        jsc.start();
        jsc.awaitTermination();
        jsc.close();

    }

    }


2.2 Scala版本

package com.Streaming

import java.util

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast

/**
  * Created by lxh on 2016/6/30.
  */
object BroadcastAccumulatorStreaming {

  /**
    * 声明一个广播和累加器!
    */
  private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    /**
      * duration是ms
      */
    val ssc = new StreamingContext(sc,Duration(2000))
   // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")

    /**
      * 获取数据!
      */
    val lines = ssc.socketTextStream("localhost",9999)

    /**
      * 拿到数据后 怎么处理!
      *
      * 1.flatmap把行分割成词。
      * 2.map把词变成tuple(word,1)
      * 3.reducebykey累加value
      * (4.sortBykey排名)
      * 4.进行过滤。 value是否在累加器中。
      * 5.打印显示。
      */
    val words = lines.flatMap(line => line.split(" "))

    val wordpair = words.map(word => (word,1))

    wordpair.filter(record => {broadcastList.value.contains(record._1)})


    val pair = wordpair.reduceByKey(_+_)

    /**
      *这步为什么要先foreachRDD?
      *
      * 因为这个pair 是PairDStream<String, Integer>
      *
      *   进行foreachRDD是为了?
      *
      */
/*    pair.foreachRDD(rdd => {
      rdd.filter(record => {

        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }

      })

    })*/

    val filtedpair = pair.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(record._2)
          true
        } else {
          false
        }

     }).print

    println("累加器的值"+accumulator.value)

   // pair.filter(record => {broadcastList.value.contains(record._1)})

   /* val keypair = pair.map(pair => (pair._2,pair._1))*/

    /**
      * 如果DStream自己没有某个算子操作。就通过转化transform!
      */
   /* keypair.transform(rdd => {
      rdd.sortByKey(false)//TODO
    })*/
    pair.print()
    ssc.start()
    ssc.awaitTermination()

  }

}

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
引言: 对于刚接触ES的童鞋,经常搞不明白ES的各个概念的含义。尤其对“索引”二字更是与关系型数据库混淆的不行。本文通过对比关系型数据库,将ES中常见的增、删、改、查操作进行图文呈现。能加深你对ES的理解。同时,也列举了kibana下的图形化展示。 ES Restful API GET、POST、PUT、DELETE、HEAD含义: 1)GET:获取请求对象的当前状态。 2)POST:改变对象的当前状态。 3)PUT:创建一个对象。 4)DELETE:销毁对象。 5)HEAD:请求获取对象的基础信息。 M
设计原理 kafka的 设计 初衷是希望作为一个 统一的信息收集平台 , 能够实时的收集反馈信息 ,并需要 能够支撑较大的数据量 , 且具备良好的容错能力. 持久性 kafka 使用文件存储消息 ,这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化几乎没有可能.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时 为了减少磁盘写入的次数,broker 会将消息暂时buffer起来,当消息的个数(
前言 人们对人脸识别的研究已经有很长一段时间,起初局限于获取基础的人脸信息,随着机器学习领域的发展,人脸识别的应用更加广泛,已经可以被用于人脸搜索、人脸鉴权等相关应用。本文将针对微软认知服务中提供的人脸识别API的调用方法进行一些初步的讲解。 Face API Face API中提供了3方面功能: 人脸检测 人脸分组 人脸识别(搜索) 首先是人脸检测,主要是指传统概念上的人脸识别功能,识别图片中的人的面孔,给出人脸出现的坐标区域,并根据识别出来的人脸分析出一些基本的信息(例如年龄)。 其次是人脸分组,可以

Hadoop MapReduce原理及实例 - 2016-07-17 17:07:30

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计。 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce。每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定。map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总。 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如下: 为了
前几天有幸参加了OpenStack Days China的两天技术峰会,集合了全球及国内顶尖的OpenStack技术专家,为我们分享了许多关于OpenStack的技术报告。 有许多人参加类似技术峰会都有这些感受: 1、一般主会场的领导和院士发言基本没有什么干货,也就是对我们实际工作没有太大帮助 2、一般讲的不错的都是公司的CEO、CTO等,但是他们都是公司商业因素占据很多,技术并不是他们实干,好像也没有什么收获 3、一般分享干货的实践者往往讲的水平一般,枯燥乏味,太干了,干的让人无法消化 当然,我觉得现如

Flume性能测试报告 - 2016-07-16 17:07:11

1. 测试环境 1.1 硬件 CPU:Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz(8核) 内存:16G 1.2 软件 Flume:1.6.0 Hadoop:2.6.0-cdh5.5.0 Kfaka:2.11-0.9.0.1 JDK:1.8.0_91-b14 64位 1.3 测试文件 文件大小:107M ,共490010条记录 1.4 Flume配置 (1)Source配置 Flume Source采用spooldir的方式,直接读取预先准备好的测试文件。 agent .
我最近研究了hive的相关技术,有点心得,这里和大家分享下。 首先我们要知道hive到底是做什么的。下面这几段文字很好的描述了hive的特性: 1.hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。 2.Hive是建立在 Hadoop 上的数据仓

webSocket使用教程 - 2016-07-16 14:07:11

webSocket使用教程 Gson简介 GSON是Google开发的Java API,用于转换Java对象和Json对象 gson和其他现有java json类库最大的不同时gson需要序列化的实体类不需要使用annotation来标识需要序列化的字段,同时gson又可以 通过使用annotation来灵活配置需要序列化的字段。在Java开发中,有时需要保存一个数据结构成字符串,可能你会考虑用Json,但是当 Json字符串转换成Java对象时,转换成的是JsonObject,并不是你想要的Class类
目录 目录 前言 Openstack基金委员会 Openstack贡献者须知 注册Openstack In Launchpad 生成并上传OpenPGP密钥 生成并上传SSH公钥 Join The OpenStack Foundation 签署CLA贡献者协议 参考资料 前言 由Openstack基金委员会管理的Openstack社区,现在已经成为了全球第二大开源社区仅次于Linux社区,所以也有人将Openstack定义为下一个Linux。就从我个人角度出发,我认为Openstack和Linux不属于同
文本详细介绍了HDFS中的许多概念,对于理解Hadoop分布式文件系统很有帮助。 1. 介绍 在现代的企业环境中,单机容量往往无法存储大量数据,需要跨机器存储。统一管理分布在集群上的文件系统称为分布式文件系统。而一旦在系统中,引入网络,就不可避免地引入了所有网络编程的复杂性,例如挑战之一是如果保证在节点不可用的时候数据不丢失。 传统的网络文件系统(NFS)虽然也称为分布式文件系统,但是其存在一些限制。由于NFS中,文件是存储在单机上,因此无法提供可靠性保证,当很多客户端同时访问NFS Server时,很容