# Adapted from celery/celery#4079 (comment)
from pathlib import Path

from celery import Celery, bootsteps
from celery.signals import after_setup_logger, setup_logging, worker_ready, worker_shutdown
# Periodically touched by the LivenessProbe bootstep while the worker runs and
# removed when that step stops; the pod's livenessProbe checks its mtime.
HEARTBEAT_FILE = Path("/tmp/worker_heartbeat")
# Created when the worker_ready signal fires and removed on worker_shutdown;
# the pod's readinessProbe checks that it exists.
READINESS_FILE = Path("/tmp/worker_ready")
...
class LivenessProbe(bootsteps.StartStopStep):
    """Worker bootstep that periodically touches ``HEARTBEAT_FILE``.

    A liveness check can treat a recently modified heartbeat file as evidence
    that the worker's timer is still firing. When the step stops, the timer is
    cancelled and the file is removed.
    """

    # The worker's Timer component must be running first: start() schedules
    # on ``worker.timer``.
    requires = {'celery.worker.components:Timer'}

    # Seconds between heartbeat-file touches; override in a subclass if needed.
    interval = 1.0

    def __init__(self, worker, **kwargs):
        super().__init__(worker, **kwargs)
        # Handle for the repeating timer entry; None until start() runs and
        # after stop() has cancelled it.
        self.tref = None

    def start(self, worker):
        """Schedule update_heartbeat_file to run every ``interval`` seconds."""
        self.tref = worker.timer.call_repeatedly(
            self.interval, self.update_heartbeat_file, (worker,), priority=10,
        )

    def stop(self, worker):
        """Cancel the heartbeat timer, then delete the heartbeat file."""
        # Cancel before unlinking so a pending tick cannot re-create the file
        # after it has been removed.
        if self.tref is not None:
            self.tref.cancel()
            self.tref = None
        HEARTBEAT_FILE.unlink(missing_ok=True)

    def update_heartbeat_file(self, worker):
        """Refresh the heartbeat file's mtime, creating the file if absent."""
        HEARTBEAT_FILE.touch()
@worker_ready.connect
def on_worker_ready(**_):
    """Create ``READINESS_FILE`` when Celery sends the ``worker_ready`` signal.

    The handler is deliberately not named ``worker_ready``. That name would
    rebind the imported ``celery.signals.worker_ready`` signal to this
    function, and any later ``@worker_ready.connect`` in the module would fail.
    Signal keyword arguments (e.g. ``sender``) are accepted and ignored.
    """
    READINESS_FILE.touch()
@worker_shutdown.connect
def on_worker_shutdown(**_):
    """Remove ``READINESS_FILE`` when Celery sends the ``worker_shutdown`` signal.

    The handler is deliberately not named ``worker_shutdown``, so the imported
    ``celery.signals.worker_shutdown`` signal is not shadowed by this function.
    A missing file is not an error. Signal keyword arguments are ignored.
    """
    READINESS_FILE.unlink(missing_ok=True)
...
# The project's Celery application (typically already defined in the project).
app = Celery("project")
...
# Register the heartbeat bootstep so every worker started from this app
# runs it alongside its built-in steps.
app.steps["worker"].add(LivenessProbe)
And in the worker pod's container spec (YAML):
# Ready once /tmp/worker_ready exists; the worker_ready signal handler creates
# it and the worker_shutdown handler removes it.
readinessProbe:
  exec:
    command:
      - test
      - -f
      - /tmp/worker_ready
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 1
  failureThreshold: 3
# Live only while /tmp/worker_heartbeat was modified within the last minute.
livenessProbe:
  exec:
    command:
      - /bin/sh
      - -c
      # `find` exits 0 even when nothing matches, so its exit status cannot be
      # used directly; instead, fail unless it printed the (recent) file path.
      # A missing file makes find print nothing, which also fails the probe.
      - '[ -n "$(find /tmp/worker_heartbeat -mmin -1 2>/dev/null)" ]'
  initialDelaySeconds: 120
  periodSeconds: 60
  failureThreshold: 5
# NOTE(review): requests are far below limits, giving the pod Burstable QoS;
# confirm this is intended for the worker's real CPU/memory usage.
resources:
  limits:
    cpu: 2000m
    memory: 4Gi
  requests:
    cpu: 50m
    memory: 500Mi