HDFS to S3 using Spark and Hadoop
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Run a block and return its result together with the wall-clock time in seconds.
def elapsed[A](f: => A): (A, Double) = {
  val s = System.nanoTime
  val ret = f
  val elapsed = (System.nanoTime - s) / 1e9
  (ret, elapsed)
}

val inputPathStr = "hdfs://namenode/user/hadoop/archive/"
val outputPathStr = "s3://bucket/archive/"

// List the top-level folders under the HDFS input path; each one becomes a copy task.
def folders() = {
  val inputPath = new Path(inputPathStr)
  val inputfs: FileSystem = inputPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
  val folders: List[String] = inputfs.listStatus(inputPath).map(_.getPath.toString).toList
  folders
}

// Spread the folder list over 100 partitions and copy each folder to S3 with FileUtil.copy.
val result = elapsed {
  sc.parallelize(folders(), 100).mapPartitions { partition =>
    // Each executor builds its own Hadoop configuration and filesystem handles.
    val conf = new org.apache.hadoop.conf.Configuration()
    val ifs = FileSystem.get(new java.net.URI(inputPathStr), conf)
    val ofs = FileSystem.get(new java.net.URI(outputPathStr), conf)
    partition.map { inputStr =>
      val input = new Path(inputStr)
      FileUtil.copy(ifs, input, ofs, new Path(outputPathStr, input.getName), false, conf)
    }
  }.collect
}

// Elapsed time for the whole copy, in seconds.
result._2
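One caveat: the `new Configuration()` created inside `mapPartitions` only picks up whatever site files the executors have locally, so S3 credentials or endpoint settings applied only on the driver (for example via `spark.sparkContext.hadoopConfiguration.set`) will not reach the copy tasks. Below is a minimal sketch of one way around that, assuming you are willing to snapshot the driver's Hadoop properties into a broadcast variable; the names `confMap`, `confBc`, and `resultBroadcast` are illustrative and not part of the original gist.

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Snapshot the driver-side Hadoop configuration as plain key/value pairs
// (Configuration itself is not serializable) and broadcast the map.
val confMap: Map[String, String] =
  spark.sparkContext.hadoopConfiguration.iterator().asScala
    .map(e => e.getKey -> e.getValue).toMap
val confBc = sc.broadcast(confMap)

val resultBroadcast = elapsed {
  sc.parallelize(folders(), 100).mapPartitions { partition =>
    // Rebuild a Configuration on the executor from the broadcast snapshot,
    // so driver-side S3 settings are honoured in the copy tasks as well.
    val conf = new Configuration(false)
    confBc.value.foreach { case (k, v) => conf.set(k, v) }
    val ifs = FileSystem.get(new java.net.URI(inputPathStr), conf)
    val ofs = FileSystem.get(new java.net.URI(outputPathStr), conf)
    partition.map { inputStr =>
      val input = new Path(inputStr)
      FileUtil.copy(ifs, input, ofs, new Path(outputPathStr, input.getName), false, conf)
    }
  }.collect
}

On a cluster such as EMR the plain `new Configuration()` variant above typically works as-is, because every node already carries the cluster's core-site.xml; the broadcast variant mainly matters when credentials or endpoints are injected at runtime on the driver only.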