Skip to content

Commit e6f7580

Browse files
committed
[scheduler] Fix re-scheduling tasks bug during maintenance
Tasks were incorrectly rescheduled during the maintenance phase. The code was taking the data from the task's first (oldest) job instead of its last (most recent) one. This caused the task to run again from the very beginning. Signed-off-by: Santiago Dueñas <[email protected]>
1 parent 58f3692 commit e6f7580

File tree

2 files changed

+114
-66
lines changed

2 files changed

+114
-66
lines changed

src/grimoirelab/core/scheduler/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def maintain_tasks() -> None:
122122
)
123123

124124
for task in tasks:
125-
job_db = task.jobs.order_by('scheduled_at').first()
125+
job_db = task.jobs.order_by('-scheduled_at').first()
126126

127127
try:
128128
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection(task.default_job_queue))

tests/scheduler/test_scheduler.py

Lines changed: 113 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,65 @@ def on_failure(job, connection, t, value, traceback):
8080
return on_failure(*args, **kwargs)
8181

8282

83+
class OnSuccessCallbackTestTask(Task):
84+
"""Class for testing on success callback calls"""
85+
86+
TASK_TYPE = 'callback_test_task'
87+
88+
def prepare_job_parameters(self):
89+
return self.task_args
90+
91+
def can_be_retried(self):
92+
return True
93+
94+
@property
95+
def default_job_queue(self):
96+
return 'testing'
97+
98+
@staticmethod
99+
def job_function(*args, **kwargs):
100+
def add_numbers(a, b):
101+
return a + b
102+
return add_numbers(*args, **kwargs)
103+
104+
@staticmethod
105+
def on_success_callback(*args, **kwargs):
106+
return _on_success_callback(*args, **kwargs)
107+
108+
@staticmethod
109+
def on_failure_callback(*args, **kwargs):
110+
return _on_failure_callback(*args, **kwargs)
111+
112+
113+
class OnFailureCallbackTestTask(Task):
114+
"""Class for testing on failure callback calls"""
115+
116+
TASK_TYPE = 'failure_test_task'
117+
118+
def prepare_job_parameters(self):
119+
return self.task_args
120+
121+
def can_be_retried(self):
122+
return True
123+
124+
@property
125+
def default_job_queue(self):
126+
return 'testing'
127+
128+
@staticmethod
129+
def job_function(*args, **kwargs):
130+
raise Exception("Error")
131+
132+
@staticmethod
133+
def on_success_callback(*args, **kwargs):
134+
return _on_success_callback(*args, **kwargs)
135+
136+
@staticmethod
137+
def on_failure_callback(job, connection, t, value, traceback, *args, **kwargs):
138+
job.progress = str(t)
139+
return _on_failure_callback(job, connection, t, value, traceback, *args, **kwargs)
140+
141+
83142
class TestScheduleTask(GrimoireLabTestCase):
84143
"""Unit tests for scheduling tasks"""
85144

@@ -222,16 +281,25 @@ class TestMaintainTasks(GrimoireLabTestCase):
222281

223282
def setUp(self):
224283
GRIMOIRELAB_TASK_MODELS.clear()
225-
task_class, job_class = register_task_model('test_task', SchedulerTestTask)
284+
task_class_sched, job_class_sched = register_task_model(
285+
'test_task', SchedulerTestTask
286+
)
287+
task_class_callback, job_class_callback = register_task_model(
288+
'callback_test_task', OnSuccessCallbackTestTask
289+
)
226290

227291
def cleanup_test_model():
228292
with django.db.connection.schema_editor() as schema_editor:
229-
schema_editor.delete_model(job_class)
230-
schema_editor.delete_model(task_class)
293+
schema_editor.delete_model(job_class_sched)
294+
schema_editor.delete_model(task_class_sched)
295+
schema_editor.delete_model(job_class_callback)
296+
schema_editor.delete_model(task_class_callback)
231297

232298
with django.db.connection.schema_editor() as schema_editor:
233-
schema_editor.create_model(task_class)
234-
schema_editor.create_model(job_class)
299+
schema_editor.create_model(task_class_sched)
300+
schema_editor.create_model(job_class_sched)
301+
schema_editor.create_model(task_class_callback)
302+
schema_editor.create_model(job_class_callback)
235303

236304
self.addCleanup(cleanup_test_model)
237305
super().setUp()
@@ -259,7 +327,7 @@ def test_maintain_tasks_reschedule(self):
259327

260328
# Check if jobs were re-scheduled
261329

262-
# Task1 was't re-scheduled
330+
# Task1 wasn't re-scheduled
263331
job_db = task1.jobs.first()
264332
self.assertLessEqual(job_db.last_modified, before_dt)
265333
self.assertLessEqual(job_db.last_modified, after_dt)
@@ -272,6 +340,45 @@ def test_maintain_tasks_reschedule(self):
272340
job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
273341
self.assertEqual(job_rq.id, job_db.uuid)
274342

343+
def test_maintain_tasks_reschedule_multiple_jobs(self):
344+
"""Tasks with multiple finished jobs are re-scheduled"""
345+
346+
task_args = {
347+
'a': 1,
348+
'b': 2,
349+
}
350+
351+
task = schedule_task('callback_test_task', task_args, job_interval=0)
352+
worker = django_rq.workers.get_worker(task.default_job_queue)
353+
worker.work(burst=True, with_scheduler=True)
354+
worker.work(burst=True, with_scheduler=True)
355+
worker.work(burst=True, with_scheduler=True)
356+
357+
# Three jobs were processed and one is still pending
358+
self.assertEqual(task.jobs.count(), 4)
359+
360+
# Delete the last job manually to create the inconsistent state
361+
job_db = task.jobs.last()
362+
job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
363+
job_rq.delete()
364+
365+
# Run the maintenance tasks
366+
before_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
367+
maintain_tasks()
368+
after_dt = grimoirelab_toolkit.datetime.datetime_utcnow()
369+
370+
# Task was re-scheduled
371+
job_db = task.jobs.last()
372+
self.assertGreaterEqual(job_db.last_modified, before_dt)
373+
self.assertLessEqual(job_db.last_modified, after_dt)
374+
375+
# New job was created
376+
job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
377+
self.assertEqual(job_rq.id, job_db.uuid)
378+
379+
worker.work(burst=True, with_scheduler=True)
380+
self.assertEqual(task.jobs.count(), 5)
381+
275382
def test_maintain_tasks_reschedule_expired_scheduled_at(self):
276383
"""Tasks with inconsistent state with expired scheduled time are re-scheduled with current time"""
277384

@@ -329,36 +436,6 @@ def test_maintain_tasks_reschedule_non_expired_scheduled_at(self):
329436
self.assertEqual(job_rq.id, job_db.uuid)
330437

331438

332-
class OnSuccessCallbackTestTask(Task):
333-
"""Class for testing on success callback calls"""
334-
335-
TASK_TYPE = 'callback_test_task'
336-
337-
def prepare_job_parameters(self):
338-
return self.task_args
339-
340-
def can_be_retried(self):
341-
return True
342-
343-
@property
344-
def default_job_queue(self):
345-
return 'testing'
346-
347-
@staticmethod
348-
def job_function(*args, **kwargs):
349-
def add_numbers(a, b):
350-
return a + b
351-
return add_numbers(*args, **kwargs)
352-
353-
@staticmethod
354-
def on_success_callback(*args, **kwargs):
355-
return _on_success_callback(*args, **kwargs)
356-
357-
@staticmethod
358-
def on_failure_callback(*args, **kwargs):
359-
return _on_failure_callback(*args, **kwargs)
360-
361-
362439
class TestCancelTask(GrimoireLabTestCase):
363440
"""Unit tests for canceling tasks"""
364441

@@ -546,35 +623,6 @@ def test_interval_between_jobs(self, mock_utcnow):
546623
self.assertEqual(self.job_class.objects.count(), 2)
547624

548625

549-
class OnFailureCallbackTestTask(Task):
550-
"""Class for testing on failure callback calls"""
551-
552-
TASK_TYPE = 'failure_test_task'
553-
554-
def prepare_job_parameters(self):
555-
return self.task_args
556-
557-
def can_be_retried(self):
558-
return True
559-
560-
@property
561-
def default_job_queue(self):
562-
return 'testing'
563-
564-
@staticmethod
565-
def job_function(*args, **kwargs):
566-
raise Exception("Error")
567-
568-
@staticmethod
569-
def on_success_callback(*args, **kwargs):
570-
return _on_success_callback(*args, **kwargs)
571-
572-
@staticmethod
573-
def on_failure_callback(job, connection, t, value, traceback, *args, **kwargs):
574-
job.progress = str(t)
575-
return _on_failure_callback(job, connection, t, value, traceback, *args, **kwargs)
576-
577-
578626
class OnFailureNoRetryTestTask(Task):
579627
"""Class for testing on failure callback calls with no retry"""
580628

0 commit comments

Comments
 (0)