Spark with OSS Select for Faster Data Queries (Part 3)


sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4bdef487

scala> sqlContext.sql("CREATE TEMPORARY VIEW people USING com.aliyun.oss " +
|"OPTIONS (" +
|"oss.bucket 'select-test-sz', " +
|"oss.prefix 'people', " + // objects with this prefix belong to this table
|"oss.schema 'name string, company string, age long'," + // like 'column_a long, column_b string'
|"oss.data.format 'csv'," + // we only support csv now
|"oss.input.csv.header 'None'," +
|"oss.input.csv.recordDelimiter '\r\n'," +
|"oss.input.csv.fieldDelimiter ','," +
|"oss.input.csv.commentChar '#'," +
|"oss.input.csv.quoteChar '\"'," +
|"oss.output.csv.recordDelimiter '\n'," +
|"oss.output.csv.fieldDelimiter ','," +
|"oss.output.csv.quoteChar '\"'," +
|"oss.endpoint 'oss-cn-shenzhen.aliyuncs.com', " +
|"oss.accessKeyId 'Your Access Key Id', " +
|"oss.accessKeySecret 'Your Access Key Secret')")
res0: org.apache.spark.sql.DataFrame = []

scala> val sql: String = "select count(*) from people where name like 'Lora%'"
sql: String = select count(*) from people where name like 'Lora%'

scala> sqlContext.sql(sql).show()
+--------+
|count(1)|
+--------+
|   31770|
+--------+

scala> val textFile = sc.textFile("oss://select-test-sz/people/")
textFile: org.apache.spark.rdd.RDD[String] = oss://select-test-sz/people/ MapPartitionsRDD[8] at textFile at <console>:24

scala> textFile.map(line => line.split(',')).filter(_(0).startsWith("Lora")).count()
res3: Long = 31770
As the figure below shows, the query takes 15s with OSS Select and 54s without it, so OSS Select speeds up this query substantially.
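A comparison like this can be reproduced in spark-shell by timing both code paths. The helper below is a sketch (not the benchmark behind the figure) and reuses the `people` view and `textFile` RDD defined above:

```scala
// Minimal timing helper for spark-shell; wall-clock only, no warm-up runs.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}

// With OSS Select: the filter is pushed down and evaluated inside OSS.
time("OSS Select query") {
  sqlContext.sql("select count(*) from people where name like 'Lora%'").show()
}

// Without OSS Select: every object is downloaded and filtered in Spark.
time("Plain RDD scan") {
  textFile.map(_.split(',')).filter(_(0).startsWith("Lora")).count()
}
```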

[Figure: query time with and without OSS Select]


Implementing the Spark OSS Select support package (Preview)

Spark can integrate with OSS Select by extending Spark's DataSource API. By implementing PrunedFilteredScan, the required columns and the filter conditions can be pushed down to OSS Select for execution. This support package is still under development; its specification and the filter conditions it supports are as follows:
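Since the support package has not been released, the following is only a sketch of what a PrunedFilteredScan-based relation could look like; everything except the Spark DataSource API types (BaseRelation, PrunedFilteredScan, Filter, StringStartsWith) is an assumption, and the actual OSS Select request logic is omitted:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, StringStartsWith}
import org.apache.spark.sql.types.StructType

// Sketch of a relation that pushes column pruning and filtering to OSS Select.
class OssSelectRelation(override val sqlContext: SQLContext,
                        override val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // Translate the filters OSS Select can evaluate into a WHERE clause;
    // any filter not translated here is re-applied by Spark after the scan.
    val where = filters.collect {
      case StringStartsWith(attr, prefix) => s"$attr like '$prefix%'"
    }.mkString(" and ")
    val projection = requiredColumns.mkString(", ")
    println(s"pushed down to OSS Select: select $projection where $where")
    // A real implementation would return an RDD whose partitions each issue
    // an OSS Select request against one object under oss.prefix; that part
    // is omitted in this sketch.
    sqlContext.sparkContext.emptyRDD[Row]
  }
}
```

The key design point is that buildScan receives only the columns the query actually needs plus the filters Spark could extract, so both can be forwarded to OSS Select instead of scanning whole objects.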