zoukankan      html  css  js  c++  java
  • 转】SparkSQL中的内置函数

    原博文来自于:  http://blog.csdn.net/u012297062/article/details/52207934    感谢!

     使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作的结果是返回一个Column对象,而DataFrame天生就是"A distributed collection of data organized into named columns.",这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要的处理,这之于我们构建附件的业务逻辑而言是可以极大的减少不必须的时间消耗(基于上就是实际模型的映射),让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常有价值的Spark 1.5.x开始提供了大量的内置函数,例如agg:

    [plain] view plain copy
     
    1. def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {  
    2. groupBy().agg(aggExpr, aggExprs : _*)  
    3. }  

    还有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
    总体上而言内置函数包含了五大基本类型:
    1、聚合函数,例如countDistinct、sumDistinct等;
    2、集合函数,例如sort_array、explode等
    3、日期、时间函数,例如hour、quarter、next_day
    4、数学函数,例如asin、atan、sqrt、tan、round等;
    5、开窗函数,例如rowNumber等
    6、字符串函数,concat、format_number、rexexp_extract
    7、其它函数,isNaN、sha、randn、callUDF

    第一步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,Spark程序在本地运行

    [plain] view plain copy
     
    1. val conf = new SparkConf() //创建SparkConf对象  
    2. conf.setAppName("SparkSQL") //设置应用程序的名称,在程序运行的监控界面可以看到名称  
    3. //conf.setMaster("spark://DaShuJu-040:7077") //此时,程序在Spark集群  
    4. conf.setMaster("local")  

    第二步:创建SparkContext对象
          SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala、Java、Python、R等都必须有一个SparkContext
          SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
          同时还会负责Spark程序往Master注册程序等
          SparkContext是整个Spark应用程序中最为至关重要的一个对象

    [plain] view plain copy
     
    1. val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息  
    2. val sqlContext = new SQLContext(sc)   //构建SQL上下文</span>  
    [plain] view plain copy
     
    1. //要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换   
    [plain] view plain copy
     
    1. import sqlContext.implicits._  


    第三步:模拟数据,最后生成RDD

    [plain] view plain copy
     
    1. val userData = Array(  
    2.       "2016-3-27,001,http://spark.apache.org/,1000",  
    3.       "2016-3-27,001,http://hadoop.apache.org/,1001",  
    4.       "2016-3-27,002,http://fink.apache.org/,1002",  
    5.       "2016-3-28,003,http://kafka.apache.org/,1020",  
    6.       "2016-3-28,004,http://spark.apache.org/,1010",  
    7.       "2016-3-28,002,http://hive.apache.org/,1200",  
    8.       "2016-3-28,001,http://parquet.apache.org/,1500",  
    9.       "2016-3-28,001,http://spark.apache.org/,1800"  
    10.     )</span>  
    [plain] view plain copy
     
    1. val userDataRDD = sc.parallelize(userData)  //生成DD分布式集合对象  
    2. </span>  



    第四步:根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型
          于此同时要提供DataFrame中的Columns的元数据信息描述

    [plain] view plain copy
     
    1. val userDataRDDRow = userDataRDD.map(row => {val splited = row.split(",") ;Row(splited(0),splited(1).toInt,splited(2),splited(3).toInt)})  
    2. val structTypes = StructType(Array(  
    3.       StructField("time", StringType, true),  
    4.       StructField("id", IntegerType, true),  
    5.       StructField("url", StringType, true),  
    6.       StructField("amount", IntegerType, true)  
    7. ))  
    [plain] view plain copy
     
    1. <span style="font-family: Arial, Helvetica, sans-serif;">val userDataDF = sqlContext.createDataFrame(userDataRDDRow,structTypes)</span>  

    第五步:使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自定进行CG;

    [plain] view plain copy
     
      1. userDataDF.groupBy("time").agg('time, countDistinct('id)).map(row=>Row(row(1),row(2))).collect.foreach(println)  
      2. userDataDF.groupBy("time").agg('time, sum('amount)).show()  
  • 相关阅读:
    hibernate的dialect大全
    jdbc.properties 链接各类数据库的基本配置以及URL写法
    Springboot中redis的学习练习
    博客开通了
    Java String类的hashCode()函数
    Java String类中CaseInsensitiveComparator.compare()方法的实现
    git pull远程所有分支
    Python的权限修饰符
    Tmux快捷键
    __future__模块
  • 原文地址:https://www.cnblogs.com/zlslch/p/6040401.html
Copyright © 2011-2022 走看看