get rid of celery/celerybeat

alternative to https://github.com/ansible/awx/pull/2530 which makes use
of https://pypi.org/project/schedule/

this doesn't have support for any persistence (like how celery beat uses
a shelve file), because all of our periodic jobs run at most every few
minutes
This commit is contained in:
Ryan Petrello
2020-02-06 07:08:27 -05:00
parent 7b4adfcc15
commit 38a08d163c
10 changed files with 87 additions and 167 deletions
+52
View File
@@ -0,0 +1,52 @@
import logging
import threading
import time
from django.conf import settings
from django.db import connections
from schedule import Scheduler
from awx.main.dispatch.worker import TaskWorker
logger = logging.getLogger('awx.main.dispatch.periodic')
class Scheduler(Scheduler):
def run_continuously(self):
cease_continuous_run = threading.Event()
idle_seconds = max(
1,
min(self.jobs).period.total_seconds() / 2
)
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
try:
for conn in connections.all():
# If the database connection has a hiccup, re-establish a new
# connection
conn.close_if_unusable_or_obsolete()
self.run_pending()
except Exception:
logger.exception(
'encountered an error while scheduling periodic tasks'
)
time.sleep(idle_seconds)
logger.debug('periodic thread exiting...')
thread = ScheduleThread()
thread.daemon = True
thread.start()
return cease_continuous_run
def run_continuously():
scheduler = Scheduler()
for task in settings.CELERYBEAT_SCHEDULE.values():
apply_async = TaskWorker.resolve_callable(task['task']).apply_async
total_seconds = task['schedule'].total_seconds()
scheduler.every(total_seconds).seconds.do(apply_async)
return scheduler.run_continuously()
+6 -68
View File
@@ -16,6 +16,7 @@ from awx.main.dispatch.control import Control
from awx.main.dispatch.kombu import Connection
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumer, TaskWorker
from awx.main.dispatch import periodic
logger = logging.getLogger('awx.main.dispatch')
@@ -36,71 +37,6 @@ class Command(BaseCommand):
help=('cause the dispatcher to recycle all of its worker processes;'
'running jobs will run to completion first'))
def beat(self):
from celery import Celery
from celery.beat import PersistentScheduler
from celery.apps import beat
class AWXScheduler(PersistentScheduler):
def __init__(self, *args, **kwargs):
self.ppid = os.getppid()
super(AWXScheduler, self).__init__(*args, **kwargs)
def setup_schedule(self):
super(AWXScheduler, self).setup_schedule()
self.update_from_dict(settings.CELERYBEAT_SCHEDULE)
def tick(self, *args, **kwargs):
if os.getppid() != self.ppid:
# if the parent PID changes, this process has been orphaned
# via e.g., segfault or sigkill, we should exit too
raise SystemExit()
return super(AWXScheduler, self).tick(*args, **kwargs)
def apply_async(self, entry, producer=None, advance=True, **kwargs):
for conn in connections.all():
# If the database connection has a hiccup, re-establish a new
# connection
conn.close_if_unusable_or_obsolete()
task = TaskWorker.resolve_callable(entry.task)
result, queue = task.apply_async()
class TaskResult(object):
id = result['uuid']
return TaskResult()
sched_file = '/var/lib/awx/beat.db'
app = Celery()
app.conf.BROKER_URL = settings.BROKER_URL
app.conf.CELERY_TASK_RESULT_EXPIRES = False
# celery in py3 seems to have a bug where the celerybeat schedule
# shelve can become corrupted; we've _only_ seen this in Ubuntu and py36
# it can be avoided by detecting and removing the corrupted file
# at some point, we'll just stop using celerybeat, because it's clearly
# buggy, too -_-
#
# https://github.com/celery/celery/issues/4777
sched = AWXScheduler(schedule_filename=sched_file, app=app)
try:
sched.setup_schedule()
except Exception:
logger.exception('{} is corrupted, removing.'.format(sched_file))
sched._remove_db()
finally:
try:
sched.close()
except Exception:
logger.exception('{} failed to sync/close'.format(sched_file))
beat.Beat(
30,
app,
schedule=sched_file, scheduler_cls=AWXScheduler
).run()
def handle(self, *arg, **options):
if options.get('status'):
print(Control('dispatcher').status())
@@ -116,9 +52,10 @@ class Command(BaseCommand):
# for the DB and memcached connections (that way lies race conditions)
django_connection.close()
django_cache.close()
beat = Process(target=self.beat)
beat.daemon = True
beat.start()
# spawn a daemon thread to periodically enqueues scheduled tasks
# (like the node heartbeat)
cease_continuous_run = periodic.run_continuously()
reaper.reap()
consumer = None
@@ -152,6 +89,7 @@ class Command(BaseCommand):
)
consumer.run()
except KeyboardInterrupt:
cease_continuous_run.set()
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
+2
View File
@@ -1014,6 +1014,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
dir=settings.JOBOUTPUT_ROOT,
encoding='utf-8'
)
from awx.main.tasks import purge_old_stdout_files # circular import
purge_old_stdout_files.apply_async()
# Before the addition of event-based stdout, older versions of
# awx stored stdout as raw text blobs in a certain database column
-5
View File
@@ -434,10 +434,6 @@ CELERYBEAT_SCHEDULE = {
'schedule': timedelta(seconds=60),
'options': {'expires': 50,}
},
'purge_stdout_files': {
'task': 'awx.main.tasks.purge_old_stdout_files',
'schedule': timedelta(days=7)
},
'gather_analytics': {
'task': 'awx.main.tasks.gather_analytics',
'schedule': timedelta(minutes=5)
@@ -454,7 +450,6 @@ CELERYBEAT_SCHEDULE = {
},
# 'isolated_heartbeat': set up at the end of production.py and development.py
}
AWX_INCONSISTENT_TASK_INTERVAL = 60 * 3
AWX_CELERY_QUEUES_STATIC = [
CELERY_DEFAULT_QUEUE,