说明
OSS Select还支持其他过滤条件,详情请参见 SelectObject。
对比TPC-H的查询:通过测试TPC-H中query1.sql对于lineitem这个table的查询性能,来检验配置效果。为了能使OSS Select过滤更多的数据,我们将where条件由 l_shipdate <= '1998-09-16' 改为 where l_shipdate > '1997-09-16',测试数据大小为2.27 GB。仅使用Spark SQL查询和在Spark SQL上使用OSS Select查询的方式如下:
- 仅使用Spark SQL查询
[root@cdh-master ~]# hadoop fs -ls oss://select-test-sz/data/lineitem.csv
-rw-rw-rw- 1 2441079322 2018-10-31 11:18 oss://select-test-sz/data/lineitem.csv
- 在Spark SQL上使用OSS Select查询
scala> import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, DoubleType}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType, DoubleType}
scala> import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLContext}
scala> val sqlContext = spark.sqlContext
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@74e2cfc5
scala> val textFile = sc.textFile("oss://select-test-sz/data/lineitem.csv")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/data/lineitem.csv MapPartitionsRDD[1] at textFile at <console>:26
scala> val dataRdd = textFile.map(_.split('|'))
dataRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:28
scala> val schema = StructType(
|List(
|StructField("L_ORDERKEY",LongType,true),
|StructField("L_PARTKEY",LongType,true),
|StructField("L_SUPPKEY",LongType,true),
|StructField("L_LINENUMBER",IntegerType,true),
|StructField("L_QUANTITY",DoubleType,true),
|StructField("L_EXTENDEDPRICE",DoubleType,true),
|StructField("L_DISCOUNT",DoubleType,true),
|StructField("L_TAX",DoubleType,true),
|StructField("L_RETURNFLAG",StringType,true),
|StructField("L_LINESTATUS",StringType,true),
|StructField("L_SHIPDATE",StringType,true),
|StructField("L_COMMITDATE",StringType,true),
|StructField("L_RECEIPTDATE",StringType,true),
|StructField("L_SHIPINSTRUCT",StringType,true),
|StructField("L_SHIPMODE",StringType,true),
|StructField("L_COMMENT",StringType,true)
|)
| )
schema: org.apache.spark.sql.types.StructType = StructType(StructField(L_ORDERKEY,LongType,true), StructField(L_PARTKEY,LongType,true), StructField(L_SUPPKEY,LongType,true), StructField(L_LINENUMBER,IntegerType,true), StructField(L_QUANTITY,DoubleType,true), StructField(L_EXTENDEDPRICE,DoubleType,true), StructField(L_DISCOUNT,DoubleType,true), StructField(L_TAX,DoubleType,true), StructField(L_RETURNFLAG,StringType,true), StructField(L_LINESTATUS,StringType,true), StructField(L_SHIPDATE,StringType,true), StructField(L_COMMITDATE,StringType,true), StructField(L_RECEIPTDATE,StringType,true), StructField(L_SHIPINSTRUCT,StringType,true), StructField(L_SHIPMODE,StringType,true), StructField(L_COMMENT,StringType,true))
scala> val dataRowRdd = dataRdd.map(p => Row(p(0).toLong, p(1).toLong, p(2).toLong, p(3).toInt, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8), p(9), p(10), p(11), p(12), p(13), p(14), p(15)))
dataRowRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30
scala> val dataFrame = sqlContext.createDataFrame(dataRowRdd, schema)
dataFrame: org.apache.spark.sql.DataFrame = [L_ORDERKEY: bigint, L_PARTKEY: bigint ... 14 more fields]
scala> dataFrame.createOrReplaceTempView("lineitem")
scala> spark.sql("select l_returnflag, l_linestatus, sum(l_quantity) as sum_qty, sum(l_extendedprice) as sum_base_price, sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, avg(l_quantity) as avg_qty, avg(l_extendedprice) as avg_price, avg(l_discount) as avg_disc, count(*) as count_order from lineitem where l_shipdate > '1997-09-16' group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus").show()

推荐阅读
- 通过OSS 构建大数据分析PB级数仓
- 别克昂科威点烟器怎么使用
- 壁挂炉停暖如何使用 壁挂炉怎么停暖气功能
- 《怪物猎人崛起》大剑流斩连段使用心得
- 使用洗衣机方式不当比没洗之前更脏
- 移动式脚手架使用规范 移动脚手架使用规范是什么
- 华为电话手表怎么使用
- 如何使用指南针确定方向,如何使用指南针确定方向的方法
- 宝骏510寿命
- 一辆汽车能开多少年
