Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save davidgin/5709580d7573491a980bf475752d1819 to your computer and use it in GitHub Desktop.
Save davidgin/5709580d7573491a980bf475752d1819 to your computer and use it in GitHub Desktop.
HDFS Scala utilities for finding modified folders
import org.apache.hadoop.fs._
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.ListBuffer
object HdfsUtil {

  /** Path filter used by every glob in this object: accepts a path unless its final name
    * component starts with `.` (hidden entries are skipped).
    */
  val pf: PathFilter = new PathFilter {
    override def accept(path: Path): Boolean = !path.getName.startsWith(".")
  }

  /** Globs `path` through [[pf]], never returning `null`.
    *
    * Per the Hadoop API docs, `FileSystem.globStatus` returns `null` when the pattern contains no
    * glob characters and the path does not exist (and an empty array when a glob matches nothing).
    * Normalising both cases to an empty array lets every caller treat "missing" as "no entries"
    * instead of failing with a NullPointerException.
    */
  private def listStatuses(fs: FileSystem, path: Path): Array[FileStatus] =
    Option(fs.globStatus(path, pf)).getOrElse(Array.empty[FileStatus])

  /** Names of `p`'s grandparent and parent directories, in that order.
    *
    * Presumably these are the (day, hour) folders of a `.../day/hour/file` layout — confirm with
    * callers. Throws a NullPointerException if `p` has fewer than two ancestors, because
    * `Path.getParent` returns `null` at the root.
    */
  private def grandparentAndParentNames(p: Path): (String, String) =
    (p.getParent.getParent.getName, p.getParent.getName)

  /** Paths matching the glob `path` (minus hidden entries) whose modification time is strictly
    * greater than `epochTS`.
    *
    * @param fs      file system to query
    * @param path    path or glob pattern to match
    * @param epochTS cut-off compared against `FileStatus.getModificationTime`
    * @return matching paths in glob order; empty when nothing matches or `path` does not exist
    */
  def getNewPaths(fs: FileSystem, path: Path, epochTS: Long): List[Path] =
    listStatuses(fs, path).collect { case s if s.getModificationTime > epochTS => s.getPath }.toList

  /** Distinct (grandparent name, parent name) pairs of every path returned by [[getNewPaths]].
    *
    * Despite the method name, the result is a `Set`.
    */
  def getDirtyHoursAsTupleList(fs: FileSystem, path: Path, epochTS: Long): Set[(String, String)] =
    getNewPaths(fs, path, epochTS).map(grandparentAndParentNames).toSet

  /** The same (grandparent name, parent name) pairs as [[getDirtyHoursAsTupleList]], rendered as
    * `(a,b),(c,d)`.
    *
    * Duplicates are removed, keeping first occurrences in [[getNewPaths]] order, so a folder that
    * contains many new files is listed once.
    */
  def getDirtyHoursAsString(fs: FileSystem, path: Path, epochTS: Long): String =
    getNewPaths(fs, path, epochTS).map(grandparentAndParentNames).distinct.mkString(",")

  /** Largest final name component, parsed as a `Long`, among entries matching `path`.
    *
    * Names are compared numerically, so `"10"` beats `"9"`. Names that are not valid longs
    * (e.g. marker files) are ignored rather than aborting with a NumberFormatException.
    *
    * @return the largest numeric name, or 0 when no numeric name matches or `path` does not exist
    */
  def getNewestPathByName(fs: FileSystem, path: Path): Long =
    listStatuses(fs, path)
      .flatMap(s => scala.util.Try(s.getPath.getName.toLong).toOption)
      .reduceOption(_ max _)
      // When there are no previous runs, fall back to the oldest timestamp, 0.
      .getOrElse(0L)

  /** Latest modification time among entries matching `path`.
    *
    * @return the newest modification time, or 0 when nothing matches or `path` does not exist
    *         (i.e. no previous runs)
    */
  def getNewestPathByModificationTime(fs: FileSystem, path: Path): Long =
    listStatuses(fs, path).map(_.getModificationTime).reduceOption(_ max _).getOrElse(0L)

  /** `FileSystem.get` for a freshly constructed default `Configuration`. */
  def getHadoopFS: FileSystem = FileSystem.get(new Configuration)

  /** `FileSystem.get` for the supplied configuration. */
  def getHadoopFS(conf: Configuration): FileSystem = FileSystem.get(conf)

  /** Keys whose hourly data is newer than the corresponding daily data.
    *
    * Entries matching `dailyPath` are grouped by their parent directory name. Entries matching
    * `hourlyPath` are grouped by their grandparent directory name. Each group keeps its latest
    * modification time.
    *
    * A key is reported when its hourly time is later than its daily time. A key with no daily
    * entry counts as daily time 0.
    *
    * @param fs         file system to query
    * @param dailyPath  path or glob pattern for the daily entries
    * @param hourlyPath path or glob pattern for the hourly entries
    * @return the outdated keys; order follows the (unspecified) iteration order of the hourly map
    */
  def comparePathsModifications(fs: FileSystem, dailyPath: Path, hourlyPath: Path): ListBuffer[String] = {
    // Latest modification time per key, where `keyOf` derives the grouping key from each entry.
    def latestModByKey(pattern: Path, keyOf: FileStatus => String): Map[String, Long] =
      listStatuses(fs, pattern).groupBy(keyOf).map { case (key, statuses) =>
        key -> statuses.map(_.getModificationTime).max // groups from groupBy are never empty
      }

    val dailyMods = latestModByKey(dailyPath, _.getPath.getParent.getName)
    val hourlyMods = latestModByKey(hourlyPath, _.getPath.getParent.getParent.getName)

    val outdated = hourlyMods.collect {
      case (key, hourlyMod) if dailyMods.getOrElse(key, 0L) < hourlyMod => key
    }
    ListBuffer.empty[String] ++= outdated
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment