Skip to content

Instantly share code, notes, and snippets.

@kirked
Last active August 15, 2024 16:36
Show Gist options
  • Save kirked/32ed1773363dc9b9fb507bc8a07a4db4 to your computer and use it in GitHub Desktop.
Save kirked/32ed1773363dc9b9fb507bc8a07a4db4 to your computer and use it in GitHub Desktop.
Clojure atoms, in Scala
/*------------------------------------------------------------------------------
* MIT License
*
* Copyright (c) 2017 Doug Kirk
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*----------------------------------------------------------------------------*/
/**
 * A mutable cell holding a value of type `A`, modelled on Clojure's atoms.
 *
 * The trait is sealed; instances are created through the `Atom.apply` factories
 * in the companion object.
 */
sealed trait Atom[A] {
/** Returns the value currently held by this atom. */
def apply(): A
/** Unconditionally replaces the held value with `a`; the implementations in this file return `a`. */
def reset(a: A): A
/**
 * Replaces the held value with the result of applying `f` to it and returns the
 * newly stored value. The implementations in this file retry with compare-and-set
 * until the update succeeds, so `f` may be invoked more than once.
 */
def swap(f: A => A): A
/** Registers `watcher`, a callback taking (atom, old value, new value). */
def addWatch(watcher: Atom.Watcher[A]): Unit
/** Unregisters a watcher previously passed to `addWatch`. */
def removeWatch(watcher: Atom.Watcher[A]): Unit
}
object Atom {
import java.util.concurrent.atomic._
import scala.language.implicitConversions
/** Creates an atom for a reference value, backed by an `AtomicReference`. */
def apply[A <: AnyRef](initial: A): Atom[A] = {
  val cell = new AtomicReference[A](initial)
  new AtomImpl[A](cell)
}

/** Creates an atom for a `Boolean`, backed by an `AtomicBoolean`. */
def apply(initial: Boolean): Atom[Boolean] = {
  val cell = new AtomicBoolean(initial)
  new BooleanAtom(cell)
}

/** Creates an atom for an `Int`, backed by an `AtomicInteger`. */
def apply(initial: Int): Atom[Int] = {
  val cell = new AtomicInteger(initial)
  new IntAtom(cell)
}

/** Creates an atom for a `Long`, backed by an `AtomicLong`. */
def apply(initial: Long): Atom[Long] = {
  val cell = new AtomicLong(initial)
  new LongAtom(cell)
}
/**
 * Implicit view that lets an atom be used where its current value is expected.
 *
 * The value is read at the moment the conversion is applied.
 *
 * @param atom the atom to dereference
 * @return the value held by `atom` at the time of the call
 */
// Call apply() with explicit parentheses: relying on auto-application of an
// empty-paren method is deprecated in Scala 2.13 and rejected by Scala 3.
implicit def atom2value[A](atom: Atom[A]): A = atom()
/** Value-independent update functions, usable as arguments to `swap`. */
object ConstantOps {
  /**
   * Curried constant function: whatever is supplied in the second parameter
   * list is discarded and `value` is returned.
   *
   * @param value   the result to return
   * @param ignored the (unused) current value; named because `_` is not a
   *                valid parameter name in a method definition
   * @return `value`
   */
  def constantly[A](value: A)(ignored: A): A = value

  /** Returns `value` unchanged. */
  def identity[A](value: A): A = value
}
/** Boolean update functions; the binary ones are curried so the first operand can be fixed up front. */
object BooleanOps {
  /** Logical negation. */
  def not(b: Boolean): Boolean =
    if (b) false else true

  /** Logical conjunction of `left` and `right`. */
  def and(left: Boolean)(right: Boolean): Boolean =
    if (left) right else false

  /** Logical disjunction of `left` and `right`. */
  def or(left: Boolean)(right: Boolean): Boolean =
    if (left) true else right

  /** Negated conjunction, expressed through De Morgan's law. */
  def nand(left: Boolean)(right: Boolean): Boolean =
    !left || !right

  /** Negated disjunction, expressed through De Morgan's law. */
  def nor(left: Boolean)(right: Boolean): Boolean =
    !left && !right
}
/** Increment and decrement functions for `Int` and `Long` values. */
object IntOps {
  /** Adds one to an `Int` (plain `+`, so it wraps around on overflow). */
  def inc(i: Int): Int = {
    val next = i + 1
    next
  }

  /** Subtracts one from an `Int` (plain `-`, so it wraps around on underflow). */
  def dec(i: Int): Int = {
    val previous = i - 1
    previous
  }

  /** Adds one to a `Long`. */
  def inc(l: Long): Long = {
    val next = l + 1L
    next
  }

  /** Subtracts one from a `Long`. */
  def dec(l: Long): Long = {
    val previous = l - 1L
    previous
  }
}
/** Curried `Seq` transformations; the sequence being transformed is always the last argument. */
object SeqOps {
  /** Appends every element of `tail` after the elements of `as`. */
  def concat[A](tail: Seq[A])(as: Seq[A]): Seq[A] =
    as.++(tail)

  /** Appends the single element `a` to the end of `as`. */
  def conj[A](a: A)(as: Seq[A]): Seq[A] =
    as.:+(a)

  /** Removes the first element equal to `a`, if there is one; later duplicates are kept. */
  def disj[A](a: A)(as: Seq[A]): Seq[A] =
    as.span(elem => elem != a) match {
      // `fromMatch` is empty when nothing matched, otherwise it starts with the match
      case (beforeMatch, fromMatch) => beforeMatch ++ fromMatch.drop(1)
    }

  /** Keeps only the elements of `as` that satisfy `f`. */
  def select[A](f: A => Boolean)(as: Seq[A]): Seq[A] =
    as.filter(f)

  /** Discards the first `n` elements of `as`. */
  def drop[A](n: Int)(as: Seq[A]): Seq[A] =
    as.drop(n)

  /** Keeps only the first `n` elements of `as`. */
  def take[A](n: Int)(as: Seq[A]): Seq[A] =
    as.take(n)
}
/** Curried `Set` transformations; the set being transformed is always the last argument. */
object SetOps {
  /** Adds `a` to `as`. */
  def conj[A](a: A)(as: Set[A]): Set[A] =
    as.+(a)

  /** Removes `a` from `as`. */
  def disj[A](a: A)(as: Set[A]): Set[A] =
    as.-(a)

  /** Elements that are in `as` or in `others`. */
  def union[A](others: Set[A])(as: Set[A]): Set[A] =
    as | others

  /** Elements of `as` that are not in `others`. */
  def difference[A](others: Set[A])(as: Set[A]): Set[A] =
    as.diff(others)

  /** Elements that are in both `as` and `others`. */
  def intersection[A](others: Set[A])(as: Set[A]): Set[A] =
    as & others

  /** Keeps only the elements of `as` that satisfy `f`. */
  def select[A](f: A => Boolean)(as: Set[A]): Set[A] =
    as.filter(f)
}
/** Curried `Map` transformations; the map being transformed is always the last argument. */
object MapOps {
  /** Returns `m` with `k` bound to `v`, replacing any existing binding for `k`. */
  def assoc[K, V](k: K, v: V)(m: Map[K, V]): Map[K, V] =
    m.updated(k, v)

  /** Returns `m` without any binding for `k`. */
  def dissoc[K, V](k: K)(m: Map[K, V]): Map[K, V] =
    m.-(k)

  /** Keeps only the entries of `m` whose (key, value) pair satisfies `f`. */
  def select[K, V](f: ((K, V)) => Boolean)(m: Map[K, V]): Map[K, V] =
    m.filter(f)

  /** Keeps only the entries of `m` whose key satisfies `f`. */
  def selectKeys[K, V](f: K => Boolean)(m: Map[K, V]): Map[K, V] =
    m.filter(entry => f(entry._1))
}
/**
 * Callback type accepted by `addWatch` / `removeWatch`: a function of the atom,
 * the old value and the new value, in that order.
 */
type Watcher[A] = (Atom[A], /* old */A, /* new */A) => Unit
//------------------------------------------
// private implementation
/**
 * Atom for reference values, backed by an `AtomicReference`.
 *
 * @param ref the cell holding the current value
 */
private class AtomImpl[A](ref: AtomicReference[A]) extends Atom[A] with WatchSupport[A] {
  /** Returns the value currently stored in the underlying reference. */
  def apply(): A = ref.get

  /**
   * Unconditionally stores `a` and notifies the registered watchers of the change.
   *
   * @param a the value to store
   * @return `a`
   */
  def reset(a: A): A = {
    // getAndSet hands back the replaced value, which watchers need as the "old" argument.
    val previous = ref.getAndSet(a)
    notifyWatchers(previous, a)
    a
  }

  /**
   * Replaces the current value with `f(current)`, retrying until the
   * compare-and-set succeeds, then notifies the registered watchers.
   *
   * @param f update function; it may run more than once when the
   *          compare-and-set fails and the update is retried
   * @return the value that was stored
   */
  def swap(f: A => A): A = {
    @scala.annotation.tailrec
    def attempt(): A = {
      val current = ref.get
      val updated = f(current)
      if (ref.compareAndSet(current, updated)) {
        notifyWatchers(current, updated)
        updated
      } else attempt() // another update got in between the read and the CAS; retry
    }
    attempt()
  }

  override def toString() = s"Atom<$ref>"
}
/**
 * Atom for `Boolean` values, backed by an `AtomicBoolean`.
 *
 * @param ref the cell holding the current value
 */
private class BooleanAtom(ref: AtomicBoolean) extends Atom[Boolean] with WatchSupport[Boolean] {
  /** Returns the value currently stored in the underlying `AtomicBoolean`. */
  def apply(): Boolean = ref.get

  /**
   * Unconditionally stores `b` and notifies the registered watchers of the change.
   *
   * @param b the value to store
   * @return `b`
   */
  def reset(b: Boolean): Boolean = {
    // getAndSet hands back the replaced value, which watchers need as the "old" argument.
    val previous = ref.getAndSet(b)
    notifyWatchers(previous, b)
    b
  }

  /**
   * Replaces the current value with `f(current)`, retrying until the
   * compare-and-set succeeds, then notifies the registered watchers.
   *
   * @param f update function; it may run more than once when the
   *          compare-and-set fails and the update is retried
   * @return the value that was stored
   */
  def swap(f: Boolean => Boolean): Boolean = {
    @scala.annotation.tailrec
    def attempt(): Boolean = {
      val current = ref.get
      val updated = f(current)
      if (ref.compareAndSet(current, updated)) {
        notifyWatchers(current, updated)
        updated
      } else attempt() // another update got in between the read and the CAS; retry
    }
    attempt()
  }

  override def toString() = s"BooleanAtom<$ref>"
}
/**
 * Atom for `Int` values, backed by an `AtomicInteger`.
 *
 * @param ref the cell holding the current value
 */
private class IntAtom(ref: AtomicInteger) extends Atom[Int] with WatchSupport[Int] {
  /** Returns the value currently stored in the underlying `AtomicInteger`. */
  def apply(): Int = ref.get

  /**
   * Unconditionally stores `i` and notifies the registered watchers of the change.
   *
   * @param i the value to store
   * @return `i`
   */
  def reset(i: Int): Int = {
    // getAndSet hands back the replaced value, which watchers need as the "old" argument.
    val previous = ref.getAndSet(i)
    notifyWatchers(previous, i)
    i
  }

  /**
   * Replaces the current value with `f(current)`, retrying until the
   * compare-and-set succeeds, then notifies the registered watchers.
   *
   * @param f update function; it may run more than once when the
   *          compare-and-set fails and the update is retried
   * @return the value that was stored
   */
  def swap(f: Int => Int): Int = {
    @scala.annotation.tailrec
    def attempt(): Int = {
      val current = ref.get
      val updated = f(current)
      if (ref.compareAndSet(current, updated)) {
        notifyWatchers(current, updated)
        updated
      } else attempt() // another update got in between the read and the CAS; retry
    }
    attempt()
  }

  override def toString() = s"IntAtom<$ref>"
}
/**
 * Atom for `Long` values, backed by an `AtomicLong`.
 *
 * @param ref the cell holding the current value
 */
private class LongAtom(ref: AtomicLong) extends Atom[Long] with WatchSupport[Long] {
  /** Returns the value currently stored in the underlying `AtomicLong`. */
  def apply(): Long = ref.get

  /**
   * Unconditionally stores `l` and notifies the registered watchers of the change.
   *
   * @param l the value to store
   * @return `l`
   */
  def reset(l: Long): Long = {
    // getAndSet hands back the replaced value, which watchers need as the "old" argument.
    val previous = ref.getAndSet(l)
    notifyWatchers(previous, l)
    l
  }

  /**
   * Replaces the current value with `f(current)`, retrying until the
   * compare-and-set succeeds, then notifies the registered watchers.
   *
   * @param f update function; it may run more than once when the
   *          compare-and-set fails and the update is retried
   * @return the value that was stored
   */
  def swap(f: Long => Long): Long = {
    @scala.annotation.tailrec
    def attempt(): Long = {
      val current = ref.get
      val updated = f(current)
      if (ref.compareAndSet(current, updated)) {
        notifyWatchers(current, updated)
        updated
      } else attempt() // another update got in between the read and the CAS; retry
    }
    attempt()
  }

  override def toString() = s"LongAtom<$ref>"
}
/**
 * Mixin that gives an `Atom` implementation its watcher registry.
 *
 * The registered watchers are held as an immutable `Set` inside an
 * `AtomicReference`; registration changes swap in a new set via compare-and-set.
 */
sealed trait WatchSupport[A] { self: Atom[A] =>
  private[this] val watchers = new AtomicReference[Set[Watcher[A]]](Set.empty[Watcher[A]])

  /** Adds `watcher` to the registry. */
  final def addWatch(watcher: Watcher[A]): Unit =
    updateWatchers(registered => registered + watcher)

  /** Removes `watcher` from the registry. */
  final def removeWatch(watcher: Watcher[A]): Unit =
    updateWatchers(registered => registered - watcher)

  /** Calls every watcher in the current registry snapshot with (this atom, old value, new value). */
  protected def notifyWatchers(oldvalue: A, newvalue: A): Unit = {
    val snapshot = watchers.get
    for (watcher <- snapshot) watcher(self, oldvalue, newvalue)
  }

  /** Applies `f` to the registry, retrying until the compare-and-set goes through. */
  private[this] def updateWatchers(f: Set[Watcher[A]] => Set[Watcher[A]]): Unit = {
    @scala.annotation.tailrec
    def attempt(): Unit = {
      val before = watchers.get
      val after = f(before)
      if (!watchers.compareAndSet(before, after)) attempt()
    }
    attempt()
  }
}
}
@kirked
Copy link
Author

kirked commented May 1, 2017

Supports Int, Long, Boolean, and AnyRef values (matching the atomic types in java.util.concurrent.atomic), including watch support.

Sample use:

import Atom.IntOps._

val counter = Atom(0)
counter addWatch { case (_, oldval, newval) => println(s"counter has changed from $oldval to $newval") }

counter swap inc  // -> counter has changed from 0 to 1
counter swap inc  // -> counter has changed from 1 to 2
counter reset 0   // -> counter has changed from 2 to 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment