Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark版本:1.6.2

概览

Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化。可以通过SQL、DataFrames API、Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理。用户可以根据自己喜好,在不同API中选择合适的进行处理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中执行。

SQL

执行SQL语句的方法有多种:

  • 可以使用基础SQL语法或HiveQL语法在Spark SQL上执行查询,SparkSQL可以从已安装的Hive中读取数据。当使用其他编程语言时,结果集以DataFrame类型返回
  • 通过SQL命令行进行交互(spark-sql)
  • 可以通过JDBC/ODBC驱动进行交互

DataFrames

DataFrame是由分布式数据集合组成的一系列命名列,它与关系数据库的表类似,但有很多优化的地方。DataFrame支持多种数据源,包括结构化数据、Hive的表、外部数据库、RDDs等。DataFrame API支持scala 、java、Python和R语言。

Datasets

数据集接口在Spark1.6才加入,它可以使用Spark SQL的优化器对RDD操作进行优化。Dataset有JVM对象构建,并可以进行map、flatMap、filter等操作。Dataset API统一接口支持java和scala语言。

开始

程序入口: SQLContext

SQLContext是Spark SQL所有功能的入口,通过SparkContext可以创建该对象的实例:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

除了SQLContext,还可以创建HiveContext对象,它包含更多的功能,例如HiveQL解析器支持更完善的语法、使用Hive用户自定义函数UDFs、从Hive表中读取数据等。HiveContext不依赖Hive是否安装,Spark默认支持HiveContext。从Spark1.3以后,推荐使用HiveContext,未来SQLContext会包含HiveContext中的功能。
可以通过spark.sql.dialect选项更改SQL解析器,这个参数可以再SQLContext的setConf方法设置,也可以通过SQL的ky=value语法设计。在SQLContext中dialect只支持一种简单的SQL解析器“sql”。HiveContext默认解析器是“hiveql”,同时支持“sql”,但一般推荐hiveql,因为它语法更全。

创建DataFrames

DataFrames的数据源多种多样,例如RDD、Hive table或者其他数据源。
下面代码从JSON文件创建了一个DataFrame

JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

DataFrame 操作

DataFrame支持结构化数据领域常用的数据操作,支持Scala、Java、Python和R语言,下面是一些基本操作示例:

JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

对于DataFrame的所有操作类型可以参考API文档。除了简单的列操作,DataFrame还支持字符串操作、日期算法、数据操作等等,可以参考DataFrame函数文档

编码实现SQL查询

SQLContext的sql方法支持运行sql语法的查询,并返回DataFrame类型的结果集:

SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

创建Datasets

Dataset与RDD类似,但它不适用java序列化也不适用Kryo,而是使用特定的Encoder作为序列化工具。Encoder可以对Spark对象进行序列化和反序列化,同时不需要反序列化在字节级别就能支持filtering、sorting和hashing等操作。

// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

RDD交互操作

在Spark SQL中有两种方式可以在DataFrame和RDD进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。

使用反射推断Schema

Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。case class可以嵌套组合成Sequences或者Array。这种RDD可以高效的转换为DataFrame并注册为表。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

编程指定schema

当case class不能提前定义好时,可以通过以下三步通过代码创建DataFrame

  • 将RDD转为包含row对象的RDD
  • 基于structType类型创建schema,与第一步创建的RDD相匹配
  • 通过SQLContext的createDataFrame方法对第一步的RDD应用schema
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)

数据源

DataFrame接口支持一系列的数据源,它可以按照普通RDD进行操作,也能被注册为临时表进行操作。注册临时表后可以使用SQL查询操作数据集,本章节介绍了常用加载保存数据的方法,同时给出了内部数据源的特殊操作。

常规Load/Save函数

未配置spark.sql.sources.default情况下,默认使用parquet数据源处理所有操作。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手动指定选项

用户可以手动指定数据源加载的选项,对于数据源类型需要使用完整名称指定例如(org.apache.spark.sql.parquet),但对于内部类型可以使用简称,例如(json parquet jdbc等)。可以通过以上方法在不同DataFrame之间进行转换。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

在文件上直接执行SQL

除了需要将文件加载到DataFrame再执行sql以外,还可以直接执行sql

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save通过SaveMode指定如何维护现有的数据。需要注意的是savemode未对数据加锁,因而不是源自操作。若使用overwrite模式时,原有数据会先被清空。

Scala/Java Any Language 含义
SaveMode.ErrorIfExists (default) “error” (default) 当数据输出的位置已存在时,抛出此异常
SaveMode.Append “append” 当数据输出的位置已存在时,在文件后面追加
SaveMode.Overwrite “overwrite” 当数据输出的位置已存在时,重写
SaveMode.Ignore “ignore” 当数据输出的位置已存在时,不执行任何操作,与 CREATE IF NOT EXISTS类似

保存到持久化表中

使用HiveContext时,DataFrame可以使用saveAsTable方法保存到持久化表中。与registerTempTable不同,saveASTable会为其真正创建数据区并创建指向该区域的指针放入HiveMetaStore中。在持有同一个metastore的连接期间,持久化的数据会一直存在,即使spark程序重启也不影响。可以通过SQLContext的table方法创建用于持久化表的DataFrame。
默认的saveASTable会创建“managed table”,其数据位置会被metastore维护,被管理的表数据会在表被删除时清空。

Parquet文件

parquet是一种流行的列式存储格式。SparkSQL支持对parquet的读写以及schema和数据的维护。在写parquet文件时,为了兼容,所有列都会转换为nullable格式。

编程实现数据加载

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区推断

表分区是Hive等系统的常用优化手段。在一个分区表中,数据经常分布在不同目录下,分区列的值相同的数据分布在同一目录中。目前支持对parquet文件进行自动推断分区。例如我们可以将之前的数据增加两列gender和country,并将两列作为分区列进行数据分区。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

将数据路径传给SQLContext后,可以自动推断DataFrame数据的分区信息。注意,数据的分区列是自动推断出来你的,目前分区列支持数值类型和string类型。若用户不希望自动推断分区列时,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自动推断,此时会使用string类型列进行分区。
分区类型会根据传入的路径进行推断,但用户可以配置数据源的basePath属性设置分析的路径。

Schema合并

parquet支持列增加等操作,当出现多个互相兼容的schemas时,parquet可以自动检测并合并这些文件的schema。由于schema 合并会消耗大量的资源,默认关闭该操作,可以通过以下方法打开:

  • 设置数据源mergeSchema属性为true
  • 设置SQL的选项spark.sql.parquet.mergeSchema为true
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metasotre Parquet表转化

SparkSQL使用内部库而不是Hive SerDe,对Hive metasotre Parquet表进行读写,性能很好,可以通过spark.sql.hive.convertMetastoreParquet配置。

Hive/Parquet Schema Reconciliation

由于Hive和Parquet的元数据处理方式不同,如下所示

  • Hive忽略大小写,而Parquet没有
  • Hive所有字段都是nullable,而parquet中null是有意义的值(避免理解错误,贴上原文:Hive considers all columns nullable, while nullability in Parquet is significant)

将Hive metastore Parquet table转换为Spark SQL parquet表时,遵从以下规则:

  • 相同名称的字段的数据类型必须相同,nullable类型被忽略。由于融合的数据类型需要在parquet中有对应的类型,所以nullability类型需要处理。
  • 融合后schema中包含了Hive元数据中定义的值

    • 任何只在Parquet schema中出现的字段被抛弃
    • 任何旨在Hive元数据中出现的字段作为nullable增加到融合后元数据中

元数据刷新

Spark SQL会缓存parquet元数据以便提高性能。若Hive metastore Parquet table转换被启用,则转换的表元数据也会被cache。若这些元数据被外部工具修改,则需要手动更新缓存元数据保持一致性。

// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

配置

与parquet相关的配置参数如下所示

参数 默认值 描述
spark.sql.parquet.binaryAsString false 该选项让SparkSQL将string安装二进制数据按照字符串处理,以便兼容老系统
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata true 缓存Parquet的Schema元数据,提高查询静态数据效率
spark.sql.parquet.compression.codec gzip 设置Parquet文件的压缩编码方式,支持 uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdown true 启用过滤谓词下推优化,将过滤下推到抽取数据时,取得性能的提升
spark.sql.hive.convertMetastoreParquet true 若设为false,Spark SQL使用Hive SerDe支持对Parquet tables的操作.
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it’s also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
spark.sql.parquet.mergeSchema false 是否开启Schema合并

JSON数据集

SQLContext.read.josn()接口可以自动推断JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一个完整的JSON对象,不支持跨行的json对象。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive 表

Spark SQL支持从Hive中读取数据,但由于Hive依赖过多,默认不支持Hive,需要在编译时添加-Phive -Phive-thriftserver选项。由于用到Hive的序列化和反序列化需要保证Hive包在各个worker中都存在。

将hive-site.xml、core-site.xml和hdfs-site.xml放入conf目录下配置Hive环境。在Yarn集群上面运行时,需要确定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通过spark-submit的–jars和–file参数检查是否存在。
若通过Spark SQL操作Hive需要创建HiveContext,增加元数据功能及HiveQL支持。若没有部署Hive环境同样可以创建HiveContext。若没有在hive-site.xml中配置,会自动在当前目录创建metastore_db并在/user/hive/warehouse创建仓储目录,需要给hive对/user/hive/warehouse的写权限。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

与不同版本Hive Metastore交互

由于Spark SQL可以与不同版本的Hive Metastor(而不是Hive的版本)进行交互,只需要修改部分的配置信息,相关配置如下:

属性 默认值 描述
spark.sql.hive.metastore.version 1.2.1 Hive metastore的版本信息,从0.12.0到1.2.1
spark.sql.hive.metastore.jars builtin 指定metastore的Jar包位置,builtin:该jar被打包到spark应用程序中;maven:使用maven远程仓储下载;类路径:需要包含hive所有的依赖包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 使用逗号分隔的类名前缀列表,Spark SQL所访问的每个Hive版本都会被显式的reload这些类。

JDBC连接其他数据库

SparkSQL通过JdbcRDD实现对支持jdbc的数据库进行数据加载,将其作为DataFrame进行操作。JDBC加载的数据源不需要提供classTag。使用前需要将JDBC Driver包含在spark的classpath中。例如连接postgres需要如下设置

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

数据库中的表可以作为DataFrame或SparkSQL的临时表加载,支持以下的选项:

属性 描述
url JDBC连接URL
dbtable 需要读取的JDBC表。任何在From子句中的元素都可以,例如表或者子查询等。
partitionColumn, lowerBound, upperBound, numPartitions 这些选项需要同时制定,他们制定了如何并发读取数据的同时进行分区。lowerBound, upperBound仅用于确定分区边界不用于过滤数据,所有数据都会被分区
fetchSize 决定了每次数据取多少行
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

疑难问题

  • JDBC的driver类需要在所有executor可见,因为Java的DriverManager会进行安全检查,忽略所有不可见的类。可以通过修改每个worker节点的compute_classpath.sh以便包含Jar包
  • 有些数据库例如H2的名称是大写,需要在SparkSQL中同样使用大写

性能调优

对于一些负载可以通过内存缓存数据或者调整参数提高性能。

内存缓存数据

Spark SQL可以通过sqlContext.cacheTable(“tableName”) 或 dataFrame.cache()接口将RDD数据缓存到内存中。SparkSql可以近扫描需要的列并自动压缩、进行垃圾回收等。可以通过sqlContext.uncacheTable(“Tablename”)从内存中移除表。

属性 默认值 描述
spark.sql.inMemoryColumnarStorage.compressed true 若设为true,Spark SQL会基于列的统计数据自动选择压缩器进行数据压缩
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列缓存的每批次的数据大小,数据越大则内存利用率及压缩比例越大,但OOM风险也越大

其他配置信息

可以通过修改以下配置提高查询执行的性能,以后可能会弃用以下设置,而变为自动进行最优化配置。

属性 默认值 描述
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置做join操作时被广播变量的表的大小。当设为-1时禁用广播。目前只有Hive元数据支持统计信息,可以通过ANALYZE TABLE <tablename> COMPUTE STATISTICS进行信息统计
spark.sql.tungsten.enabled true 若为true,或使用tungsten物理优化执行,显式地管理内存并动态生成表达式计算的字节码
spark.sql.shuffle.partitions 200 配置shuffle操作时的分区数量

分布式SQL引擎

当使用JDBC/ODBC或者命令行进行交互时,SparkSQL可以作为分布式查询引擎执行。在这种模式下,Spark SQL的应用能够不写代码便执行查询。

运行Thrift JDBC/ODBC驱动

这里的实现与HiveServer2类似,可以通过beeline测试Spakr或者Hive1.2.1的JDBC驱动。通过以下命令启动jdbc驱动

./sbin/start-thriftserver.sh

这脚本支持所有的spark-submit的参数,还支持–hiveconf指定特定的Hive属性。可以通过–help查看本脚本具体参数。默认server监听的端口是10000,可以覆盖一些环境变量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...

或者修改系统属性

./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri>
  ...

可以通过beeline测试Thrift JDBC/ODBC驱动

./bin/beeline

连接JDBC/ODBC驱动

beeline> !connect jdbc:hive2://localhost:10000

可能需要输入用户和密码进行安全验证,在非安全模式下,只需要本机的用户名和空密码即可。通过hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驱动同时支持通过HTTP端口发送thrift RPC消息。通过hive-site.xml中的配置开启HTTP模式作为系统属性:

hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

beeline可以通过http模式连接JDBC/ODBC

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

通过Spark SQL CLI运行

CLI是在单点模式下执行Hive元数据服务和查询的命令工具,但它不能与Thrift JDBC驱动进行会话。

./bin/spark-sql

与Apache Hive的兼容性

Spark SQL设计时考虑对Hive metastore,SerDes以及UDF的兼容。目前是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已经部署Hive的环境运行。

不支持的Hive功能

  • bucket表:butcket是Hive的哈希分区
  • Union功能
  • unique join
  • 字段统计信息
  • Hadoop归档文件
  • Hive的部分优化功能

参考

数据类型

Spark SQL和DataFrame支持以下数据类型

  • numeric类型
    • ByteType:单字节有符号整数
    • ShortType:2个字节的有符号整数
    • IntegerType:4字节整数
    • LongType:8字节整数
    • FloatType:4字节单精度浮点数
    • DoubleType:8字节双精度浮点数
    • DecimalType:任意精度有符号带小数的数值
  • String类型
  • Binary二进制类型
  • Boolean布尔类型
  • Datetime时间类型
    • TimestampType:时间戳类型
    • DateType:日期类型,只包含年月日
  • Complex复杂类型
    • ArrayType:数组类型
    • MapType:map类型
    • StructType:包含StructField序列的结构体

所有的数据类型都在org.apache.spark.sql.types中。

NaN含义

NaN是not a number的简写,用于处理不符合浮点数格式的float和double数据,其语义需要特殊处理:

  • NaN = NaN返回true
  • 聚集过程中,所有NaN会被放到同一分组中
  • NaN在join过程中被看成普通的值
  • NaN在升序排序时放到最后,被认为是最大的数值

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

HBase工作原理学习 - 2016-07-22 18:07:56

HBase工作原理学习   1 HBase简介 HBase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建大规模结构化的存储集群。HBase的目标是存储并处理大型数据,具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。 与MapReduce的离线批处理计算框架不同,HBase是一个可以随机访问的存储和检索数据平台,弥补了HDFS不能随机访问数据的缺陷,适合实时性要求不是非常高的业务场景。HBase存储的都是Byte数组
一:RDD粗粒度与细粒度 粗粒度: 在程序启动前就已经分配好资源(特别适用于资源特别多而且要进行资源复用) 细粒度:计算需要资源是才分配资源,细粒度没有资源浪费问题。 二: RDD 的解密: 1,分布式(擅长迭代式是spark的精髓之所在) 基于内存(有些时候也会基于硬盘) 特别适合于计算的计算框架 2,RDD代表本身要处理的数据,是一个数据集Dataset RDD本身是抽象的,对分布式计算的一种抽象 RDD 定义: 弹性分布数据集 代表一系列的数据分片 3,RDD弹性之一: 自动进行内存和磁盘数据存储的

spark 集群搭建 详细步骤 - 2016-07-22 18:07:20

最近好不容易搞到了三台测试机,可以用来搭建spark集群搞模型。本宝宝开心得不行,赶紧行动,把spark集群搭起来,模型跑起来。 1.搭建hadoop集群 hadoop的hdfs文件系统是整个生态圈的基础,因为数据量大了以后,数据一般就都放hdfs上头了。因为四台测试机之前已经搭建好了hadoop集群环境,而且经过本宝宝测试,hadoop集群也是可用的,所以就省了搭hadoop集群的功夫。 2.配置集群host 四台机器的hostname如下: namenodetest01.hadoop.xxx.com

使用Fuel安装Openstack - 2016-07-22 18:07:11

Openstack自动化部署工具,主要用于生产环境. 一. 环境准备 这里用的是Openstack 9.0版本. Fuel Documentation 下载 Fuel for OpenStack镜像文件 , 用于安装Feul Master. 安装 Xshell , 用于远程连接. 二. 安装fuel_master节点 1. VirtualBox网络配置 管理-全局设定-网络-仅主机(Host-Only)网络 新建三张新的网卡: Host-Only Ethernet Adapter #1 IPv4: 10.
flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。 一、什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume O

KafkaStreams介绍(二) - 2016-07-22 18:07:55

说明: 本文是Confluent Platform 3.0版本中对于Kafka Streams的翻译。 原文地址: https://docs.confluent.io/3.0.0/streams/index.html 看了很多其他人翻译的文档,还是第一次翻译,有什么翻译的不好的地方还请指出。   这是Kafka Streams介绍的第二篇,以前的介绍如下: http://blog.csdn.net/ransom0512/article/details/51971112   1.  快速入门 1.1.  目
2016年7月19日下午,笔者做客国泰君安通信研究团队”软银收购ARM“深度解读电话会议,与在线的150多位机构投资者分享了对于”软银收购ARM“的个人观点。 以下为电话会议实录,略经编辑以及后期补充部分观点。 主持人:各位同事朋友大家下午好,我是国泰君安通信行业分析师宋嘉吉,欢迎大家今天参加本次电话会议,此次会议的主题是软银收购ARM,7月18号软银宣布以243亿英镑收购半导体IP供应商ARM,是对未来物联网战略的提前卡位,我们认为这也是物联网行业布局芯片的又一重磅催化。今天大唐电信封了涨停,按照我们对
一、HBase伪分布式集群安装 1、安装包解压 $ cd app/ $ tar -xvfhbase-1.2.0-cdh5.7.1.tar.gz $ rmhbase-1.2.0-cdh5.7.1.tar.gz   2、添加环境变量 $ cd ~ $ vim .bashrc exportHBASE_HOME=/home/developer/app/hbase-1.2.0-cdh5.7.1 exportPATH=$PATH:$HBASE_HOME/bin $ source .bashrc   3、编辑hbase
本文将介绍Oracle集成云Agent的基础架构,所包含的组件,和如何连接云与OP应用。 目前/典型的集成方式 目前常用的将云应用/基于互联网的应用与企业内部部署(OP)应用连接的方式为:穿透一层或者更多的防火墙,使用反向代理、Oracle API Gateway或者OHS。要实现这些操作需要多种专业知识,比如防火墙需要开放入站端口,暴露一个私有的SOAP/REST服务并且配置网络路由。SOAP/REST服务可以用SOA套件之类的产品实现,比如与CRM系统进行通讯,实现客户信息的接收。如下图所示: 如果使
本次主要是详细记录Docker1.12在Ubuntu16.04上的安装过程,创建Docker组(避免每次敲命令都需要sudo),Docker常用的基本命令的总结,在容器中运行Hello world,以及创建一个基于Python Flask的web应用容器的全过程。 1.Docker1.12在Ubuntu16.04上安装 1.1.先决条件1,添加Docker源 wxl @wxl - pc: ~ $ sudo apt-get update 增加CA证书 wxl@wxl -pc :~$ sudo apt -ge