spark快速入门与WordCount程序机制深度解析 spark研习第二季

2、spark wordCount程序深度剖析

标签: spark


一、Eclipse(scala IDE)开发local和cluster

(一). 配置开发环境

  1. 要在本地安装好java和scala。 
    由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好。
  2. 下载scala IDE for eclipse安装 连接:http://scala-ide.org/download/sdk.html 
    打开ide新建scala project 
    点击file -> new ->Scala Project ,在弹出的对话框中弹性project name 为“WordCount”,默认点击next,点击finish的。
  3. 修改scala版本 
    项目创建完成后默认使用的是scala的2.11.7 版本。要手动将版本换成2.10.X。在项目名称右击选择properties,在弹出窗口点击,scala Compiler,在右侧窗口,选中Use Project settings, 将scala Installation 修改为Latest 2.10 bundle(dynamic).点击apply,点击ok。scala版本变成2.10.6。

  4. 找到依赖的spark jar文件并导入到eclipse中。 
    所依赖的jar文件是 
    spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。 
    在项目名称上右击,选择build path ->configure build path。在弹出框中点击library,点击右侧的addExternalJARs,然后选择 
    park-assembly-1.6.0-hadoop2.6.0.jar点击打开,然后点击ok。

(二)、spark程序开发步骤

1. 在src下建立spark程序工程包

在src上右击new ->package 填入package的name为com.dt.spark。

2. 创建scala的入口类。

在包的名字上右击选择new ->scala class 。在弹出框中Name 中,在增加WordCount。点击finish。 
在方法内部讲关键字class 改成object ,然后创建main方法。

3. local模式代码方法

  1. package com.dt.spark
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.RDD
  5. object WordCount{
  6. def main(args: Array[String]): Unit ={
  7. /**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark
  8. * 集群的master的URL,如果设置为local则在本地运行。
  9. * */
  10. val conf = new SparkConf()
  11. conf.setAppName("The first spark app")
  12. conf.setMaster("local")
  13. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
  14. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
  15. * */
  16. val sc = new SparkContext(conf)
  17. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
  18. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
  19. * */
  20. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
  21. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
  22. **/
  23. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
  24. val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1
  25. val wordCounts = pairs.reduceByKey(_+_)//相同的key,value累加
  26. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
  27. sc.stop()
  28. }
  29. }

在运行过程中会出现WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable。java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. 这个错误。但是在local模式下,这个是正常的。因为spark是和hadoop编译在一起的,我们在window 下开发,缺少hadoop的配置。这不是程序错误,也不影响我们的任何功能。

4.编写Cluster模式代码

  1. import org.apache.spark.SparkConf
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. object WordCount_Cluster {
  5. def main(args: Array[String]){
  6. /**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark
  7. * 集群的master的URL,如果设置为local则在本地运行。
  8. * */
  9. val conf = new SparkConf() //创建SparkConf对象
  10. conf.setAppName("Wow,My First Spark App!")
  11. // conf.setMaster("spark://master:7077")
  12. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
  13. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
  14. * */
  15. val sc = new SparkContext(conf)
  16. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
  17. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
  18. * */
  19. //val lines = sc.textFile("hdfs://Master:9000/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions
  20. val lines = sc.textFile("/library/wordcount/input/Data") //读取HDFS文件并切分成不同的Partions
  21. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
  22. **/
  23. val words = lines.flatMap { line =>line.split(" ")} //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一个大的单词集合
  24. val pairs = words.map { word => (word, 1) }
  25. val wordCounts = pairs.reduceByKey(_+_) //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
  26. wordCounts.collect.foreach(wordNumberPair =>println(wordNumberPair._1 + " : " + wordNumberPair._2))
  27. //需要添加collect,表示集群
  28. sc.stop()
  29. }
  30. }

将程序达成jar 包 
在项目名称上右击点击export选择java 下的jar file,点击next,选择输出目录,输入文件名,点击next,点击next,然后点击完成。导出jar 包。

在spark中执行wordcount方法。

将jar 放到linux系统某个目录中。执行 
./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://worker1:7077 ./wordcount.jar

也可以将以上命令保存到.sh文件中,直接执行sh文件即可。

二、使用idea开发spark的Local和Cluster

(一)、配置开发环境

1. 要在本地安装好java和scala。

由于spark1.6需要scala 2.10.X版本的。推荐 2.10.4,java版本最好是1.8。所以提前我们要需要安装好java和scala并在环境变量中配置好

2. 下载IDEA 社区版本,选择windows 版本并按照配置。

安装完成以后启动IDEA,并进行配置,默认即可,然后点击ok以后,设置ui风格,然后点击next 会出现插件的选择页面,默认不需求修改,点击next,选择安装scala语言,点击install 按钮(非常重要,以为要开发spark程序所以必须安装),等安装完成以后点击start启动IDEA。

3. 创建scala项目

点击 create new project ,然后填写project name为“Wordcount”,选择项目的保存地址project location。 
然后设置project sdk即java 的安装目录。点击右侧的new 按钮,选择jdk,然后选择java 的安装路径即可。 
然后选择scalasdk。点击右侧的create ,默认出现时2.10.x 版本的scala,点击ok即可。然后点击finish。

4. 设置spark的jar 依赖。

点击file->project structure 来设置工程的libraries。核心是添加spark的jar依赖。选择Libraries ,点击右侧的加号,选择java,选择spark1.6.0 的spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar。点击ok。稍等片刻后然后点击ok(Libraries作用于WordCount),然后点击apply,点击ok。(这一步很重要,如果没有无法编写spark的代码)

(二)、编写代码

1. 在src下建立spark程序工程包

在src上右击new ->package 填入package的name为com.dt.spark。

2. 创建scala的入口类。

在包的名字上右击选择new ->scala class 。在弹出框中填写Name ,并制定kind为object ,点击ok。

3. 编写local代码

  1. package com.dt.spark
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.RDD
  5. object WordCount{
  6. def main(args: Array[String]): Unit ={
  7. /**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark
  8. * 集群的master的URL,如果设置为local则在本地运行。
  9. * */
  10. val conf = new SparkConf()
  11. conf.setAppName("The first spark app")
  12. conf.setMaster("local")
  13. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
  14. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
  15. * */
  16. val sc = new SparkContext(conf)
  17. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
  18. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
  19. * */
  20. val lines = sc.textFile("G://datarguru spark//tool//spark-1.4.0-bin-hadoop2.6//README.md", 1) //读取本地文件并设置一个partition
  21. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
  22. **/
  23. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
  24. val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1
  25. val wordCounts = pairs.reduceByKey(_+_)//相同的key,value累加
  26. wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
  27. sc.stop()
  28. }
  29. }

在代码去右击选择点击run”wordCount”来运行程序。在生成环境下肯定是写自动化shell 脚本自动提交程序的。 
注意:如果val sc = new SparkContext(conf)报错,并且没有运行结果,需要将scala的module改成scala 2.10版本的。具体操作:File->project structure -> Dependencies ->删除scala 2.11.x的module.-> 左上角的“+” -> scala ->选中scala2.10.4 -> apply

4. 编写Cluster模式代码

  1. package com.dt
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.RDD
  5. object Word{
  6. def main(args: Array[String]): Unit ={
  7. /**第1步,创建spark的配置对象sparkconf,设置spark运行时的配置信息,如通过setMaster设置程序连接的spark
  8. * 集群的master的URL,如果设置为local则在本地运行。
  9. * */
  10. val conf = new SparkConf()
  11. conf.setAppName("The first spark app")
  12. //conf.setMaster("spark://master:7077")
  13. /**第2步,创建SparkContext对象,SparkContext是spark程序所有功能的唯一入口,其作用是初始化spark应用程序的
  14. * 核心组件,包括DAGScheduler,TaskScheduler,SchedulerBackend
  15. * */
  16. val sc = new SparkContext(conf)
  17. /**第3步,根据数据源(HDFS,HBase,Local FS)通过SparkContext来创建RDD
  18. * 数据被RDD划分为一系列的Partitions,分配到每个partition的数据属于一个Task的处理范畴
  19. * */
  20. val lines = sc.textFile("/library/wordcount/input/Data")
  21. /**第4步,对初始的RDD进行Transformation级别的处理,如map、filter高阶函数编程,进行具体计算
  22. **/
  23. val words = lines.flatMap{ line => line.split(" ")}//对每行字符串进行单词拆分,并把所有拆分结果通过flat合并成一个大的单词集合
  24. val pairs = words.map{ word => (word, 1)} //在单词拆分基础上对每个单词实例计数为1
  25. val wordCountsSorted = pairs.reduceByKey(_+_).map(pairs=>(pairs._2, pairs._1)).sortByKey(false).map(pair=>(pair._1, pair._2))//相同的key,value累加并且排名
  26. wordCountsSorted.collect.foreach(wordNumberPair => println(wordNumberPair._1 + ":" + wordNumberPair._2))
  27. sc.stop()
  28. }
  29. }

将程序达成jar 包 
点击file->project structure,在弹出的页面点击Artifacts,点击右侧的“+”,选择jar –> from modules with dependencies,在弹出的页面中,设置好main class 然后点击ok,在弹出页面修改Name(系统生成的name不规范)、导出位置并删除scala和spark的jar(因为集群环境中已经存在)点击ok 。然后在菜单栏中点击build –> Artifacts ,在弹出按钮中,点击bulid,会自动开始打包。

在spark中执行wordcount方法。 
将jar 放到linux系统某个目录中。执行

    1. ./bin/spark-submit --class com.dt.spark.Word --master spark://master:7077 ./word.jar

注意事项: 
为什么不能再ide开发环境中,直接发布spark程序到spark集群中? 
1. 开发机器的内存和cores的限制,默认情况情况下,spark程序的dirver在提交spark程序的机器上,如果在idea中提交程序的话,那idea机器就必须非常强大。 
2. Dirver要指挥workers的运行并频繁的发生同学,如果开发环境和spark集群不在同样一个网络下,就会出现任务丢失,运行缓慢等多种不必要的问题。 
3. 这是不安全的。

三、WordCount的java开发版本

(一). 环境搭建安装jdk 和maven。

  1. 安装jdk并配置环境变量 
    系统变量→新建 JAVA_HOME 变量。 
    变量值填写jdk的安装目录(本人是 E:\Java\jdk1.7.0) 
    系统变量→寻找 Path 变量→编辑 
    在变量值最后输入 %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;(注意原来Path的变量值末尾有没有;号,如果没有,先输入;号再输入上面的代码) 
    系统变量→新建 CLASSPATH 变量值填写 .;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar(注意最前面有一点)
  2. Maven的安装和配置 
    解压apache-maven-3.1.1-bin.zip,并把解压后的文件夹下的apache-maven-3.1.1文件夹移动到D:\Java下,如果没有Java这个文件夹的话,请自行创建 
    新建系统变量 MAVEN_HOME 变量值:D:\Java\apache-maven-3.1.1。编辑系统变量 Path 添加变量值: ;%MAVEN_HOME%\bin。 
    在mave 的目录中修改conf/settings.xml,在localRepository属性后添加D:/repository修改maven下载jar 的位置。
  3. eclipse 中java 和maven 的配置 
    点击 window ->java ->Installed JREs ->add ->standard vm ,点击next ,然后选择jdk 的安装路径点击finish即可。 
    点击window ->Maven ->Installations ->add 在弹出页面选择mave 的安装路径,然后点击finish。然后在列表中选择我们自己刚添加的那个maven信息。 
    然后点击window ->Maven ->User Setings 在右侧的User Settings 点击browse 现在mavenconf目录下的setttings.xml .(主要是修改maven下载依赖包存放的位置)

(二). 创建maven项目

  1. 创建maven项目 
    点击file ¬->new ->others ->maven project 点击next,选择maven-archetype-quickstart ,点击next,group id 为 com.dt.spark,artifact id 为 sparkApps,然后点击finish。
  2. 修改jdk和pom文件 
    创建maven项目后,默认的jdk是1.5要改成我们前面安装好的jdk1.8。在项目上右击build path ->configure build path 。在弹出页面点击Libraries,选中jre system library 。点击edit,在弹出框选择workspace default jre ,然后点击finish。然后在点击ok。将pom文件修改为如下内容,然后等待eclipse下载好maven依赖的jar包,并编译工程。编译好工程后有个错误提示,在此错误列上,右击选择quick fix ,在弹出页面点击finish即可。
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.dt.spark</groupId>

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
1、spark 部署 标签: spark 0 apache spark项目架构 spark SQL -- spark streaming -- MLlib -- GraphX 0.1 hadoop快速搭建,主要利用hdfs存储框架 下载hadoop-2.6.0,解压,到etc/hadoop/目录下 0.2 快速配置文件 cat core-site.xml configuration property name fs.defaultFS /name value hdfs://worker1:9000 /va

Devstack单节点环境实战配置 - 2016-07-23 14:07:05

本实验是在VMware12下建立虚机的一个测试环境。 1 前期准备工作 真机环境win10 Linux版本 centos-everything-7.0 VMware版本 VMwareworkstations12 虚机配置如下: 8G内存 2核cpu(开启虚拟化) 网络配置为桥接模式 /boot 500M(一定要分大点不然之后会遇到问题) swap分区4G 其余的空间全部分给/分区 配置yum源,你可以保持装机自带的centos自带官方yum源,或者使用国内的给的镜像,本次试验中用的国外镜像并且使用fast
Openstack自动化部署工具, 主要用于生产环境. 一. 环境准备 这里用的是Openstack 9.0版本. Fuel Documentation 下载 Fuel for OpenStack镜像文件 , 用于安装Feul Master. 安装 Xshell , 用于远程连接. 安装 xftp , 用于从Windows主机向虚拟机传输文件. 二. 安装fuel_master节点 1. VirtualBox网络配置 管理-全局设定-网络-仅主机(Host-Only)网络 新建三张新的网卡: Host-O
本篇主要阐述通过DeveStack 去部署Openstack(mitaka),对大多数来说安装部署Openstack 来说是个痛苦的过程,尤其是 OpenStack和它依赖的一些组件在快速发展中,经常出现这个版本组件对不上那个版本 dashboard等情况。如果只是看看或者初期玩玩 OpenStack的话,使用DevStack也是个不错的办法。DevStack采用了自动化源码部署的方式,适用于开发环境的部署和Openstack开发者,单节点,小环境;这里采用的操作系统为Ubuntu14.04。 一、操作系
操作过程: 第一步删除phoenix中系统的表格信息,主要为SYSTEM.CATALOG,第二步删除Hbase中的表格信息。 操作步骤: (1)查询phoenix系统表 SYSTEM.CATALOG 内容是所有表格的信息,系统表和自建表 SYSTEM.FUNCTION 内容是所有函数信息,系统函数和自定义函数 SYSTEM.SEQUENCE 我也不知道 SYSTEM.STATS 内容是所有表格的最后状态信息 (2)查询SYSTEM.CATALOG表结构 (3)我要删除的表格 DELETE from SYS
1.查看mysql中metastore数据存储结构 Metastore中只保存了表的描述信息( 名字,列,类型,对应目录 ) 使用SQLYog连接itcast05 的mysql数据库 查看hive数据库的表结构: 2.建表(默认是内部表(先建表,后有数据)) (建表时必须指定列的分隔符) create table trade_detail(id bigint, account string, income double, expenses double, time string) row format d
1、查看系统环境 cat /etc/redhat-releaseuname -runame -m 关闭所有服务器的防火墙 /etc/init.d/iptables stopchkconfig iptables offchkconfig --list iptables 2、Spark集群机器规划 一共准备了四台机器 Host IP Hadoop Spark Node1 192.168.2.128 Master Master Node2 192.168.2.130 Slave Slave Node3 192.
本篇主要阐述通过DeveStack 去部署Openstack(mitaka),对大多数来说安装部署Openstack 来说是个痛苦的过程,尤其是 OpenStack和它依赖的一些组件在快速发展中,经常出现这个版本组件对不上那个版本 dashboard等情况。如果只是看看或者初期玩玩 OpenStack的话,使用DevStack也是个不错的办法。DevStack采用了自动化源码部署的方式,适用于开发环境的部署和Openstack开发者,单节点,小环境;这里采用的操作系统为Ubuntu14.04。 一、操作系
Spark版本: 1.6.2 概览 Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化。可以通过SQL、DataFrames API、Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理。用户可以根据自己喜好,在不同API中选择合适的进行处理。本章中所有用例均可以在 spark-shell、pyspark shell、s

HBase工作原理学习 - 2016-07-22 18:07:56

HBase工作原理学习   1 HBase简介 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建大规模结构化的存储集群。HBase的目标是存储并处理大型数据,具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。 与MapReduce的离线批处理计算框架不同,HBase是一个可以随机访问的存储和检索数据平台,弥补了HDFS不能随机访问数据的缺陷,适合实时性要求不是非常高的业务场景。HBase存储的都是Byte数组