Spark DataFrame操作MySQL数据

Mon Oct 05 16:41:59 CST 2015 9215 大数据

文章摘要DataFrame是spark推出的一个API,能够使得大数据更为简单,从而拥有更广泛的受众群体。使用DataFrameSpark可以大大简化从前使用RDD对数据进行操作的繁琐。

在Spark中,DataFrame是一个以命名列方式组织的分布式数据集,等同于关系型数据库中的一个表,也相当于R/Python中的data frames(但是进行了更多的优化)。DataFrames可以由结构化数据文件转换而来,也可以从Hive中的表得来,甚至可以取自于外部数据库(MySQL等)或现有的RDD。


本文简单介绍DataFrame从MySQL中组织数据。


一、环境准备

  1. 首先确保你正确安装了spark,包括配置好环境;

  2. 建立一个数据库名为testDF,创建表user,包含如下数据

    id    name    age

    1    chen    21

    2    liang    22



二、从MySQL表中创建DataFrame

1.运行spark本地单进程模式:

spark-shell --master local

从shell的信息中可以看到,SparkContext与SqlContext已经为我们准备好了。

这时已经进入了scala的运行环境,可以直接输入scala语句并运行。


2.创建一个DataFrame,这个DataFrame将会包含一个MySQL表的数据

val tableDF = sqlContext.jdbc("jdbc:mysql://mysql_hostname:mysql_port/testDF?user=your_username&password=your_password", "user")

可以看见shell中显示创建org.apache.spark.sql.DataFrame成功,并输出了DataFrame的数据结构。

jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string, age: int]


(如果创建失败有异常那么请查看发生异常的原因,可能是由于没有找到mysql-connector.jar)


3.运行

tableDF.show()

可以看到shell中打印出(当然,颜色是没有的)

id    name    age

1    chen    21

2    liang    22

由此可见,DataFrame已经包含了testDF.user表!


运行

tableDF.printSchema()

可看到更详细的结构

root

 |-- id: integer (nullable = false)

 |-- name: string (nullable = false)

 |-- age: integer (nullable = false)



三、对DataFrame进行操作

  1. filter

对DataFrame中的数据进行过滤,返回值是过滤后的行组成的DataFrame.

val frame1 = tableFrame.filter(tableFrame("age") === 21)    //注意等于号,须为 "==="
frame1.show

可以看到输出结果只有  1   chen   21这条记录


2.select

从DataFrame中选出指定列,返回值是由指定列组成的DataFrame.

val frame2 = tableFrame.select("name", "age")
frame2.show


name    age

chen    21

liang    22


select还可以这么玩:

tableFrame.select(tableFrame("name"), tableFrame("age") + 1).show

name     (age + 1)

chen     22       

liang    23    



3.join

连接操作,这个函数有三个签名,分别为

//交叉连接(笛卡尔积)。参数为连接的右值
def join(right : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.DataFrame = { /* compiled code */ }

//内连接。第二个参数为连接条件
def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column) : org.apache.spark.sql.DataFrame = { /* compiled code */ }

//可在第三个参数中指定连接类型。(如"left_outer")为左外连接
def join(right : org.apache.spark.sql.DataFrame, joinExprs : org.apache.spark.sql.Column, joinType : scala.Predef.String) : org.apache.spark.sql.DataFrame = { /* compiled code */ }

为了尝试一下join,需要新建一个表address,并插入如下数据

name    address

chen    ShenZhen

liang    ZhanJiang

像步骤二那样,创建一个DataFrame来描述表address的数据,命名为tableDF2.


然后运行

tableDF.join(tableDF2, tableDF("name") === tableDF2("name")).show

看到输出为

id name  age name  address  

2  liang 22  liang ZhanJiang

1  chen  21  chen  ShenZhen 


4.group

分组

tableDF.groupBy("name").count().show

看到输出为

name  count

liang 1       

chen  1    

count: Unit = ()


5.sort

tableDF.sort("age").show


6.take

取出前n行数据,返回类型为Array[ROW]

tableDF.take(2)


7.foreach

对DataFrame每一行执行一次同样的操作

tableDF.foreach(row => (println(row.getString(1))))    //打印每一行位于第二列的name. (列计数从0开始)


等等....




打赏
打赏

分享到: