Skip to content

Instantly share code, notes, and snippets.

@TonioGela
Created November 13, 2023 08:40
Show Gist options
  • Save TonioGela/32064e0d76069b66bf7e9ee60a72c765 to your computer and use it in GitHub Desktop.
Save TonioGela/32064e0d76069b66bf7e9ee60a72c765 to your computer and use it in GitHub Desktop.
Fancy JLine inspired way of handling single stdin keypresses on tty based terminals
//> using dep co.fs2::fs2-io::3.9.3
import fs2.*
import fs2.io.process.*
import cats.effect.*
import cats.syntax.all.*
import sun.misc.*
import cats.effect.std.Dispatcher
object Foo extends IOApp.Simple:
val run: IO[Unit] =
Dispatcher
.sequential[IO]
.use(dispatcher =>
runWithOutput("/bin/stty", "-f", "/dev/tty", "-g").flatMap(prevSttyMode =>
runCommand("/bin/stty", "-f", "/dev/tty", "raw", "-echo") >> // sets the console to raw mode
IO.print("\u001b[?;47;h") >> // alternate buffer mode
IO.print("\u001b[?25l") >> // hides cursor
IO.print("\u001b[2J") >> // clears whole screen
registerHandler(dispatcher) >>
fs2.io
.stdin[IO](1024)
.through(handleKeyPresses)
.onFinalize(
IO.print("\u001B[?;47;l") >> // restores normal buffer mode
IO.print("\u001B[?;25;h") >> // shows cursor
IO.print("\u001b[2J") >> // clears whole screen
runCommand("/bin/stty", "-f", "/dev/tty", prevSttyMode) // sets the console to the previous stty mode
)
.compile
.drain
)
)
def registerHandler(dispatcher: Dispatcher[IO]): IO[Unit] = IO.delay(
Signal.handle(
new Signal("WINCH"), // This signal is sent to the process each time the window is resized
new SignalHandler() {
def handle(signal: Signal): Unit = dispatcher.unsafeRunAndForget(
IO.println(s"\u001B[GYou resized the window")
)
}
)
)
def runCommand(command: String, args: String*): IO[Unit] = runWithOutput(command, args*).void
def runWithOutput(command: String, args: String*): IO[String] =
ProcessBuilder(command, args.toList)
.spawn[IO]
.use: p =>
(p.exitValue, p.stdout.through(fs2.text.utf8.decode).compile.foldMonoid).flatMapN:
case (0, stdout) => stdout.pure[IO]
case (code, _) => IO.raiseError(new Exception(s"$command ${args.mkString(" ")} terminated with error $code"))
def handleKeyPresses(stream: Stream[IO, Byte]): Stream[IO, Unit] =
def loop(s: Stream[IO, Byte]): Pull[IO, Nothing, Unit] = s.pull.uncons1.flatMap:
case None => Pull.done
case Some((3, _)) => Pull.done // 3 is CTRL+C, you have to EXPLICTLY manage it
case Some((b, rest)) =>
Pull.eval(IO.println(s"\u001B[GI received byte: %04d".format(b))) >> loop(
rest
) // ESC[G is "Beginning of the line"
loop(stream).stream
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment