Last active: May 12, 2017 15:02
-
-
Save michellemay/1ae47d91b6d45a85a27ed11c24fcceb2 to your computer and use it in GitHub Desktop.
Copy folders from HDFS to S3 in parallel using Spark and Hadoop's FileUtil.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.fs.{FileSystem, Path, FileUtil} | |
/** Evaluates the by-name argument `f` once and returns its result
  * paired with the wall-clock time taken, in seconds.
  *
  * @param f computation to time (evaluated exactly once)
  * @return (result of `f`, elapsed seconds as a Double)
  */
def elapsed[A](f: => A): (A, Double) = {
  val start = System.nanoTime
  val result = f
  // nanoTime delta converted to seconds (1e9 ns per second).
  (result, (System.nanoTime - start) / 1e9)
}
// Source directory on HDFS; its immediate children are the folders to copy.
val inputPathStr = "hdfs://namenode/user/hadoop/archive/"
// Destination prefix on S3; each source folder lands under this key.
val outputPathStr = "s3://bucket/archive/"
/** Lists the immediate children of `inputPathStr` as fully-qualified
  * path strings, using the Hadoop configuration of the ambient `spark`
  * session to resolve the HDFS filesystem.
  *
  * @return one path string per entry directly under the input directory
  */
def folders() = {
  val root = new Path(inputPathStr)
  val fs: FileSystem = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
  fs.listStatus(root).map(_.getPath.toString).toList
}
// Copy every source folder to S3 in parallel: the folder list is spread
// over 100 Spark partitions, and each task opens its own input/output
// FileSystem handles before copying its share of folders.
// NOTE(review): the fresh Configuration() built on the executors presumably
// picks up cluster defaults (including S3 credentials) from the classpath —
// confirm before relying on this in a different deployment.
val result = elapsed {
  sc.parallelize(folders(), 100)
    .mapPartitions { partition =>
      val conf = new org.apache.hadoop.conf.Configuration()
      val ifs = FileSystem.get(new java.net.URI(inputPathStr), conf)
      val ofs = FileSystem.get(new java.net.URI(outputPathStr), conf)
      // Lazy map is fine here: collect() below forces every copy to run.
      partition.map { srcStr =>
        val src = new Path(srcStr)
        // deleteSource = false: the HDFS originals are kept after the copy.
        FileUtil.copy(ifs, src, ofs, new Path(outputPathStr, src.getName), false, conf)
      }
    }
    .collect
}
// Total wall-clock time of the whole copy, in seconds.
result._2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment