Skip to content

Instantly share code, notes, and snippets.

@Vitaliy-Savkin
Created February 7, 2017 12:45
Show Gist options
  • Save Vitaliy-Savkin/ed25da9eed9129afd20a095ba4d328d6 to your computer and use it in GitHub Desktop.
Save Vitaliy-Savkin/ed25da9eed9129afd20a095ba4d328d6 to your computer and use it in GitHub Desktop.
trait PlayerProfile
trait PlayerProps
trait Signal
type EnrichedSignal = (Signal, PlayerProfile, PlayerProps)
val sc: StreamingContext = ???
val ss: DStream[(String, Signal)] = ???
val as: DStream[(String, PlayerProfile)] = ???
val ps: DStream[(String, PlayerProps)] = ???
val spec = StateSpec
.function(trackStateFunc _)
.initialState(sc.sparkContext.emptyRDD[(String, (PlayerProfile, PlayerProps))])
def goToCassandra(key: String): (PlayerProfile, PlayerProps) = ???
def trackStateFunc(
batchTime: Time,
key: String,
value: Option[(Option[PlayerProfile], Option[PlayerProps])],
state: State[(PlayerProfile, PlayerProps)]): Option[(PlayerProfile, PlayerProps)] = {
state.getOption().map { case (profile, props) =>
value.map { case (newProfile, newProps) =>
(newProfile.getOrElse(profile), newProps.getOrElse(props))
}
}.getOrElse(Some(goToCassandra(key)))
}
val result: DStream[EnrichedSignal] = ss
.leftOuterJoin(as)
.leftOuterJoin(ps)
.map { case (key, ((_, profile), props)) => key -> (profile, props) }
.mapWithState(spec)
.stateSnapshots()
.join(ss)
.map { case (_, ((profile, props), signal)) => (signal, profile, props) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment