|
import scala.testing.Benchmark |
|
import scala.actors._ |
|
import Actor._ |
|
import scala.collection.mutable.Map |
|
|
|
/**
 * Wraps a whole-number count and converts it to milliseconds.
 *
 * The unit of the count is chosen by the accessor that is called (seconds,
 * minutes or hours); the singular and plural spellings are interchangeable.
 * All results are ordinary `Int` products.
 *
 * @param sec the count to convert; its unit depends on the accessor used
 */
class SecondsHelper(sec: Int) {

  /** The count taken as seconds, expressed in milliseconds. */
  def seconds: Int = sec * 1000

  /** Singular alias of [[seconds]]. */
  def second: Int = seconds

  /** The count taken as minutes, expressed in milliseconds. */
  def minutes: Int = seconds * 60

  /** Singular alias of [[minutes]]. */
  def minute: Int = minutes

  /** The count taken as hours, expressed in milliseconds. */
  def hours: Int = minutes * 60

  /** Singular alias of [[hours]]. */
  def hour: Int = hours
}
|
|
|
/** Implicit view from `Int` to [[SecondsHelper]], enabling expressions such as `2.seconds`. */
implicit def convertInt2Seconds(sec: Int): SecondsHelper = new SecondsHelper(sec)
|
|
|
/** Stand-in for a database connection; it declares no members of its own. */
class Connection

/** Companion that allows a connection to be created as `Connection()`. */
object Connection {

  /** Creates a fresh [[Connection]] on every call. */
  def apply(): Connection = new Connection
}
|
|
|
/** Times a computation and reports the elapsed time on standard output. */
class Stats {

  /**
   * Runs `f` inside a `Benchmark` via `runBenchmark(1)`, prints the first
   * reported timing divided by 1000.0 as seconds, and returns what `f` produced.
   *
   * @param name label that appears in the printed report
   * @param f    the computation to time
   * @return the value `f` evaluated to
   */
  def measure(name : String)(f: => Any): Any = {
    // Benchmark.run cannot hand a value back, so the result is captured in this cell.
    var captured: Any = None

    val timer = new Benchmark {
      def run = { captured = f }
    }

    val elapsedSeconds = timer.runBenchmark(1).head / 1000.0
    printf("Measured %s at %.2f seconds\n", name, elapsedSeconds)
    captured
  }
}

/** Companion that allows a collector to be created as `Stats()`. */
object Stats {

  /** Creates a new [[Stats]] instance. */
  def apply(): Stats = new Stats
}
|
|
|
/**
 * Hands a [[Connection]] to a callback.
 *
 * This is a stub: `size` is stored but no pooling takes place; every call to
 * `with_connection` creates a brand-new connection.
 *
 * @param size nominal pool size (currently unused by the implementation)
 */
class ConnectionPool(val size: Int) {

  /**
   * Invokes `f` with a fresh connection and returns whatever `f` returns.
   *
   * The callback is typed `Connection => Any` rather than `Connection => Unit`
   * so that its result is passed through instead of being discarded. Because
   * function types are covariant in their result, callbacks that return `Unit`
   * are still accepted.
   *
   * @param f work to perform with the connection
   * @return the result of `f`
   */
  def with_connection(f: Connection => Any): Any = f(new Connection)
}

/** Companion that allows a pool to be created as `ConnectionPool(size)`. */
object ConnectionPool {

  /** Creates a pool with the given nominal size. */
  def apply(size: Int): ConnectionPool = new ConnectionPool(size)
}
|
|
|
|
|
/**
 * Interface shared by the concrete query and by every query decorator in this
 * file. Both operations return an untyped (`Any`) result.
 */
trait Query {

  /** Performs the query's `select` operation and returns its result. */
  def select: Any

  /** Performs the query's `execute` operation and returns its result. */
  def execute: Any
}
|
|
|
/**
 * Stub query that simulates database work by sleeping for one second and
 * returning canned results.
 *
 * @param connection   connection the query nominally runs on (only printed)
 * @param query_string text of the query (only printed)
 * @param args         bind arguments; accepted but not used by this stub
 */
class BaseQuery(val connection: Connection, val query_string: String, args: Any*) extends Query {

  /** Sleeps one second, logs the query, and returns `List(1, 2, 3)`. */
  def select = {
    Thread.sleep(1000)
    println("Selecting " + query_string + " on " + connection)
    List(1, 2, 3)
  }

  /** Sleeps one second, logs the query, and returns `1`. */
  def execute = {
    Thread.sleep(1000)
    println("Executing " + query_string + " on " + connection)
    1
  }

  /** Returns this same query unchanged. */
  def reverse = this
}

/** Companion that allows a query to be created as `BaseQuery(connection, sql, args...)`. */
object BaseQuery {

  /**
   * Creates a [[BaseQuery]], typed as the general [[Query]] interface.
   *
   * `args` is expanded with `: _*`; passing it bare would hand the constructor
   * a single argument that is itself the whole sequence.
   */
  def apply(connection: Connection, query_string: String, args: Any*): Query =
    new BaseQuery(connection, query_string, args: _*)
}
|
|
|
/**
 * Base for query decorators. A proxy wraps another [[Query]] and routes both
 * operations through `delegate`, which each concrete proxy implements to add
 * its own behaviour around the wrapped call.
 */
trait QueryProxy extends Query {

  /** The wrapped query that ultimately performs the work. */
  val query: Query

  /**
   * Hook invoked for every operation.
   *
   * @param name the operation being performed (`"select"` or `"execute"`)
   * @param f    by-name call into the wrapped query; evaluating it runs the operation
   * @return the value the proxy chooses to expose for this operation
   */
  def delegate(name: String)(f: => Any): Any

  /** Routes the wrapped query's `select` through `delegate`. */
  def select: Any = delegate("select")(query.select)

  /** Routes the wrapped query's `execute` through `delegate`. */
  def execute: Any = delegate("execute")(query.execute)
}
|
|
|
/**
 * Decorating factory: each query built by `query_factory` is wrapped in a
 * [[TimingOutQueryProxy]] configured with `timeout`.
 *
 * @param query_factory factory that builds the query to wrap
 * @param timeout       timeout value handed to every proxy that is created
 */
class TimingOutQueryFactory(val query_factory: QueryFactory, val timeout: Int) extends QueryFactory {

  /**
   * Builds the inner query and wraps it in a timing-out proxy.
   *
   * `args` is forwarded with `: _*` so the inner factory receives the
   * individual arguments instead of one argument that is the whole sequence.
   */
  override def create(connection: Connection, query_string: String, args: Any*): QueryProxy =
    new TimingOutQueryProxy(query_factory.create(connection, query_string, args: _*), timeout)
}
|
|
|
/**
 * Proxy that evaluates the wrapped operation in a separate actor and waits
 * for its reply for at most `timeout` (the value is passed to `receiveWithin`).
 *
 * @param query   the wrapped query
 * @param timeout how long to wait for the reply before giving up
 */
class TimingOutQueryProxy(override val query: Query, val timeout: Int) extends QueryProxy {

  /**
   * Runs `f` in a new actor and waits for the result.
   *
   * Every call tags its reply with a fresh token and only accepts a reply
   * carrying that token. Without the tag, a reply that arrives after its call
   * has timed out would stay in the caller's mailbox and be returned as the
   * result of the next call.
   *
   * @param name operation name (not used by this proxy)
   * @param f    the wrapped operation
   * @return the result of `f`, or the unit value after printing "timed out"
   */
  override def delegate(name: String)(f: => Any): Any = {
    val caller = self
    // Fresh object per call; it is only ever equal to itself.
    val token = new AnyRef

    actor {
      caller ! ((token, f))
    }

    receiveWithin(timeout) {
      case TIMEOUT => println("timed out")
      // Replies tagged with another call's token do not match and are ignored here.
      case (`token`, result) => { println("Did not timeout! Yay fast database!"); result }
    }
  }
}
|
|
|
/** Root factory: builds plain, undecorated queries. Decorating factories extend and wrap it. */
class QueryFactory {

  /**
   * Creates a query for `query_string` on `connection` via `BaseQuery.apply`.
   *
   * `args` is forwarded with `: _*` so the query receives the individual
   * arguments instead of one argument that is the whole sequence.
   *
   * @param connection   connection the query is bound to
   * @param query_string text of the query
   * @param args         bind arguments for the query
   */
  def create(connection: Connection, query_string: String, args: Any*): Query =
    BaseQuery(connection, query_string, args: _*)
}
|
|
|
/**
 * Decorating factory: each query built by `query_factory` is wrapped in a
 * [[MemoizingQueryProxy]].
 *
 * @param query_factory factory that builds the query to wrap
 */
class MemoizingQueryFactory(val query_factory: QueryFactory) extends QueryFactory {

  /**
   * Builds the inner query and wraps it in a memoizing proxy.
   *
   * `args` is forwarded with `: _*` so the inner factory receives the
   * individual arguments instead of one argument that is the whole sequence.
   */
  override def create(connection: Connection, query_string: String, args: Any*): QueryProxy =
    new MemoizingQueryProxy(query_factory.create(connection, query_string, args: _*))
}
|
|
|
/**
 * Proxy that remembers the result of each operation so that repeating the
 * operation on this proxy does not run the wrapped query again.
 *
 * The cache belongs to the proxy instance; it is not shared between proxies.
 *
 * @param query the wrapped query
 */
class MemoizingQueryProxy(override val query: Query) extends QueryProxy {

  /** Cached results, keyed by `"<operation>:<query string>"`. */
  val memo = Map[String, Any]()

  println("Instantiating Query Object")

  /**
   * Returns the cached result for this operation, evaluating `f` and storing
   * its value only on the first request.
   *
   * The operation name is part of the key so that `select` and `execute` do
   * not overwrite or reuse each other's entries. `getOrElseUpdate` decides by
   * key presence, so a result that happens to equal `None` is cached as well.
   *
   * @param name operation name, used as part of the cache key
   * @param f    the wrapped operation, evaluated at most once per key
   */
  override def delegate(name: String)(f: => Any): Any = {
    // Only BaseQuery exposes a query string; for any other Query the operation
    // name alone still separates the entries, since a proxy wraps one query.
    val queryString = query match {
      case base: BaseQuery => base.query_string
      case _ => ""
    }
    memo.getOrElseUpdate(name + ":" + queryString, f)
  }
}
|
|
|
/**
 * Decorating factory: each query built by `query_factory` is wrapped in a
 * [[StatsCollectingQueryProxy]] that reports to `stats`.
 *
 * @param query_factory factory that builds the query to wrap
 * @param stats         collector handed to every proxy that is created
 */
class StatsCollectingQueryFactory(val query_factory: QueryFactory, val stats: Stats) extends QueryFactory {

  /**
   * Builds the inner query and wraps it in a stats-collecting proxy.
   *
   * `args` is forwarded with `: _*` so the inner factory receives the
   * individual arguments instead of one argument that is the whole sequence.
   */
  override def create(connection: Connection, query_string: String, args: Any*): QueryProxy =
    new StatsCollectingQueryProxy(query_factory.create(connection, query_string, args: _*), stats)
}
|
|
|
/**
 * Proxy that times every operation of the wrapped query with `stats`.
 *
 * @param query the wrapped query
 * @param stats collector whose `measure` wraps each operation
 */
class StatsCollectingQueryProxy(override val query: Query, val stats: Stats) extends QueryProxy {

  /** Hands the operation to `stats.measure` under its own name and returns what `measure` returns. */
  override def delegate(name: String)(f: => Any): Any =
    stats.measure(name)(f)
}
|
|
|
/**
 * Decorating factory: each query built by `query_factory` is wrapped in a
 * [[ReversingQueryProxy]].
 *
 * @param query_factory factory that builds the query to wrap
 */
class ReversingQueryFactory(val query_factory: QueryFactory) extends QueryFactory {

  /**
   * Builds the inner query and wraps it in a reversing proxy.
   *
   * `args` is forwarded with `: _*` so the inner factory receives the
   * individual arguments instead of one argument that is the whole sequence.
   */
  override def create(connection: Connection, query_string: String, args: Any*): QueryProxy =
    new ReversingQueryProxy(query_factory.create(connection, query_string, args: _*))
}
|
|
|
/**
 * Proxy that reverses sequence-valued results of the wrapped query.
 *
 * @param query the wrapped query
 */
class ReversingQueryProxy(override val query: Query) extends QueryProxy {

  /**
   * Evaluates the operation once; a `Seq` result is returned in reverse order,
   * any other result is returned untouched.
   */
  override def delegate(name: String)(f: => Any): Any = f match {
    case items: Seq[_] => items.reverse
    case other => other
  }
}
|
|
|
/**
 * Decorating factory: each query built by `query_factory` is wrapped in a
 * [[TransactionalQueryProxy]].
 *
 * @param query_factory factory that builds the query to wrap
 */
class TransactionalQueryFactory(val query_factory: QueryFactory) extends QueryFactory {

  /**
   * Builds the inner query and wraps it in a transactional proxy.
   *
   * `args` is forwarded with `: _*` so the inner factory receives the
   * individual arguments instead of one argument that is the whole sequence.
   */
  override def create(connection: Connection, query_string: String, args: Any*): QueryProxy =
    new TransactionalQueryProxy(query_factory.create(connection, query_string, args: _*))
}
|
|
|
/**
 * Proxy marking where a transaction would surround each operation. The
 * begin/end steps are placeholders; the operation simply runs.
 *
 * @param query the wrapped query
 */
class TransactionalQueryProxy(override val query: Query) extends QueryProxy {

  /** Evaluates the operation once between the (placeholder) transaction boundaries. */
  override def delegate(name: String)(f: => Any): Any = {
    // placeholder: a transaction would be opened here
    val outcome = f
    // placeholder: the transaction would be closed here
    outcome
  }
}
|
|
|
/**
 * Entry point for running queries: obtains a connection through
 * `connection_pool.with_connection` and builds the query with `query_factory`.
 *
 * @param connection_pool source of connections
 * @param query_factory   factory used to build every query
 */
class QueryEvaluator(val connection_pool: ConnectionPool, val query_factory: QueryFactory) {

  /**
   * Builds a query and runs its `select` on a connection from the pool.
   *
   * `args` is forwarded with `: _*` so the factory receives the individual
   * arguments instead of one argument that is the whole sequence.
   *
   * @return whatever `with_connection` returns for the callback
   */
  def select(query_string: String, args: Any*): Any =
    connection_pool.with_connection { connection =>
      query_factory.create(connection, query_string, args: _*).select
    }

  /**
   * Builds a query and runs its `execute` on a connection from the pool.
   * Arguments are forwarded the same way as in `select`.
   *
   * @return whatever `with_connection` returns for the callback
   */
  def execute(query_string: String, args: Any*): Any =
    connection_pool.with_connection { connection =>
      query_factory.create(connection, query_string, args: _*).execute
    }

  /**
   * Runs `f` against a [[TransactionalQueryEvaluator]] bound to a single
   * connection, inside that evaluator's `transaction` block.
   *
   * @param f work that issues its queries through the supplied evaluator
   * @return whatever `with_connection` returns for the callback
   */
  def transaction(f: (TransactionalQueryEvaluator) => Any): Any =
    connection_pool.with_connection { connection =>
      val tqe = new TransactionalQueryEvaluator(connection, query_factory)
      tqe.transaction { f(tqe) }
    }
}
|
|
|
/**
 * Query evaluator bound to one fixed connection, so that all queries issued
 * through it use that same connection.
 *
 * @param connection    the connection every query is created on
 * @param query_factory factory used to build every query
 */
class TransactionalQueryEvaluator(val connection: Connection, val query_factory: QueryFactory) {

  /**
   * Builds a query on the bound connection and returns the result of its `select`.
   *
   * `args` is forwarded with `: _*` so the factory receives the individual
   * arguments instead of one argument that is the whole sequence.
   */
  def select(query_string: String, args: Any*): Any =
    query_factory.create(connection, query_string, args: _*).select

  /**
   * Builds a query on the bound connection and returns the result of its `execute`.
   * Arguments are forwarded the same way as in `select`.
   */
  def execute(query_string: String, args: Any*): Any =
    query_factory.create(connection, query_string, args: _*).execute

  /**
   * Evaluates `f` between the transaction boundaries, which are placeholders
   * for now, and returns its result.
   */
  def transaction(f: => Any): Any = {
    // begin transaction
    val answer = f
    // end transaction
    answer
  }
}
|
|
|
|
|
|
|
// ---------------------------------------------------------------------------
// Demo script: build a decorated query factory from a configuration list and
// run the same transaction through it, once as-is and once with results
// reversed.
// ---------------------------------------------------------------------------

// Undecorated factory at the bottom of the decorator stack.
val qf = new QueryFactory

// Decorators to apply, innermost first, each paired with its setting
// (None where the decorator takes no setting).
val config = List(
  ("MemoizingQueryFactory", None),
  ("TimingOutQueryFactory", 2.seconds),
  ("StatsCollectingQueryFactory", Stats()))

// Wrap the base factory in each configured decorator in turn. Settings are
// checked by typed patterns rather than cast, and an entry that matches no
// decorator fails with a message naming the offending entry.
val query_factory = config.foldLeft(qf) { (factory, item) =>
  item match {
    case ("MemoizingQueryFactory", _) =>
      new MemoizingQueryFactory(factory)
    case ("TimingOutQueryFactory", timeout: Int) =>
      new TimingOutQueryFactory(factory, timeout)
    case ("StatsCollectingQueryFactory", stats: Stats) =>
      new StatsCollectingQueryFactory(factory, stats)
    case (decorator_name, settings) =>
      throw new IllegalArgumentException(
        "Unsupported decorator configuration: " + decorator_name + " -> " + settings)
  }
}

// Run the same transaction with the plain stack and with a reversing decorator on top.
List(query_factory, new ReversingQueryFactory(query_factory)).foreach { factory =>
  val query_evaluator = new QueryEvaluator(ConnectionPool(20), factory)
  query_evaluator.transaction { t: TransactionalQueryEvaluator =>
    t.select("SELECT ... FROM ... FOR UPDATE ...")
    t.execute("INSERT ...")
    t.execute("INSERT ...")
  }
  println()
}