Skip to content

Instantly share code, notes, and snippets.

@otobrglez
Created July 11, 2024 14:36
Show Gist options
  • Save otobrglez/1407b2fc62cf439d46cfbeef2a56cf80 to your computer and use it in GitHub Desktop.
Save otobrglez/1407b2fc62cf439d46cfbeef2a56cf80 to your computer and use it in GitHub Desktop.
Quartz Scheduler with Scala and Cats Effect / FS2
package si.ogrodje.oge.scheduler
import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Resource}
import fs2.Stream
import fs2.concurrent.Topic
import org.quartz.*
import org.quartz.JobBuilder.*
import org.quartz.TriggerBuilder.*
import org.quartz.impl.StdSchedulerFactory
import org.typelevel.log4cats.LoggerFactory
import org.typelevel.log4cats.slf4j.Slf4jFactory
import si.ogrodje.oge.scheduler.QScheduler.EventKind
import java.time.Instant
import java.util
import java.util.{Random, TimeZone}
import scala.jdk.CollectionConverters.*
import scala.reflect.ClassTag
/** Bridges a Quartz `Scheduler` to cats-effect / FS2.
  *
  * Every scheduled Quartz job publishes its own name to the shared `events` topic when it fires; `at` subscribes to
  * that topic, keeps only the events carrying its name and evaluates the user supplied `IO` once per event.
  *
  * Instances are created through `QScheduler.resource` (the constructor is private).
  *
  * @param events
  *   topic onto which fired job names are published
  * @param scheduler
  *   the underlying Quartz scheduler
  */
final class QScheduler private (
  events: Topic[IO, EventKind],
  scheduler: Scheduler
):
  // Zone used only for rendering the first fire time in log messages.
  private val CET = TimeZone.getTimeZone("CET").toZoneId

  private given lf: LoggerFactory[IO] = Slf4jFactory.create[IO]
  private val logger                  = lf.getLogger

  /** Starts the underlying Quartz scheduler and logs that it did. */
  private def start: IO[Unit] = IO(scheduler.start()) <* logger.info("Scheduler started")

  /** Shuts the underlying Quartz scheduler down. */
  private def close: IO[Unit] = IO(scheduler.shutdown())

  /** Quartz job that reads the `name` and `callback` entries from the merged job data map and invokes the callback
    * with the name.
    */
  final private class TriggerEvent extends org.quartz.Job:
    override def execute(context: JobExecutionContext): Unit =
      val name = context.getMergedJobDataMap.getString("name")
      // The callback is stored as a plain object in the job data map, hence the cast.
      context.getMergedJobDataMap.get("callback").asInstanceOf[EventKind => Unit](name)

  /** Registers a Quartz job identified by `name` that fires according to `schedulerBuilder`.
    *
    * On every firing the job publishes `name` to the `events` topic. The publish is executed synchronously
    * (`unsafeRunSync`) on the thread that runs the Quartz job.
    *
    * @param name
    *   job identity; also the event value published on each firing
    * @param schedulerBuilder
    *   Quartz schedule (simple, cron, ...) describing when the job fires
    */
  private def trigger[T <: Trigger](
    name: String,
    schedulerBuilder: ScheduleBuilder[T]
  )(using ClassTag[T]): IO[Unit] =
    val classOfT: Class[? <: org.quartz.Job] = new TriggerEvent().getClass

    val jMap: java.util.Map[String, EventKind => Unit] = Map("callback" -> { (event: String) =>
      (logger.debug(s"Pushing event $event to the main topic") *> events.publish1(event).void).unsafeRunSync()
    }).asJava

    IO(
      scheduler.scheduleJob(
        newJob(classOfT).withIdentity(name).usingJobData("name", name).usingJobData(new JobDataMap(jMap)).build(),
        newTrigger().withSchedule(schedulerBuilder).build()
      )
    ).flatTap(date => logger.info(s"First run of $name is scheduled at ${date.toInstant.atZone(CET)}")).void

  /** Removes the job registered under `name` from the Quartz scheduler.
    *
    * The boolean returned by `deleteJob` is discarded, so calling this for a job that is no longer registered is not
    * treated as an error here.
    */
  private def unschedule(name: String): IO[Unit] =
    IO(scheduler.deleteJob(JobKey.jobKey(name))).void

  /** Subscribes to the topic and keeps only the events equal to `eventKind`. */
  private def listenTo(eventKind: EventKind): Resource[IO, Stream[IO, EventKind]] =
    events.subscribeAwaitUnbounded.map(_.filter(_ == eventKind))

  /** Runs `io` every time the given Quartz `schedule` fires.
    *
    * Steps, in order: subscribe to the topic, register the Quartz job, then drain the filtered event stream while
    * evaluating `io` for each event. The drain happens as part of acquiring the returned resource, so acquisition only
    * completes once the subscription stream ends; compose several `at` calls in parallel (e.g. `parTupled`) rather
    * than sequentially.
    *
    * When the resource is released or its acquisition is cancelled, the Quartz job is deleted again so that it does
    * not keep firing without a listener and so that `name` can be reused.
    *
    * @param schedule
    *   Quartz schedule describing when `io` should run
    * @param name
    *   job identity; defaults to a generated `at-<epoch millis>-<random>` value
    * @param io
    *   effect evaluated once per firing; its result is discarded
    */
  def at[T <: Trigger, A](
    schedule: ScheduleBuilder[T],
    name: String = s"at-${Instant.now().toEpochMilli}-${new Random().nextInt(10_000)}"
  )(
    io: => IO[A]
  )(using ClassTag[T]): Resource[IO, Unit] =
    for
      // Subscribe first so that no firing is missed between scheduling and listening.
      fired <- listenTo(name)
      // Pair registration with removal: the job must not outlive its listener.
      _     <- Resource.make(trigger(name, schedule))(_ => unschedule(name))
      _     <- Resource.eval(fired.evalMap(_ => io).compile.drain)
    yield ()
/** Companion holding the event type alias and the only public way to obtain a [[QScheduler]]. */
object QScheduler:
  /** Events travelling through the topic are plain strings (the name of the job that fired). */
  type EventKind = String

  /** Builds a [[QScheduler]] backed by `StdSchedulerFactory.getDefaultScheduler`.
    *
    * The scheduler is started once the instance has been created and is shut down when the resource is released.
    * Because `start` runs in `evalTap` after `make`, the release action (`close`) is already registered if starting
    * fails.
    */
  def resource: Resource[IO, QScheduler] = for
    events    <- Resource.eval(Topic[IO, EventKind]())
    scheduler <- Resource
                   // Obtaining the default scheduler is a side effect that may throw, so it is suspended in `IO(...)`
                   // instead of being evaluated eagerly inside `IO.pure`.
                   .make(IO(new QScheduler(events, StdSchedulerFactory.getDefaultScheduler)))(_.close)
                   .evalTap(_.start)
  yield scheduler
package si.ogrodje.oge.apps
import cats.effect.kernel.Ref
import cats.effect.{IO, Resource, ResourceApp}
import cats.syntax.parallel.*
import org.quartz.CronScheduleBuilder.cronSchedule
import org.quartz.SimpleScheduleBuilder.simpleSchedule as sch
import si.ogrodje.oge.scheduler.QScheduler
import java.util.TimeZone
/** Demo application: schedules a handful of interval-based and cron-based tasks on a [[QScheduler]] and keeps running
  * (`ResourceApp.Forever`).
  */
object SchedulerDemoApp extends ResourceApp.Forever:
  // Time zone in which the cron expressions below are interpreted.
  private val CET: TimeZone = TimeZone.getTimeZone("CET")

  /** Acquires the scheduler and registers all demo tasks in parallel.
    *
    * The tasks are combined with `parTupled` so that every `at` resource is acquired concurrently rather than one
    * after another.
    */
  def run(args: List[String]): Resource[IO, Unit] = QScheduler.resource.flatMap: scheduler =>
    for
      cnt <- Resource.eval(Ref.of[IO, Int](0))
      _   <- (
               // Repetition with intervals.
               // Quartz counts *repeats* after the first firing, so N total runs need withRepeatCount(N - 1).
               scheduler.at(sch.withRepeatCount(4).withIntervalInSeconds(5))(IO.println("every 5 seconds (5 times)")),
               scheduler.at(sch.withRepeatCount(14).withIntervalInSeconds(2))(IO.println("every 2 seconds (15 times)")),
               // repeatForever() determines the repeat count, so no explicit withRepeatCount is needed here.
               scheduler.at(sch.withIntervalInSeconds(7).repeatForever())(
                 cnt.getAndUpdate(_ + 1).flatTap(n => IO.println(s"Number N is $n"))
               ),
               // Cron syntax
               scheduler.at(cronSchedule("0 54 15 ? * *").inTimeZone(CET))(IO.println("At a very specific time")),
               scheduler.at(cronSchedule("0 */5 * * * ?").inTimeZone(CET), name = "every-5")(IO.println("Every 5 minutes"))
             ).parTupled
    yield ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment