记录生活、记录历史

Spark

2019.05.15

Spark 学习笔记。Spark 的学习项目。基本上很多内容都是直接来自于官网。本项目会不断更新,以达到更合适,更简洁,更优雅地实现Spark程序等。

简介

官网 | 下载 | API文档 | 官方快速开始 | Archive | Scala官网 | Spark Github

Apache Spark是一个开源集群运算框架,最初是由加州大学柏克莱分校AMPLab所开发。相对于HadoopMapReduce会在运行完工作后将中介数据存放到磁盘中,Spark使用了存储器内运算技术,能在数据尚未写入硬盘时即在存储器内分析运算。Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍,即便是运行程序于硬盘时,Spark也能快上10倍速度。Spark允许用户将数据加载至集群存储器,并多次对其进行查询,非常适合用于机器学习算法。

版本

Spark JDK Scala Python Date
[*] 2.3.2 8+ 2.11 2.7+/3.4+
2.4.3 8+ 2.12 2.7+/3.4+

修订记录

修订日期 修订人 说明
2020-07-28 taliove 调整结构,整合之前的文档。

安装

下载处下载spark-2.4.3-bin-hadoop2.7.tgz,解压至目录E:\spark-2.1.0-bin-hadoop2.7。将 Spark 目录下的 bin 目录放置到环境变量处。

windows:

linux:

1
2
3
4
5
6
vi /etc/profile
# 插入如下语句
export SPARK_HOME=/usr/local/spark-2.1.0
export PATH=$PATH:$SPARK_HOME/bin
# 执行语句,使环境变量生效
source /etc/profile

快速开始

Spark 开发

初始化

Spark 程序必须做的第一件事是创建一个SparkContext对象。它告诉 Spark 如何访问集群。要创建 SparkContext 首先需要构建一个包含有相关应用程序信息的SparkConf对象。

每个 JVM 只能激活一个 SparkContext。在创建新的 SparkContext 之前,你必须将正在运行的 SparkContent 调用stop()命令关闭该 Spark 进程。

1
2
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName是 Spark 应用程序的名称,它将会显示在集群的UI界面上。

masterSpark,Mesos或Yarn集群的URL,或者可以直接指定为local,以本地模式运行。

使用 Shell

在 Spark Shell 中,已经为您创建好了一个特殊的 SparkContent,变量名为sc。如果你在 Shell 中使用自定义的 SparkContext,将会报错。可以使用的参数有:

  • --master 指定上下文连接到主服务器
  • local[integer] 指定需要使用的核心数
  • --jars 指定 JAR 文件
  • --repositories 指定JAR包库地址
  • --packages 指定包名,本参数通常与--repositories值匹配使用

使用四个核心运行:

1
$ ./bin/spark-shell --master local[4]

添加 code.jar 到类路径:

1
$ ./bin/spark-shell --master local[4] --jars code.jar

使用 maven:

1
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.0.1"

RDD

Spark 是围绕 弹性分布式数据集(RDD) 的概念而展开。RDD 是一个可以并行操作的容错集合。创建 RDD 有两种方法:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统,HDFS,HBase等。

RDD 操作

RDD 支持两种类型的操作:

  • 转换:从现有数据集创建新数据集
  • 操作:在数据集上运行计算后将计算结果返回到驱动程序

例如,map是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新 RDD。另一方面,reduce是一个使用某个函数聚合 RDD 的所有元素,并最终结果返回驱动程序的。

为了说明 RDD 基础知识,举个例子:

1
2
3
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一次行定义来自外部文件的RDD。此数据集并未加载到内存中。其中的 lines 只是文件的指针。lineLengths 也不是马上计算。 只有最后执行的 reduce 动作时,Spark 才将计算分解在不同的机器上运行。

如果 lineLengths 以后想再次使用,可以这样:

1
lineLengths.persist()

使 lineLengths 在第一次计算后保存到内存中。

转换

转换 释义
map(func) 返回通过参数func传递源的每个元素形成新的分布式数据集。
filter(func) 返回使func返回值为true的所有元素的集合
flatMap(func) 和 map 比较类似。不同的是函数可返回 0项或更多项的结果。所以它的返回值应该是一个数组。
mappartitions(func) 和 map 比较类似。不同的是它在每个分区或块上单独运行,因此函数 func 在 type T 类型上运行时,也必须是同样的类型。
mapPartitionsWithIndex(func) 与 mapPartitions 类似。不同的是 func 提供了分区的索引值。因此当在类型T的RDD上运行时,func 也必须是类型(Int Iterator) => Iterator
sample(withReplacement, fraction, seed) 使用给定的随机数种子,对数据的一小部分进行采样。
union(otherDataset) 合并两个数据集
intersection(otherDataset) 取两个数据集的交集
distinct([numTasks]) 去重数据集
groupByKey([numTasks]) 对数据集进行分组。如果需要对每个键都执行聚合(例如求和或平均值)进行分组,则使用 reduceByKey 或 aggregateByKey,将会产生更好的性能。
reductByKey(func,[numTasks]) 使用给定的聚合函数 func 对每个元素进行聚合
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 使用给定的聚合函数 func 对每个元素取一个中立值进行聚合
sortByKey([ascending], [numTasks] 返回按键升降序排列的集合
join(otherDataset, [numTasks]) 当调用 (K, V) 和 (K, W) 的数据集时,返回数据集连接的聚合 (K, ( V ,W ))。支持leftOuterJoin,rightOuterJoinfullOuterJoin
cogroup(otherDataset, [numTasks]) 当调用类型为 (K, V) 和 (K, W) 的数据集时,返回 (K, (Iterable, Iterable)元组的数据集。此操作也称为groupWith
cartesian(otherDataset) 当调用类型为T和U的数据集时,返回(T,U)对的数据集(所有元素对)。(笛卡尔集)
pipe(command, [envVars]) 通过 Shell 命令管道RDD的每个分区,写入进程 stdin,并且输出到其 stdout 的行将作为字符串的RDD返回。
coalesce(numPartitions) 将RDD中的分区数减少为 numPartitions。过滤大型数据集后,可更有效的运行操作。
repartition(numPartitions) 随机重新调整RDD中的数据以创建更多或更多的分区,并在它们之间平衡。
repartitionAndSortWithinPartitions(partitioner)] 根据给定的分区重新分区RDD,并在每个生成的分区中按键值对重新对记录排序。这比repartition在每个分区中调用然后排序更有效,因为它可以将排序推送到shuffe机器中。

操作

操作 释义
reduce(func) 使用函数 func 来聚合数据集中的每一个元素。该函数接受两个参数,并返回一个值。
collect() 在驱动程序中将数据集中所有的元素以数组的方式返回
count() 返回数据集中元素的总数
first() 返回数据集可第一个元素。相当于take(1)
take(n) 返回数据集中前n个元素的数组
takeSample(withReplacement, num, [seed] 使用指定的随机种子返回num个元素的随机样本
takeOrdered(n, [ordering] 使用自然顺序或自定义的比较器返回数据集中的前n个元素
saveAsTextFile(path) 将数据集的元素作为文本或文本文件集写入本地文件系统,HDFA或任何其他 Hadoop 支持的文件系统的给定目录中。Spark 将在每个元素上调用toString ,将其转换为文件中的一行文本。
saveAsSequenceFile(path) (Java and Scala) 将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统,HDFS或任何其他 Hadoop 支持的文件系统中的给定路径。这可以在实现 Hadoop 的 Writable 接口的键值对的 RDD 上使用。在 Scala 中,它也可以在可隐式转换为 Writable 的类型上使用。(Spark 包括基本类型的转换,如Int,Double,String等)。
saveAsObjectFile(path) (Java and Scala) 使用 JAVA 序列化以简单格式编写数据集。可以使用SparkContext.objectFile()进行加载。
countByKey() 仅适用于 (K, V) 的 RDD。返回 (K, Int) 对的散列映射,其中包含每个键的计数。
foreach(func) 在数据集的每个元素上运行函数func

RDD 的持久化

Spark 中最重要的功能之一便是跨操作在内存中*持久化(或缓存)*数据集。当您持久保存 RDD 时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用。这使得操作执行速度更快。缓存是迭代算法和快速交互式使用的关键工具。

可以使用persist()cache()方法标记要保留的 RDD。第一次在 action 中计算它,它将保留在节点的内存中。Spark 的缓存是容错的:如果丢失了 RDD 的任何分区,它将使用最初创建它的转换自动重新计算。

每个持久化的 RDD 可以使用不同的存储级别进行存储。例如,可以将数据缓存到磁盘或内存中。通过传递StorageLevel对象来设置这些级别persist()cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。

存储级别列表:

存储级别 释义
MEMORY_ONLY 将 RDD 存储为 JVM 中的反序列化 JAVA 对象。如果 RDD 不适合内存,则某些分区将不会被缓存,并且每次需要时都会重新计算。这是默认的级别。可以通过调用cache()来使用。
MEMORY_AND_DISK 将 RDD 存储为 JVM 中的反序列化 JAVA 对象。如果 RDD 不适合内存,请存储不适合磁盘的分区,并在需要时从那里读取它们。
MEMORY_ONLY_SER(java/scala) 将 RDD 存储为序列化 JAVA 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间。但是CPU的占用会比较高
MEMORY_AND_DISK_SER(java/scala) 与 MEMORY_ONLY_SER 类型,但将不适合内存的分区溢出到磁盘,而不是每次需要时动态重新计算它们。
DISK_ONLY 仅将 RDD 分区存储在磁盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等 与上面的级别相同,但复制两个集群节点上的每个分区
OFF_HEAP(实验) 与 MEMORY_ONLY_SER 类似,但将数据存储在堆首内存中。这需要启用堆外内存。

删除数据

Spark 会自动监视每个节点上的缓存使用情况,并以最近最小使用(LRU)的方式删除旧数据分区。如果您想手动删除 RDD 而不是等待它退出缓存,请使用方法RDD.unpersist()

共享变量

广播变量

使用SparkContext.broadcast(v)创建广播变量 v 。广播变量是一个包装器,可以通过调用该value方法来访问它的值。

1
val broadcastVar = sc.broadcast(Array(1, 2, 3))

创建广播变量后,炒使用它来代替 v 的值,这样这 v 值就不会多次传送到节点。此处,广播之后,变量不该被修改,以确保所有的节点获得相同的变量值。

累加器
1
2
3
4
5
val accum = sc.longAccumulator("My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

accum.value //Long  = 10

请注意 Spark 的惰性。如果在 RDD 上的操作中更新它们,则只有在 RDD 作为操作的一部分计算时,才会更新累加器的值。

Spark SQL, DataFrames 和 Datasets

与 Spark RDD API 不同, Spark SQL 提供的接口为 Spark 提供了有关数据结构与正在执行计算的更多信息。

  • Dataset:数据集是分布式的数据集合。
  • DataFrame:是使用列名组织起来的数组集(Dataset)。

在 Scala API 中,DataFrame 它只是类型 Dataset[Row]的一个别名。 在 Java API 中,用户需要使用 Dataset 来表示 DataFrame。

初始化 SparkSession

Spark 中所有功能的入口点都是SparkSession类。要创建基本的SparkSession,只需要使用SparkSession.builder()

1
2
3
4
5
6
7
8
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL示例")
.config("spark.some.config.option", "some-value")
.getOrCreate()

//  引用可将RDD转换为DataFrames的隐式转换
import spark.implicits._

创建 DataFrame

应用程序可以从现有的 RDD,Hive表或 Spark 数据源创建 DataFrame。 使用 JSON 创建 DataFrame:

1
2
3
val df = spark.read.json("examples/src/main/resources/people.json")
//  显示DataFrame的内容
df.show()

无类型数据集操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

程序化运行SQL查询

1
2
3
df.createOrReplaceTempView("people");
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show();

全局临时视图

Spark SQL 中的临时视图是会话范围。如果你希望拥有一个在所有会话之间共享的临时视图并保持活动状态,直到 Spark 程序终止 ,你可以创建一个全局临时视图。

1
2
3
4
5
6
7
//  将 DF 注册到系统保留的数据数 global_temp
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

//  全局临时视图是跨会话的,所以新创建一个会话
spark.newSession().sql("SELECT * FROM global_temp.people").show()

临时视图不会缓存到内存中,需要显式调用df.cache

创建数据集 Datasets

数据集(Datasets)与 RDD 类似,但是它们不使用 JAVA 序列化或 Kryo。而是使用专用的编码器来序列化对象以便通过网络进行处理或传输。

1
2
3
4
5
case class Person(name: String, age: Long)

//  为案例类创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

与 RDDs 的交互

通过反射
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//  读取并转换数据
val peopleDF = spark.sparkContext
  .textFile("src/main/resources/people.csv")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
//  创建或替换临时视图
peopleDF.createOrReplaceTempView("people")
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

//  使用索引值访问
teenagersDF.map(teenager => "Name:" + teenager(0)).show()

//  使用字段名称访问
teenagersDF.map(teenager => "Name:" + teenager.getAs[String]("name")).show()
编译式指定元信息

如果无法提交定义案例类,则可以通过以下方式创建:

  • 从原始 RDD 创建的行级数据创建 RDD
  • 使用 StructType 匹配步骤1中创建的 RDD 内容
  • 通过SparkSession提供的 createDataFrame 方法将模式应用于 Row
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import spark.implicits._
import org.apache.spark.sql.types._

//  创建一个RDD
val peopleRDD = spark.sparkContext.textFile("src/main/resources/people.csv")
//  使用字符串作为模式创建
val schemaString = "name age"
//  使用字符串的模式创建模式
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//  将RDD转换成 Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

//  使用RDD应用模式
val peopleDF = spark.createDataFrame(rowRDD, schema)

//  创建临时视图
peopleDF.createOrReplaceTempView("people")

//  此时SQL可以在临时视图上运行
val results = spark.sql("SELECT name FROM people")

//  输出结果
results.map(attributes => "Name: " + attributes(0)).show()

聚合

聚合是 DataFrames 内置的功能。它提供了像count(), countDistinct(), avg(), max(), min(),等等聚合方法。

无类型的用户自定义聚合函数

用户自定义聚合函数必须继承自UserDefinedAggregateFunction虚类。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
object MyAverage extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)

override def bufferSchema: StructType = {
  StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}

override def dataType: DataType = DoubleType

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit = {
  buffer(0) = 0L
  buffer(1) = 0L
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  if (!input.isNullAt(0)) {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
  }
}

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  buffer1(1) = buffer1.getLong(0) + buffer2.getLong(0)
}

override def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("User Defined Test").master("local")
  .getOrCreate()

//  注册自定义聚合类
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
}

数据源

Spark SQL 直接通过 DataFrame 接口对各种数据源进行操作。DataFrame 可以使用关系转进行操作,也可以用于创建临时视图。将 DataFrame 注册为临时视图允许您对其数据运行 SQL 查询。

通用加载/保存功能

通常需要手动指定数据源类型(json, parquet, jdbc, orc, libsvm, csv, text)。

要加载 JSON 文件,可以使用:

1
2
3
4
5
val usersDF = spark.read.format("json").load("src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

//载入时使用
spark.read.load("namesAndFavColors.parquet")

要加载 CSV 文件使用:

1
2
3
4
5
val peopleDFCsv = spark.read.format("csv")
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("title", true)
    .load("src/main/resources/people.csv")

直接在文件上运行 SQL:

1
val sqlDF = spark.sql("SELECT * FROM parquest.`src/main/resources/users.parquet`")
保存模式

在执行保存操作时,可以选择保存的模式。如果使用Overwrite,数据将在写入新数据之前被删除。

Scala/Java 任何语言 释义
SaveMode.ErrorIfExists(default) “error” or “errorifexists” 将 DataFrame 保存到数据源时,如果数据已存在,则会引发异常
SaveMode.Append “append” 将 DataFrame 到数据源时,如果数据或表已存在,则 DataFrame 的内容会附加到现有数据上。
SaveMode.Overwrite “overwrite” 覆盖模式意味着在将 DataFrame 保存数据时,如果数据或表已存在,则预期的数据将会覆盖原有数据。
SaveMode.Ignore “ignore” 忽略模式,表示将 DataFrame 保存到数据源时,如果数据或表已存在时,预期的数据操作将不会保存,也不会更改现有数据内容。

保存模式的使用:

1
2
df.select("name", 
"age").write.format("parquet").mode(SaveMode.ErrorIfExists).save("peopleSave.parquet")
保存到持久表

DataFrames同样的可以使用命令saveAsTable持久化表保存到Hive Metastore中。请注意:使用此功能不需要现有的Hive部署。

Spark 将为您创建默认的本地 Hive Metastore(使用Derby)。与命令createOrReplaceTempView不同的是,saveAsTable将实现 DataFrame 的内容并创建指向 Hive Metastore 中数据的指针。只要您保持与同一 Metastore 的连接,即使 Spark 程序重新启动,持久表仍然存在。

可以通过调用table在方法在SparkSession上创建持久表。

对于基于文件的源,例如 text, parquet, json等,您可以通过path选项指定自定义的表路径。例如df.write.option("path", "some/path").saveAsTable("t")

删除表时,将不会删除自定义表路径。如果未指定自定义表路径,则 Spark 程序会将数据写入创建目录(spark-warehouse)下的默认表路径,删除表时,也将删除默认表路径。

Spark 常用开发示例

连接数据库

mysql

build.gradle中,添加 mysql 的引用:

1
2
3
dependencies {
    compile ("mysql:mysql-connector-java:5.1.46")
}

连接到mysql的示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
val spark = SparkSession.builder()
    .appName("Spark JDBC SQL示例")
    .master("local")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
val prop = new Properties
prop.setProperty("user", "root")
prop.setProperty("password", "root")

val df = spark.read.jdbc("jdbc:mysql://localhost:3306/yunpos", "cm_order", prop)
//或者直接使用:val df = spark.read.jdbc("jdbc:mysql://localhost:3306/yunpos?user=root&password=root", "cm_order", new Properties)

println(df.count())

配置相关

展示当前所有配置

1
spark.conf.getAll.foreach(println)

设置配置

1
spark.conf.set("hive.metastore.uris", "thrift://bigdata3:9083")

常用统计方法

使用SparkSession获取 Spark 程序的会话后的变量spark的操作举例。

在使用Spark操作大部分操作时,需要导入常用的方法类:

1
2
3
4
//  导入 SQL 相关的方法
import org.apache.spark.sql.functions._
//  导入 可将RDD隐式转换为DF的操作集合
import spark.implicits._

获取临时表

1
val DF = spark.table("table_name")

SQL

GROUP BY
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
val spark = SparkSession.builder()
  .appName("Spark MySql Score 示例")
  .master("local")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
val df = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/yunpos?user=root&password=root", "sc_order", new Properties)
//  创建临时视图
df.createTempView("order")
//  直接使用SQL语句进行统计
val upDF = spark.sql("SELECT upstream_name, count(id) as count FROM order GROUP BY upstream_name")
upDF.show()
//  保存数据
upDF.write.format("json").saveAsTable("TW_SCORE_ORDER_W_X")
spark.stop()

具体例子参见项目中的src/main/scala/com.tangfan.spark.example.ScoreOrderTest

Join

笛卡尔积
1
joinDF1.join(joinDF2)
单字段

这种方式类似于a join b using column1形式。需要两个 DataFrame 具有相同的列名:

1
joinDF1.join(joinDF2, "id")
多字段
1
joinDF1.join(joinDF2, Seq("id", "name"))
指定join类型

指定字段可以指定 Join 的操作类型 inner、outer、left_outer、right_outer、left semi类型等。例如:

1
joinDF1.join(joinDF2, Seq("id", "name"), "inner")
指定字段
1
joinDF1.join(joinDF2, joinDF1("id") === joinDF2("t1_id"))

指定 join操作类型

1
joinDF1.join(joinDF2, joinDF1("id") === joinDF2("t1_id"), "inner")

参考文档: Spark-SQL之DataFrame操作

读取文件时指定编码格式

1
2
3
4
 val logDF = spark.read
      .option("header", "true")
      .option("encoding", "gbk")
      .csv("src/main/resources/log.csv")

列转类型

1
2
3
4
val iskwDF = logDF
      .withColumn("stepTemp", logDF.col("step").cast(IntegerType))
      .drop("step")
      .withColumnRenamed("stepTemp", "step")

过滤数据

一般使用filter进行数据的过滤。where是它的一个alias

1
2
3
4
//  等于一个值
df.filter($"iskw" === "1")
//  小于一个值
df.filter($"iskw" < 2) 

查询汇总计算

1
2
3
4
5
6
import sqlContext.implicits._
import org.apache.spark.sql.functions._
// 方法一
df.agg(sum("isa")).first.get(0)
// 方法二
df.select(col("steps")).rdd.map(_(0).asInstanceOf[Int]).reduce(_+_)

保存数据到CSV文件

1
df.repartition(1).write.option("header", "true").csv("src/main/resources/result.csv")

将DataFrame转换为List

1
val list = df.rdd.map(r => r(0).asInstanceOf[String]).collect.toList

where…in…

1
val resultUsrList = filter1DF.filter($"usrid".isin(filterIsaAsOneList: _*))

增量改查

创建dataframe的几个方法
  1. 本地seq + toDF创建DataFrame示例:
1
2
import sqlContext.implicits._ 
val df = Seq( (1, "First Value", java.sql.Date.valueOf("2010-01-01")), (2, "Second Value", java.sql.Date.valueOf("2010-02-01")) ).toDF("int_column", "string_column", "date_column")
  1. 通过case class + toDF创建DataFrame的示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
/ sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// 使用 sqlContext 执行 sql 语句.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 注:sql()函数的执行结果也是DataFrame,支持各种常用的RDD操作.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
动态分区

使用spark执行hive覆盖指定分区操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
val spark = SparkSession.builder().appName("YuewenAdReport")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict") // 增加对分区操作权限
.getOrCreate()

//以下代码证明无效,会全盘覆盖
//val DF = spark.read.table("test");
//DF.write.mode("overwrite").format("hive").insertInto("tableName")

// 使用sql时:

spark.sql("select *, date_format(to_date(create_time, 'yyyy-MM-dd'), 'yyyyMMdd') as pt_day from table").createOrReplaceTempView("tempTable")
spark.sql("INSERT OVERWRITE TABLE demo_tab PARTITION (pt_day) SELECT stadt, geograph_breite, id, t.country FROM demo_stg t;)

使用spark-shell时:

1
2
3
spark-shell --conf "hive.exec.dynamic.partition=true" --conf "hive.exec.dynamic.partition.mode=nonstrict"

df.write.mode("overwrite").format("hive").insertInto("tableName")

主要的几个配置如下:

参数 默认值 注释
hive.exec.dynamic.partition TRUE 设置为true表示启用动态分区插入
hive.exec.dynamic.partition.mode strict strict模式时用户必须至少指定一个静态分区;nonstrict模式时所有分区都可能是动态分区
hive.exec.max.dynamic.partitions.pernode 100 每个mapper/reducer节点允许创建的最大动态分区数
hive.exec.max.dynamic.partitions 1000 总共允许创建的最大动态分区数
hive.exec.max.created.files 100000 一个MR job中允许创建多少个HDFS文件
hive.error.on.empty.partition FALSE 当动态分区插入产生空值时是否抛出异常

参考文档: apache-hive-partitions

删除分区
1
spark.sql("ALTER TABLE ds1505_tbl_sms_log DROP IF EXISTS PARTITION(pt_day = '20190101')").show()
随机排序
1
2
3
4
5
import spark.implicits._
val a = Seq("1", "2", "3", "4", "5", "6", "7", "8", "9").toDF("test")
a.show(false)
a.printSchema
a.orderBy(rand()).show
使用Foldleft多次调用withColumn新增列

You can use select with varargs including *:

1
2
3
4
import spark.implicits._

df.select($"*" +: Seq("A", "B", "C").map(c => 
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")): _*)

This:

Maps columns names to window expressions with Seq(“A”, …).map(…) Prepends all pre-existing columns with $"" +: …. Unpacks combined sequence with … : _. and can be generalized as:

1
2
3
4
5
6
7
8
9
import org.apache.spark.sql.{Column, DataFrame}

/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 */
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) =
  df.select($"*" +: cols.map(c => f(c)): _*)

If you find withColumn syntax more readable you can use foldLeft:

1
2
3

Seq("A", "B", "C").foldLeft(df)((df, c) =>
  df.withColumn(s"cum$c",  sum(c).over(Window.partitionBy("ID").orderBy("time"))))

which can be generalized for example to:

1
2
3
4
5
6
7
8
/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 * @param name a function mapping from input to output name.
 */def withColumns(cols: Seq[String], df: DataFrame, 
    f: String =>  Column, name: String => String = identity) =
  cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c)))

参考: Spark/Scala repeated calls to withColumn() using the same function on multiple columns

withColumn

创建列

条件创建
1
2
3
4
5
6
7
8
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
    .toDF("A", "B", "C")

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

导入导出

CSV
导入
  1. 将文件上传至服务器,并使用hadoop fs -put命令导入至hdfs
1
hadoop fs -put file.csv /user/tangf/
  1. 进入spark-shell,导入指定CSV
1
2
3
4
val table = spark.read.option("header", 
"true").csv("hdfs://bigdata4:8020/user/tangf/file.csv")

table.write.mode("overwrite").saveAsTable("table_name")

如果 csv 需要指定各个字段类型,以及以不同的文件格式打开则使用以下配置:

1
val table = spark.read.option("header", "false").option("delimiter", "`").option("encoding", "gbk").schema("ID BIGINT, MID STRING, WeekIndex INT, Mobile STRING, TaskId INT, InterfaceID INT, Content STRING, UserID INT, UpdateTime TIMESTAMP, AddTime TIMESTAMP, Status INT, SubmitErrorCode STRING, ReportErrorCode STRING, MobileArea STRING, ActualCount INT, ExtNo STRING, AccessCode STRING, DeductFlag INT").csv("hdfs://bigdata4:8020/user/test/file.csv")
  1. 如果含有分区,则使用以下方法进行分区写入
1
2
3
4
5
6
7
table.createOrReplaceTempView("table")

val saveTable = spark.sql("select *, date_format(to_date(AddTime, 'yyyy-MM-dd'), 'yyyyMMdd') as pt_day from table")

saveTable.write.mode("overwrite").partitionBy("pt_day").saveAsTable("imissms_large_black")
//分区重载写入,以防覆盖整个表(2019-10-24 未测试)
spark.sql("insert overwrite tableA partition(pt_day) select * from table")
  1. 保存模式有:overwriteappendErrorIfExistsIgnore

读表:spark.read.table("tableName")

导出
1
2
val table = spark.read.table("table_name")
table.write.mode("overwrite").option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").option("header", "false").csv("/user/test/table_name")
读取文件夹内所有文件

关键点: scala 使用正则 获取文本内容 读取文件夹下所有文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CreditConvert {
  def main(args: Array[String]): Unit = {
     val spark = SparkSession.builder()
      .appName("Spark JDBC CSV")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    //  注册获取当前处理的文件名称并获取从中获取日期字符串

    val get_file_name = udf((path: String) => "\\d{8}".r.findFirstIn(path).get.replaceAll("(\\d{4})(\\d{2})(\\d{2})", "$1-$2-$3"))
    val fileName = "F:\\test\\*.txt"

    val phoneDF = spark.read.text(fileName)
      .map(line => line.mkString.split(" {4}"))
      .flatMap(arr => {
        val phone = arr(0)
        val is_in = arr(1)
        val words = phone.split(" ")
        words.map(word => (word, is_in))
      }).withColumn("date", get_file_name(input_file_name())).toDF("phone", "is_in", "date")
    phoneDF.printSchema()
    phoneDF.show(false)
    phoneDF.repartition(1).write
      .option("header","true")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") //解决 java.lang.IllegalArgumentException: Illegal pattern component: XXX 报错问题
      .csv("src/main/resources/test_20190906.csv")
  }
}

时间转换

时间戳转时间

1
2
3
4
select 
  from_unixtime(1508673584)                    as fut,
  cast(from_unixtime(1508673584) as date)      as futAsDate,
  cast(from_unixtime(1508673584) as timestamp) as futAsTimestamp;

Result:

fut futAsDate futAsTimestamp
2017-10-22 11:59:44 2017-10-22 2017-10-22 11:59:44.0

参考:Hive/SparkSQL: How to convert a Unix timestamp into a timestamp (not string)?

开窗函数

获取分组最小值/最大值

1
2
-- 获取分组下的最新的数据。根据CreateTime desc 获取最新
select * from (select uuid, state, firstCard, CreateTime, cardProduct, ROW_NUMBER() over(partition by uuid order by CreateTime desc) as rn from ds13457_PuFaCredit ) a where a.rn = 1 limit 100

开窗函数: 语法:[function] over(partition by deptno order by salary 函数有:row_number()、rank()、dense_rank()、sum()、avg()、first_value()、last_value()、lag()、lead()、ratio_to_report()、percent_rank()、cume_dist()、PERCENTILE_DISC()

参考: How to select the first row of each group? hive OVER(PARTITION BY)函数用法

文件存储

查询表的存储路径

1
spark.sql("desc formatted beacon_web_toutiao_report").toDF.filter('col_name === "Location").collect()(0)(1).toString

查询表使用到的文件:

1
spark.read.table("ds3147_tblPhoneTask").inputFiles

Spark Gradle 项目

gradle.properties

1
2
3
4
5
6
7
8
version=1.0.0-SNAPSHOT
scalaVersion=2.11.8
slf4jVersion=1.7.21
logbackVersion=1.1.7
sparkVersion=2.3.0

title=SparkLearn
group=com.tangfan.spark.example

build.gradle

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
plugins {
    id 'java'
    id 'scala'
    id 'idea'
    id 'application'
}
dependencies {
    compile "org.scala-lang:scala-library:${scalaVersion}"
    compile "org.scala-lang:scala-reflect:${scalaVersion}"
    compile "org.scala-lang:scala-compiler:${scalaVersion}"

    compile "org.apache.spark:spark-mllib_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-sql_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming-flume-assembly_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-graphx_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-launcher_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-catalyst_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-streaming_2.11:${sparkVersion}"
    compile "org.apache.spark:spark-core_2.11:${sparkVersion}"
    compile group: 'commons-io', name: 'commons-io', version: '2.5'
}

如果有引用其它的组件,可以使用shadowJar插件将这些组件打包进去。

build.gradle

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
plugins {
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

shadowJar {
    classifier = 'shadow'
    dependencies {
        //  只将某些包打入进去
        include(dependency("com.xx:xx:1.0.0"))
    }
}

然后调用命令gradle shadownJar即可生成shadowJar包。

Spark 部署

在 YARN 上启动 Spark

以集群模式启动 Spark 应用程序:

1
$ ./bin/spark-submit --class path.to.your.CLass --master yarn --deploy-mode cluster [options] <app jar> [app options]

例如:

1
2
3
4
5
6
7
8
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --queue thequeue \
    xx.jar \
    10

引用

Spark 开发与运维

Spark-shell

启动参数

参数说明

参数 默认值 说明
–queue 使用yarn哪个队列
–executor-memory 是指定每个executor(执行器)占用的内存,num * memory 不能超过集群最大内存量
–total-executor-cores 所有executor总共使用的cpu核数
–executor-cores 每个executor使用的cpu核数
–num-executors executor个数
–以下参数需要使用--conf进行处理
spark.defalut.parallelism num_executors * cores * 2~3
spark.local.dir /tmp 指定spark执行临时目录

spark-shell 启动:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
spark-shell --executor-memory 15G --executor-cores 8

spark-shell \
--executor-memory 4G \
--num-executors 8 \
--executor-cores 4 \
--driver-memory 10G \
--conf "spark.defalut.parallelism=1000" \
--conf "spark.storage.memoryFraction=0.5" \
--conf "spark.shuffle.memoryFraction=0.3"

使用hive动态分区启动参数:

1
2
3
spark-shell \
--conf "hive.exec.dynamic.partition=true"
--conf "hive.exec.dynamic.partition.mode=nonstrict"

spark-submit 参数:

1
spark-submit --master yarn --deploy-mode client --driver-memory 1G --executor-memory 2G --executor-cores 4 --num-executors 3 --queue dev

Spark-submit

1
spark-submit --class 应用主类 --jars 执行过程中使用到的JAR包

查看执行时间

1
spark.time(spark.sql("xxx").show)

显示结果:

1
2
3
4
5
6
7
8
scala> spark.time(spark.sql("select count(1) as c from ds1505_tbl_sms_log").show)
+---------+
|        c       |
+---------+
|129877561|
+---------+

Time taken: 7834 ms

使用jar包内的日志配置文件

1
2
3
4
5
spark-submit --class com.path.to.class.InitialContactDriver \
--driver-java-options "-Dlog4j.configuration=file:log4j.properties" \
--conf "spark.executor.extraJavaOptions=Dlog4j.configuration=file:log4j.properties" \
--driver-class-path /home/cloudera/SNAPSHOT.jar \
--master yarn    /home/cloudera/SNAPSHOT.jar

关键点: 使用:--driver-class-path

灯塔各参数下的执行效率

参数处显式配置内容,其它配置使用默认。 计算麦远短信表的总量,约1.299亿数据总量。

代码参考:

1
spark.time(spark.sql("select count(1) as c from ds1505_tbl_sms_log").show)

测试结果:

参数 MASTER 队列 平均用时 首次用时
默认 default 6.7s 18s
默认 yarn default 6.7s 18.2s
15G 8Cores default 5s 15.6s
15G 8Cores yarn default 5.4s 14.9s
8G 4Cores default 5.2s 14s
8G 4Cores yarn default 5.2s 15.2s
4G 2Cores default 6s 15.2s
4G 2Cores yarn default 5.97s 16.1s
默认 dev 8.5s 20s
默认 yarn dev 7.5s 19.7s

其它测试:

参数 队列 平均用时 首次用时
2G 2Cores / –deploy-mode client –driver-memory 1G –num-executors 3 dev 5.6s

说明: 15G 8Cores表示的参数为:

1
--executor-memory 15G --executor-cores 8

文件上传

本地模式文件上传

参数配置为以下配置时,使用本地读取:

参数 配置
master local/yarn
deploy-mode client

读取文件方式:

1
2
3
4
5
6
//读取1:直接使用Files读取
new String(Files.readAllBytes(Paths.get(url)), "utf-8")

//读取2:使用Spark读取
val wholeFiles = spark.sparkContext.wholeTextFiles(tagsJsonUrl)
wholeFiles.collect().head._2

其中:url参数即可以为相对路径,也可以是绝对路径。在使用azkaban调度时,直接使用相对路径,即为azkaban zip包内的文件。

集群模型文件上传(待验证)

参数配置为以下配置时,使用集群获取:

参数 配置
master local/yarn
deploy-mode cluster

此时,文件需要使用spark-submit --files参数进行文件上传。获取文件的方式为:

1
val url = System.getenv("SPARK_YARN_STAGING_DIR") + s"/$url"