当前位置: 江南文明网 > 科技 > 智能 >

大数据开发之sparkSQL的使用分享

条评论

大数据开发之sparkSQL的使用分享

  SparkSQL,使用SQL来完成大数据操作

  Spark之前使用RDD操作大数据,非常方便,但是也有各种问题,例如RDD每次读取的都是字符串,以及语法比较比较麻烦。针对这种情况,spark在新版本中升级RDD为DataFrame和DataSet,并使用SQL的方式去操作数据

  DataFrame,RDD的升级版,分布式的数据集,并且以列的方式组合的,类似于二维表格式,除数据外保存数据结构信息

  DataSet,DataFrame扩展,最新的数据抽象,相对于DataFrame,DataSet会记录字段的数据类型,并进行严格错误检查

  三者的关系是RDD进化 >>> DataFrame ,DataFrame进化 >>> DataSet。

  当然这里也不是说RDD不用了,而是把RDD转为底层处理,所以同学们还是需要先理解什么是RDD

  常用方法

  DataSet,DataFrame 的使用是依赖于SparkSession的,所以我们需要先创建SparkSession

  val spark = new SparkContext(

  new SparkConf()。setMaster("local")

  。setAppName("taobao")

  read.csv("路径")

  val data = spark.read.csv("data/A.csv")

  val data = spark.read

  。option("header","true") // 设置读取首行,这里的声明用于把数据首行作为列名【关注尚硅谷,轻松学IT】

  。csv("data/A.csv")

  show(num)

  显示顶部num行数据

  map(func)

  操作和RDD中类似,不同的是需要隐式转换,在代码前加上

  import spark.implicits._

  data.map(

  x => { // raw 可以通过下标获得到对应中,不需要切分

  (x.getString(0),x.getString(1)。toInt)

  }

  toDF("列名"…)

  转换成DataFrame类型,并设置列名

  select(col: String, cols: String*): DataFrame

  查询指定列并返回数据

  val r2 = data.select("名称",

  "人均价格")

  r2.show(10)

  selectExpr(exprs: String*): DataFrame

  执行原生的SQL中函数

  data.selectExpr("count(name)")

  data.selectExpr("avg(age)")

  rdd

  把DataFrame和DataSet转换成RDD类型

  printSchema()

  查看表结构

  root

  -- age: long (nullable = true)

  -- name: string (nullable = true)

  filter(Str)

  执行过滤

  filter("age>10")

  filter("age>10 and name='张三'")

  sum("列名") \ avg("列名") \ max("列名") \ min("列名") \ count()

  针对列进行求和

  平均值

  最大值

  最小值

  数量

  data.groupBy("age")。sum("age")。show()

  data.groupBy("age")。avg("age")。show()

  orderBy("列名")

  排序

  val r2 = data.map(

  x => {

  (x.getString(0),x.getString(1)。toInt)

  }

  )。toDF("名称","评论数")

  。orderBy("评论数")

  r2.show(10)

  这里注意如果需要降序需要

  val r2 = data.map(

  x => {

  (x.getString(0),x.getString(1)。toInt)

  }

  )。toDF("名称","评论数")

  。orderBy(desc("评论数"))

  r2.show(10)

  注意导入import org.apache.spark.sql.functions.desc

  除了上述的方法外,Spark还支持直接使用SQL的方式操作数据,方法如下

  createOrReplaceTempView(str)

  创建临时表,注意使用SQL的时候需要根据当前数据创建临时表,这样才可以在SQL里面使用

  data.createOrReplaceTempView("user")

  sql(str)

  在当前数据集上执行SQL语句

  val result = spark.sql("select name from user")

  result.show()

  val count = spark.sql("select count(*) from user")

  count.show()

  spark.udf.register(fName,func)

  自定义一个函数,用于SQL中处理

  spark.udf.register("f1",(x:String) => (x+"a"))

  val result = spark.sql("select f1(name) from user")

  result.show()