sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4bdef487
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
|"OPTIONS (" +
|"oss.bucket 'select-test-sz', " +
|"oss.prefix 'people', " + // objects with this prefix belong to this table
|"oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
|"oss.data.format 'csv'," + // we only support csv now
|"oss.input.csv.header 'None'," +
|"oss.input.csv.recordDelimiter 'rn'," +
|"oss.input.csv.fieldDelimiter ','," +
|"oss.input.csv.commentChar '#'," +
|"oss.input.csv.quoteChar '"'," +
|"oss.output.csv.recordDelimiter 'n'," +
|"oss.output.csv.fieldDelimiter ','," +
|"oss.output.csv.quoteChar '"'," +
|"oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
|"oss.accessKeyId 'Your Access Key Id', " +
|"oss.accessKeySecret 'Your Access Key Secret')")
res0: org.apache.spark.sql.DataFrame = []
scala>val sql: String = "select count(*) from people where name like 'Lora%'"
sql: String = select count(*) from people where name like 'Lora%'
scala>sqlContext.sql(sql).show()
+--------+
|count(1)|
+--------+
|31770|
+--------+
scala> val textFile = sc.textFile("oss://select-test-sz/people/")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/people/ MapPartitionsRDD[8] at textFile at <console>:24
scala> textFile.map(line => line.split(',')).filter(_(0).startsWith("Lora")).count()
res3: Long = 31770
从下图可看到:使用OSS Select查询数据耗时为15s,不使用OSS Select查询数据耗时为54s,使用OSS Select能大幅度加快查询速度 。

Spark对接OSS Select支持包的实现(Preview)通过扩展Spark的 DataSource API 可以实现Spark对接OSS Select 。通过实现PrunedFilteredScan,可以把需要的列和过滤条件下推到OSS Select执行 。目前这个支持包还在开发中,定义的规范和支持的过滤条件如下:
- 定义的规范:
scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
|"OPTIONS (" +
|"oss.bucket 'select-test-sz', " +
|"oss.prefix 'people', " + // objects with this prefix belong to this table
|"oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
|"oss.data.format 'csv'," + // we only support csv now
|"oss.input.csv.header 'None'," +
|"oss.input.csv.recordDelimiter 'rn'," +
|"oss.input.csv.fieldDelimiter ','," +
|"oss.input.csv.commentChar '#'," +
|"oss.input.csv.quoteChar '"'," +
|"oss.output.csv.recordDelimiter 'n'," +
|"oss.output.csv.fieldDelimiter ','," +
|"oss.output.csv.quoteChar '"'," +
|"oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
|"oss.accessKeyId 'Your Access Key Id', " +
|"oss.accessKeySecret 'Your Access Key Secret')") 字段说明oss.bucket数据所在的Bucket 。oss.prefix拥有这个前缀的Object都属于定义的这个TEMPORARY VIEW 。oss.schema这个TEMPORARY VIEW的schema,目前通过字符串指定,后续会通过一个文件来指定schema 。oss.data.format数据内容的格式,目前支持CSV格式,其他格式也会陆续支持 。oss.input.csv.*定义CSV输入格式参数 。oss.output.csv.*定义CSV输出格式参数 。oss.endpointBucket所在的Endpoint 。oss.accessKeyId填写AccessKeyId 。oss.accessKeySecret填写AccessKeySecret 。说明 目前只定义了基本参数,详情请参见 SelectObject,其余参数将陆续支持 。
- 支持的过滤条件:=,<,>,<=, >=,||,or,not,and,in,like(StringStartsWith,StringEndsWith,StringContains) 。对于不能下推的过滤条件,例如算术运算、字符串拼接等通过PrunedFilteredScan获取不到的条件,则只下推需要的列到OSS Select 。
推荐阅读
- 通过OSS 构建大数据分析PB级数仓
- 别克昂科威点烟器怎么使用
- 壁挂炉停暖如何使用 壁挂炉怎么停暖气功能
- 《怪物猎人崛起》大剑流斩连段使用心得
- 使用洗衣机方式不当比没洗之前更脏
- 移动式脚手架使用规范 移动脚手架使用规范是什么
- 华为电话手表怎么使用
- 如何使用指南针确定方向,如何使用指南针确定方向的方法
- 宝骏510寿命
- 一辆汽车能开多少年
