Last active
January 24, 2019 20:30
-
-
Save Timshel/1d9ace8b3fd441c4c4940d734ec76286 to your computer and use it in GitHub Desktop.
Implementation of a Throttle with a sliding window (with a probably useless `nice` delay)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage | |
import akka.stream.stage._ | |
import akka.stream._ | |
import scala.concurrent.duration.{ FiniteDuration, _ } | |
/**
 * Throttle stage backed by a sliding window.
 *
 * An element is emitted only when fewer than `max` elements were emitted during the last `per`
 * interval, and only once more than `nice` has elapsed since the previous emission (or, for the
 * first element, since the stage logic was created). Otherwise the element is parked and a
 * timer re-evaluates it later.
 *
 * @param max  maximum number of elements emitted inside one window; must be > 0
 * @param per  length of the sliding window; must be > 0 and at least `max` nanoseconds
 * @param nice minimal pause between two consecutive emissions; defaults to
 *             [[SlidingThrottle.nice]], i.e. 10% of the average spacing `per / max`
 * @tparam T type of the elements flowing through the stage
 */
class SlidingThrottle[T]
  (max: Int, per: FiniteDuration)
  (nice: FiniteDuration = SlidingThrottle.nice(max, per)) extends SimpleLinearGraphStage[T] {

  require(max > 0, "max must be > 0")
  require(per.toNanos > 0, "per time must be > 0")
  require(per.toNanos >= max, "Rates larger than 1 unit / nanosecond are not supported")

  private val nanosPer = per.toNanos
  private val nanosNice = nice.toNanos
  private val timerName: String = "ThrottleTimer"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    // Set when upstream finished while an element was still parked on the timer;
    // the stage then completes right after that element has been pushed.
    var willStop = false
    // Emission timestamps (System.nanoTime) still inside the current window, oldest first.
    // A Vector is used because its length is O(1) and it is consulted for every element.
    var emittedTimes = Vector.empty[Long]
    // Timestamp of the most recent emission (initially: creation time of this logic).
    var last: Long = System.nanoTime
    // Element waiting for the timer to fire, if any.
    var currentElement: Option[T] = None

    /** Pushes `elem` downstream and records the emission time in the window. */
    def pushThenLog(elem: T): Unit = {
      push(out, elem)
      last = System.nanoTime
      emittedTimes = emittedTimes :+ last
      if (willStop) completeStage()
    }

    /** Parks `elem` and arms the timer so that it is re-evaluated after `nanos` nanoseconds. */
    def schedule(elem: T, nanos: Long): Unit = {
      currentElement = Some(elem)
      scheduleOnce(timerName, nanos.nanos)
    }

    /** Decides whether `elem` can be emitted right now or has to wait. */
    def receive(elem: T): Unit = {
      val now = System.nanoTime
      // nanoTime values must be compared through their difference to stay overflow-safe.
      emittedTimes = emittedTimes.dropWhile { t => now - t > nanosPer }
      if (emittedTimes.length < max) {
        val delay = last + nanosNice - now
        if (delay >= 0) schedule(elem, delay) else pushThenLog(elem)
      } else {
        // The window is full, hence non-empty (max > 0). Wait until its oldest entry expires.
        // Every retained timestamp satisfies `now - t <= nanosPer`, so this delay is never negative.
        schedule(elem, emittedTimes.head + nanosPer - now)
      }
    }

    // This scope is here just to not retain an extra reference to the handler below.
    // We can't put this code into preStart() because setHandler() must be called before that.
    {
      val handler = new InHandler with OutHandler {
        // If an element is still parked on the timer, defer completion until it has been pushed.
        override def onUpstreamFinish(): Unit =
          if (isAvailable(out) && isTimerActive(timerName)) willStop = true
          else completeStage()

        override def onPush(): Unit = receive(grab(in))

        override def onPull(): Unit = pull(in)
      }
      setHandler(in, handler)
      setHandler(out, handler)
      // After this point, we no longer need the `handler` so it can just fall out of scope.
    }

    /** Timer callback: takes the parked element (if any) and evaluates it again. */
    override protected def onTimer(key: Any): Unit =
      currentElement.foreach { elem =>
        currentElement = None
        receive(elem)
      }
  }

  override def toString: String = "Throttle"
}
object SlidingThrottle {

  /**
   * Default `nice` delay of a [[SlidingThrottle]]: one tenth of the average spacing between
   * elements, i.e. `per / max / 10`, computed with integer arithmetic on nanoseconds
   * (each division truncates toward zero).
   *
   * @param max maximum number of elements per window; must be > 0
   * @param per length of the window
   * @return the computed delay, with nanosecond resolution
   * @throws IllegalArgumentException if `max` is not strictly positive
   */
  def nice(max: Int, per: FiniteDuration): FiniteDuration = {
    // Fail with the same message as the class instead of a bare "/ by zero" ArithmeticException;
    // this method runs as a default argument, before the class's own checks.
    require(max > 0, "max must be > 0")
    (per.toNanos / max / 10).nanos
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment