Naive/early S3N RDD for Spark
/**
 * Started to rough this naive S3-native filesystem RDD out because I need to use IAM
 * profiles for S3 access and am also affected by https://issues.apache.org/jira/browse/HADOOP-3733.
 *
 * Use at your own risk; bear in mind this is maybe 30-45 minutes of work and testing,
 * so expect it to behave as such.
 *
 * Feedback/criticism/discussion welcome via GitHub/Twitter.
 *
 * In addition to Spark 1.0.x, this depends on Amazon's S3 SDK; the sbt dependency is:
 *   "com.amazonaws" % "aws-java-sdk" % "1.7.4"
 */
package com.askuity.rdd

import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ GetObjectRequest, ObjectListing }
import java.io.{ BufferedReader, InputStreamReader }
import org.apache.spark.{ Partition, SparkContext, TaskContext }
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._
private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
  def index = idx
}
/**
 * Directly construct and use. Roughly equivalent to SparkContext.textFile, but give this
 * a list/sequence of the files you want to load. This currently makes one Partition per
 * file; once constructed, just use it like any other RDD.
 *
 * The example below constructs an RDD from all files starting with "some-files/file-" in
 * the S3 bucket "my-bucket":
 *
 *   new S3NRDD(yourSparkContext, "my-bucket", new S3NListing("my-bucket").list("some-files/file-"))
 */
class S3NRDD(sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {

  private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials

  override def getPartitions: Array[Partition] =
    files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray
  override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
    case S3NPartition(_, bucket, path) =>
      val client = new AmazonS3Client(instanceCreds())
      val obj = client.getObject(new GetObjectRequest(bucket, path))
      val br = new BufferedReader(new InputStreamReader(obj.getObjectContent()))
      // Stream lines lazily; readLine() returns null at EOF, at which point the
      // reader is closed and the iterator terminates.
      Iterator.continually(br.readLine()).takeWhile {
        case null =>
          br.close()
          false
        case _ => true
      }
  }
}
/**
 * Simple helper to find files within the given bucket.
 */
class S3NListing(bucket: String) {

  private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials

  lazy val client = new AmazonS3Client(instanceCreds())
  /**
   * List files behind a given prefix, e.g. "" for all, "my-folder",
   * "my-folder/files-that-start-like-this", etc. Will eagerly fetch
   * all truncated results.
   */
  def list(folder: String) = recursiveListing(folder, None, Nil)
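  // For example, given the (hypothetical) keys "logs/2014-07-18.txt" and "logs/2014-07-19.txt",
  // list("logs/2014-07-18") would return only the first key, while list("logs/") returns both.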
  @scala.annotation.tailrec
  private def recursiveListing(folder: String, prev: Option[ObjectListing], memo: List[Seq[String]]): List[String] = prev match {
    case None =>
      // First page of results for the prefix.
      val listing = client.listObjects(bucket, folder)
      val keys = listing.getObjectSummaries.asScala.map(_.getKey)
      if (listing.isTruncated)
        recursiveListing(folder, Some(listing), keys :: memo)
      else
        (keys :: memo).flatten
    case Some(lastListing) =>
      // Follow-up pages for a truncated listing.
      val listing = client.listNextBatchOfObjects(lastListing)
      val keys = listing.getObjectSummaries.asScala.map(_.getKey)
      if (listing.isTruncated)
        recursiveListing(folder, Some(listing), keys :: memo)
      else
        (keys :: memo).flatten
  }
}
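
/**
 * A possible usage sketch, assuming this runs on an EC2 instance whose IAM
 * instance profile grants read access to the bucket, and that `sc` is an
 * existing SparkContext; bucket and prefix names are placeholders:
 *
 *   val files = new S3NListing("my-bucket").list("some-files/file-")
 *   val rdd   = new S3NRDD(sc, "my-bucket", files)
 *   println(rdd.count())  // total number of lines across all matched files
 */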