Skip to content

Instantly share code, notes, and snippets.

@anthavio
Last active December 13, 2018 17:05
Show Gist options
  • Save anthavio/a7a856654fbdf3471a1588c008d111d3 to your computer and use it in GitHub Desktop.
Save anthavio/a7a856654fbdf3471a1588c008d111d3 to your computer and use it in GitHub Desktop.
Standalone Spark cluster http://master:8080/json/ client
package zx.spark
import java.time.{Instant, LocalDateTime, ZoneId}
import java.util.concurrent.TimeUnit
import cats.effect.Sync
import cats.implicits._
import com.fasterxml.jackson.annotation.JsonFormat
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.{DeserializationContext, DeserializationFeature, JsonDeserializer, ObjectMapper}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.softwaremill.sttp._
import scala.util.Try
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.duration.{Duration, FiniteDuration}
/**
 * Client for the JSON status page of a standalone Spark cluster (http://master:8080/json/).
 *
 * Not available for Mesos, Yarn, ... clusters.
 *
 * @tparam F effect type in which the result of a fetch is delivered
 */
trait SparkSaAPI[F[_]] {

  /** Retrieves the cluster status as a [[SaApiResponse]] inside `F`. */
  def fetch(): F[SaApiResponse]

}
/** Factory for [[SparkSaAPI]] instances. */
object SparkSaAPI {

  /**
   * Builds a [[DefaultSparkSaAPI]] for the given endpoint.
   *
   * @param endpointUrl URL the client will request
   * @param connTimeout connection timeout, 3 seconds unless overridden
   * @param readTimeout read timeout, 15 seconds unless overridden
   */
  def apply[F[_]: Sync](
      endpointUrl: String,
      connTimeout: Duration = Duration(3, TimeUnit.SECONDS),
      readTimeout: Duration = Duration(15, TimeUnit.SECONDS)
  ): SparkSaAPI[F] = {
    val client: SparkSaAPI[F] = new DefaultSparkSaAPI[F](endpointUrl, connTimeout, readTimeout)
    client
  }

}
/**
 * Top-level document returned by the standalone Spark master JSON endpoint.
 *
 * Ignoring fields workers, activedrivers, completedapps as we do not need them.
 *
 * Field names double as JSON property names, which is why most of them are all-lowercase.
 *
 * @param url        value of the `url` property
 * @param cores      value of the `cores` property
 * @param coresUsed  value of the `coresused` property (explicitly mapped, see below)
 * @param memory     value of the `memory` property
 * @param memoryused value of the `memoryused` property
 * @param activeapps entries of the `activeapps` array
 */
case class SaApiResponse(
    url: String,
    cores: Int,
    // The master reports this value under the all-lowercase key "coresused" (same convention as
    // "memoryused"). Without the explicit mapping the camelCase field never matches, and because
    // unknown properties are ignored by the mapper the mismatch goes unnoticed.
    @com.fasterxml.jackson.annotation.JsonProperty("coresused")
    coresUsed: Int,
    memory: Int,
    memoryused: Int,
    activeapps: List[SaApiSparkApp])
/**
 * State of a Spark application as reported in the `state` property of an application entry.
 *
 * NOTE(review): only the four states below are recognised; if the master can report others
 * (e.g. FAILED or UNKNOWN), `fromString` returns `None` for them — confirm whether that is intended.
 */
sealed trait SaApiAppState {

  /**
   * `true` for [[ActiveSaAppState]] members, `false` for [[InactiveSaAppState]] members.
   *
   * Declared on the base trait so it can be queried on any state value, such as the result of
   * [[SaApiAppState.fromString]], without first narrowing to a sub-trait.
   */
  def isActive: Boolean
}

/** Marker for states counted as active (RUNNING, WAITING). */
sealed trait ActiveSaAppState extends SaApiAppState {
  val isActive: Boolean = true
}

/** Marker for states counted as inactive (FINISHED, KILLED). */
sealed trait InactiveSaAppState extends SaApiAppState {
  val isActive: Boolean = false
}

object SaApiAppState {
  case object RUNNING extends ActiveSaAppState
  case object WAITING extends ActiveSaAppState
  case object FINISHED extends InactiveSaAppState
  case object KILLED extends InactiveSaAppState

  /**
   * Maps the exact (case-sensitive) state name to its case object.
   *
   * @param stateStr state name as it appears in the JSON document
   * @return the matching state, or `None` for any other input (including `null`)
   */
  def fromString(stateStr: String): Option[SaApiAppState] = stateStr match {
    case "RUNNING"  => Some(RUNNING)
    case "WAITING"  => Some(WAITING)
    case "FINISHED" => Some(FINISHED)
    case "KILLED"   => Some(KILLED)
    case _          => None
  }
}
/**
 * One entry of the `activeapps` array in [[SaApiResponse]].
 *
 * Field names double as JSON property names.
 *
 * @param id             value of the `id` property
 * @param name           value of the `name` property
 * @param cores          value of the `cores` property
 * @param user           value of the `user` property
 * @param memoryperslave value of the `memoryperslave` property
 * @param state          application state, see [[SaApiAppState]]
 * @param starttime      epoch milliseconds converted by [[MillisecondDeserializer]]
 * @param submitdate     textual date such as "Sun Dec 09 12:40:43 CET 2018"; only the local
 *                       date-time part is kept
 * @param duration       value of the `duration` property
 */
case class SaApiSparkApp(
    id: String,
    name: String,
    cores: Int,
    user: String,
    memoryperslave: Int,
    state: SaApiAppState,
    @JsonDeserialize(using = classOf[MillisecondDeserializer])
    starttime: LocalDateTime, // 1544355643695
    // The day and month names in the text are English, so the locale is pinned here; otherwise the
    // pattern would be resolved with the mapper's default locale and fail on non-English JVMs.
    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "EEE MMM d HH:mm:ss z yyyy", locale = "en")
    submitdate: LocalDateTime, // "Sun Dec 09 12:40:43 CET 2018"
    duration: java.time.Duration)
/**
 * [[SparkSaAPI]] implementation that issues a GET request to `endpointUrl` through sttp's
 * `HttpURLConnectionBackend` and decodes the response body with Jackson.
 *
 * @param endpointUrl URL to request
 * @param connTimeout connection timeout; `Duration.Inf` is accepted and means "no limit"
 * @param readTimeout read timeout applied to each request
 */
class DefaultSparkSaAPI[F[_]: Sync](endpointUrl: String, connTimeout: Duration, readTimeout: Duration)
    extends SparkSaAPI[F]
    with StrictLogging {

  // Mapper setup: tolerate properties that the model classes do not declare, and register the
  // Scala and java.time modules needed by the case classes.
  private val jackson = new ObjectMapper() with ScalaObjectMapper
  jackson.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  jackson.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, false) // DurationDeserializer
  jackson
    .registerModule(DefaultScalaModule) // Beware https://issues.apache.org/jira/browse/SPARK-22128
    .registerModule(new JavaTimeModule())

  // Custom module carrying the deserializer for the SaApiAppState ADT.
  val module = new SimpleModule()
  module.addDeserializer(classOf[SaApiAppState], new SaApiAppStateDeserializer())
  jackson.registerModule(module)

  /** Turns the textual state into a [[SaApiAppState]], rejecting names `fromString` does not know. */
  class SaApiAppStateDeserializer extends StdDeserializer[SaApiAppState](classOf[SaApiAppState]) {
    def deserialize(parser: JsonParser, context: DeserializationContext): SaApiAppState = {
      val raw = parser.getValueAsString
      SaApiAppState
        .fromString(raw)
        .getOrElse(throw new IllegalArgumentException(s"$raw is not a valid state value"))
    }
  }

  // `toMillis` throws on infinite durations, so an infinite connection timeout is translated
  // to zero instead of failing at construction time.
  // NOTE(review): assumes the backend hands this value to java.net.URLConnection#setConnectTimeout,
  // where zero means "no timeout" — confirm against the sttp version in use.
  private val connectionTimeout: FiniteDuration =
    if (connTimeout == Duration.Inf) Duration.Zero
    else FiniteDuration(connTimeout.toMillis, TimeUnit.MILLISECONDS)

  private implicit val backend: SttpBackend[Id, Nothing] = HttpURLConnectionBackend(
    options = SttpBackendOptions.connectionTimeout(connectionTimeout))

  /** Decodes `content` as `B`; any non-fatal exception thrown by Jackson ends up in the `Left`. */
  def parseJson[B: Manifest](content: String): Either[Throwable, B] =
    Either.fromTry(Try(jackson.readValue[B](content)))

  /** sttp response specification: read the body as a string, then decode it with [[parseJson]]. */
  def asJson[B: Manifest]: ResponseAs[Either[Throwable, B], Nothing] =
    asString.map(content => parseJson[B](content))

  /**
   * Requests `endpointUrl` and decodes the body into a [[SaApiResponse]].
   *
   * The returned effect fails with a `RuntimeException` when the response body is a `Left`
   * (reported as an unexpected response together with status and body), and with the
   * decoding exception when the body could not be parsed.
   */
  def fetch(): F[SaApiResponse] = {
    def extract(response: Response[Either[Throwable, SaApiResponse]]): F[SaApiResponse] =
      response.body match {
        case Left(content) =>
          Sync[F].raiseError(
            new RuntimeException(
              s"Unexpected ${response.code} ${response.statusText} response from $endpointUrl :\n$content"))
        case Right(parsed) => Sync[F].fromEither(parsed)
      }

    // The backend is synchronous (`Id`), so the blocking call is suspended inside `defer`
    // and its result is passed on directly.
    Sync[F].defer {
      val request =
        sttp.get(uri"$endpointUrl").readTimeout(readTimeout).response(asJson[SaApiResponse])
      logger.debug(s"Requesting ${request.method} ${request.uri}")
      extract(request.send())
    }
  }
}
/**
 * Jackson deserializer that reads an epoch-millisecond value, given either as a JSON integer
 * or as a string containing one, and converts it to a `LocalDateTime` in `zoneId`.
 *
 * @param zoneId zone used for the conversion; the no-arg constructor uses UTC
 */
class MillisecondDeserializer(val zoneId: ZoneId) extends JsonDeserializer[LocalDateTime] {

  def this() = this(ZoneId.of("UTC"))

  override def deserialize(parser: JsonParser, ctxt: DeserializationContext): LocalDateTime = {
    // Work out the raw millisecond count first, then convert in one place.
    val epochMillis: Long = parser.getCurrentToken match {
      case JsonToken.VALUE_NUMBER_INT =>
        parser.getLongValue
      case JsonToken.VALUE_STRING =>
        java.lang.Long.parseLong(parser.getText.trim)
      case _ =>
        throw ctxt.wrongTokenException(parser, JsonToken.VALUE_STRING, "Expected a string or numeric value")
    }
    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zoneId)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment