Created
April 28, 2023 12:49
-
-
Save antdking/0eacd8d9df8bda120dcaceea6eaab1f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from contextlib import contextmanager | |
import inspect | |
from tempfile import NamedTemporaryFile | |
import typing as t | |
import typing_extensions as te | |
import os | |
import pystemd | |
import pystemd.run | |
import systemd.journal | |
import itertools | |
# They tried to make executable modules work,
# but it actually breaks standard usage :laughing:
# At runtime ``pystemd.run`` itself is used as the callable; type checkers are
# pointed at ``pystemd.run.run`` instead.
if t.TYPE_CHECKING:
    systemd_run = pystemd.run.run
else:
    systemd_run = pystemd.run

import logging

# Configure logging with the defaults and let this script's own logger emit
# DEBUG messages.
logging.basicConfig()
log = logging.getLogger("systemd-launcher")
log.setLevel(logging.DEBUG)

# Alias used in the annotations of the functions below.
Unit: te.TypeAlias = pystemd.systemd1.Unit

# True when the effective UID is not 0 (i.e. not root). Passed as ``user_mode``
# when launching the unit and used to pick the journal match field.
USER_MODE = os.geteuid() != 0
def start(cmd: t.List[str]) -> Unit:
    """Launch ``cmd`` through ``systemd_run`` and return the resulting unit.

    Args:
        cmd: The command line to execute, as a list of arguments.

    Returns:
        Whatever ``systemd_run`` returns for the launched command.
    """
    launch_options = {
        "user_mode": USER_MODE,
        # "exec" mode, so that we return after the actual execution starts
        "service_type": "exec",
        # presumably keeps the child's stdout unbuffered so output shows up
        # in the journal promptly -- TODO confirm
        "env": {"PYTHONUNBUFFERED": "1"},
    }
    return systemd_run(cmd, **launch_options)
@contextmanager
def log_stream(unit: Unit) -> t.Generator[t.Iterable[str], None, None]:
    """Yield a lazy iterable over the journal messages of ``unit``.

    A journal reader is opened, filtered on the unit's ``external_id`` and
    seeked to ``unit.Service.ExecMainStartTimestamp``. The yielded iterable
    produces the ``MESSAGE`` field of each matching entry until the unit's
    ``MainPID`` becomes 0 and one final batch has been read.

    The reader is closed when the ``with`` block exits (normally or through an
    exception), so the yielded iterable must be consumed inside the block.

    Args:
        unit: The unit to follow. Its ``external_id``,
            ``Service.ExecMainStartTimestamp`` and ``Service.MainPID`` are read.

    Yields:
        An iterable of ``MESSAGE`` values from the unit's journal entries.

    Raises:
        KeyError: While iterating, if a journal entry has no ``MESSAGE`` field.
    """
    # TODO: work out how to get this from the Unit object
    unit_filter_key = "_SYSTEMD_USER_UNIT" if USER_MODE else "_SYSTEMD_UNIT"
    unit_name = unit.external_id.decode()

    log.debug("Starting log stream: %s", unit_name)
    journal_reader = systemd.journal.Reader()
    try:
        # see: https://man7.org/linux/man-pages/man7/systemd.journal-fields.7.html
        journal_reader.add_match(**{unit_filter_key: unit_name})
        journal_reader.seek_realtime(unit.Service.ExecMainStartTimestamp)
        log.debug("Set clock + unit filter")

        # Journal.Reader is a repeatable read iterator: It'll raise
        # StopIteration whenever the current batch is consumed, and then
        # iterate it again for the next batch.

        def inner() -> t.Generator[str, None, None]:
            # Keep reading while journal is reporting anything other than No-ops
            # Note: just because we receive an Append, doesn't mean there's
            # data to read
            while (status := journal_reader.wait(0.1)) != systemd.journal.NOP:
                log.debug("Got status: %i", status)
                for entry in journal_reader:
                    yield entry["MESSAGE"]
                log.debug("Finished batch: %i", status)

        def outer() -> t.Generator[str, None, None]:
            log.debug("Starting main loop, current PID: %i", unit.Service.MainPID)
            # MainPID gets set to 0 when the service finishes
            while unit.Service.MainPID != 0:
                log.debug("Sending batch")
                yield from inner()
            # Drain anything written between the last batch and the exit.
            log.debug("Service finished, sending final batch")
            yield from inner()

        yield outer()
    finally:
        # Release the journal handle even if setup or the caller's block fails.
        journal_reader.close()
def main():
    """Run a throwaway counting script via ``start`` and log its output.

    A small script is written to a temporary file, made executable, launched
    with the argument ``"20"``, and every line from ``log_stream`` is logged
    at INFO level. The temporary file is removed afterwards, also on failure.
    """
    # quick script to test a long running process with stdout output
    script_source = inspect.cleandoc("""
        #!/usr/bin/env python
        import time, sys
        try:
            count_to = int(sys.argv[1])
        except IndexError:
            count_to = 100
        for i in range(count_to):
            print(f" Hello: {i}")
            time.sleep(1)
    """)

    script_path = None
    try:
        with NamedTemporaryFile("w", delete=False) as script_file:
            script_path = script_file.name
            script_file.write(script_source)
            script_file.flush()
            os.chmod(script_path, 0o755)

        # The file is closed at this point, before it gets executed.
        unit = start([script_path, "20"])
        with log_stream(unit) as stream:
            for line in stream:
                log.info(line)
    finally:
        # delete=False above means the cleanup is on us.
        if script_path is not None:
            os.unlink(script_path)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment