-
-
Save sgodbillon/4250672 to your computer and use it in GitHub Desktop.
package bugs | |
import play.api.libs.iteratee._ | |
import scala.util.Failure | |
import scala.util.Success | |
import scala.concurrent.Future | |
import scala.concurrent.Promise | |
object StackOverflowErrorBug { | |
import scala.concurrent.ExecutionContext.Implicits.global | |
trait Cursor { | |
def iterator :Iterator[String] | |
def hasNext :Boolean | |
def next :Future[Cursor] | |
def i: Int = 0 | |
def n(cursor: Cursor) = | |
if(cursor.iterator.hasNext) { | |
Future(Some((cursor,Some(cursor.iterator.next)))) | |
} else if (cursor.hasNext) { | |
val fut = //cursor.next.map(c => Some((c,None))) | |
Future(Some(DefaultCursor(cursor.i + 1) -> None)) | |
print(fut + ",") | |
fut | |
} else { | |
Future(None) | |
} | |
def enumerate = { | |
CustomEnumerator.unfoldM(this) { cursor => | |
n(cursor) | |
}.andThen(Enumerator.eof).onDoneEnumerating{ | |
println("done") | |
} &> Enumeratee.collect { | |
case Some(e) => e | |
} | |
} | |
object CustomEnumerator { | |
def unfoldM[S,E](s:S)(f: S => Future[Option[(S,E)]] ): Enumerator[E] = checkContinue1(s)(new TreatCont1[E,S]{ | |
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]] = f(s).flatMap { | |
case Some((newS,e)) => { | |
// if we don't create this intermediate promise, then a stackoverflowerror is eventually thrown | |
// original code -> | |
// loop(k(Input.El(e)),newS) | |
// <- original code | |
val promise = Promise[play.api.libs.iteratee.Iteratee[E,A]]() | |
loop(k(Input.El(e)),newS).onComplete { | |
case Success(s) => | |
promise.success(s) | |
case Failure(f) => | |
promise.failure(f) | |
} | |
promise.future | |
} | |
case None => Future(Cont(k)) | |
} | |
}) | |
trait TreatCont1[E,S]{ | |
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]] | |
} | |
def checkContinue1[E,S](s:S)(inner:TreatCont1[E,S]) = new Enumerator[E] { | |
def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = { | |
def step(it: Iteratee[E, A], state:S): Future[Iteratee[E,A]] = it.fold{ | |
case Step.Done(a, e) => Future(Done(a,e)) | |
case Step.Cont(k) => inner[A](step,state,k) | |
case Step.Error(msg, e) => Future(Error(msg,e)) | |
} | |
step(it,s) | |
} | |
} | |
def loop(cursor: Cursor) :Future[Option[(Option[String], Cursor)]] = { | |
if(cursor.iterator.hasNext) | |
Future(Some(Some(cursor.iterator.next) -> cursor)) | |
else if(cursor.hasNext) | |
cursor.next.map(c => Some(None -> c)) | |
else Future(None) | |
} | |
} | |
} | |
case class DefaultCursor(override val i: Int) extends Cursor { | |
val iterator = { | |
val r = (for(j <- 0 to 1) yield i + "" + j) | |
r.toIterator | |
} | |
def hasNext = i < 5000 | |
def next = { | |
Future(DefaultCursor(i + 1)) | |
} | |
} | |
case class FlattenedCursor(cursor: Future[Cursor]) extends Cursor { | |
val iterator = Iterator.empty | |
def hasNext = true | |
def next = cursor | |
} | |
// should print "done: <some result>" at the end | |
def test = { | |
val enumerator = FlattenedCursor(Future(DefaultCursor(0))).enumerate | |
val fut = enumerator.apply(Iteratee.foreach({ e => | |
//println(e) | |
})) | |
val ff = Iteratee.flatten(fut).run | |
ff.onComplete { | |
case e => | |
println("done: " + e) | |
} | |
} | |
} |
name := "TestIteratees" | |
version := "1.0" | |
scalaVersion := "2.10.0-RC1" | |
resolvers += "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/" | |
resolvers += "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/" | |
libraryDependencies ++= Seq( | |
"play" % "play-iteratees_2.10" % "2.1-RC1" | |
) |
Tried it, sadly, completeWith
also fails :/
does this count as a valid (but minimal) test?
Enumerator.unfoldM(0){ (i => Future(Option((i+1,i)).filterNot(_ => i > 5000 )))} |>>> Iteratee.getChunks
not minimal enough? here might be a smaller one:
def tata(f:Future[Int]):Future[Int] = f.flatMap(i => if (i < 0) tata(Future(i+1)) else Future(i))
// small enough number, -750 is enough in the console
tata(-1000)
java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:58)
at scala.concurrent.Promise$class.failure(Promise.scala:107)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:58)
at scala.concurrent.Future$$anonfun$flatMap$1.liftedTree3$1(Future.scala:283)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:277)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:274)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:29)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:915)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:980)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
You're right :)
You can most likely replace the following:
val promise = Promiseplay.api.libs.iteratee.Iteratee[E,A]
loop(k(Input.El(e)),newS).onComplete {
case Success(s) =>
promise.success(s)
case Failure(f) =>
promise.failure(f)
}
promise.future
with