Free Will

Spark笔记(5):Spark SQL

Spark SQL所使用的数据抽象并非RDD,而是DataFrame。DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,它不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能。Spark能够轻松实现从Mysql到DataFrame的转化,并且支持SQL查询。

Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLCOntext及HiveContext接口,来实现对其数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。

Sparksession支持从不同的数据源加载数据,以及把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身的表,然后使用SQL语句来操作数据。

一、RDD 转换成 DataFrame

Spark支持两种方法实现从RDD转换成DataFrame。第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

反射机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
%pyspark
from pyspark.sql import SparkSession
spark = SparkSession(sc)
data = sc.parallelize([("hadoop",2),("spark",3),("hive",4),("spark",4)])
dataframe = data.toDF(["project","number"])
dataframe.createOrReplaceTempView('TempTable')
spark.sql("select project, sum(number) as sum from TempTable group by project").show()
+-------+---+
|project|sum|
+-------+---+
| spark| 7|
| hadoop| 2|
| hive| 4|
+-------+---+

运用编程接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
%pyspark
from pyspark.sql.types import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
spark = SparkSession(sc)
data = sc.parallelize([("hadoop",2),("spark",3),("hive",4),("spark",4)])
def return_row(item):
return Row(**{"project":item[0],"number":item[1]})
schema = StructType([
StructField('project',StringType(),True),
StructField('number',IntegerType(),True )
])
data = data.map(return_row)
data.take(10)
data = spark.createDataFrame(data,schema)
data.show()
data.createOrReplaceTempView("TempTable")
spark.sql("select project,sum(number) as sum from TempTable group by project").show()
+-------+------+
|project|number|
+-------+------+
| hadoop| 2|
| spark| 3|
| hive| 4|
| spark| 4|
+-------+------+
+-------+---+
|project|sum|
+-------+---+
| spark| 7|
| hadoop| 2|
| hive| 4|
+-------+---+

二、DataFrame常用操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
dataframe.printSchema()
root
|-- project: string (nullable = true)
|-- number: long (nullable = true)
dataframe.select(dataframe.project,dataframe.number+1).show()
+-------+------------+
|project|(number + 1)|
+-------+------------+
| hadoop| 3|
| spark| 4|
| hive| 5|
| spark| 5|
+-------+------------+
dataframe.filter(dataframe.number>3).show()
+-------+------+
|project|number|
+-------+------+
| hive| 4|
| spark| 4|
+-------+------+
dataframe.groupby("project").sum().show()
+-------+-----------+
|project|sum(number)|
+-------+-----------+
| spark| 7|
| hadoop| 2|
| hive| 4|
+-------+-----------+
dataframe.sort(dataframe.number).show()
+-------+------+
|project|number|
+-------+------+
| hadoop| 2|
| spark| 3|
| spark| 4|
| hive| 4|
+-------+------+

三、读取与保存

读取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
## json
spark.read.json(file)
## csv
spark.read.csv(file, header=True, inferSchema=True)
## mysql
sql="(select * from mysql.db where db='wp230') t"
df = spark.read.format('jdbc').options(
url='jdbc:mysql://127.0.0.1',
dbtable=sql,
user='root',
password='123456'
).load()
## parquet
spark.read.parquet(file)
## hdfs
spark.read.csv(file)

保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
## csv
df.write.csv(path=file_path, header=True, sep=",", mode='overwrite')
## parquet
df.write.parquet(path=file_path,mode='overwrite')
## hdfs
df.write.mode("overwrite").options(header="true").csv(file_path)
## mysql
df.write.mode("overwrite").format("jdbc").options(
url='jdbc:mysql://127.0.0.1',
user='root',
password='123456',
dbtable="test.test",
batchsize="1000",
).save()

四、用户自定义函数

虽然spark.sql.function中已包含大量常用函数,但总有一些特定场景无法满足,这就需要使用udf,一个基本udf创建的流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1.创建普通的python函数
def toDate(s):
return str(s)+'-'
# 2.注册自定义函数
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# 根据python的返回值类型定义好spark对应的数据类型
# python函数中返回的是string,对应的pyspark是StringType
toDateUDF=udf(toDate, StringType())
# 使用自定义函数
df1.withColumn('color',toDateUDF('color')).show()

最简单的就是通过lambda函数,不需要定义返回值类型,可以直接使用

1
2
3
4
5
6
7
# 创建udf自定义函数
from pyspark.sql import functions
concat_func = functions.udf(lambda name,age:name+'_'+str(age)) # 简单的连接两个字符串
# 应用自定义函数
concat_df = spark_df.withColumn("name_age",concat_func(final_data.name, final_data.age))
concat_df.show()

以上两例是在dataframe中使用,也可以在spark.sql中使用:

1
2
3
4
5
6
7
8
9
10
11
# 定义自定义函数
def is_nulludf(fieldValue, defaultValue):
if fieldValue == None:
return defaultValue
return fieldValue
# 注册自定义函数
spark.udf.register("is_nulludf", is_nulludf)
# 使用自定义函数
spark.sql("select col_name, is_nulludf(col_name) as col_name2 from table ")


应统联盟


连接十万名应统专业同学


阿药算法


打通算法面试任督二脉