Source code for eztaskmanager.services.queues

"""Task queue services.

These are the class that implement the interface to the queue managers.

The abstract TaskQueueService class is the interface each class has to implement.

- RQTaskQueueService implements the service with Redis Queue.
- CeleryTaskQueueService implements the service with Celery (TBD)
"""
import datetime
import logging
from abc import ABC, abstractmethod

from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from eztaskmanager.settings import EZTASKMANAGER_QUEUE_SERVICE_TYPE
from ..models import Task

logger = logging.getLogger(__name__)


[docs] class TaskQueueService(ABC): """Abstract base class for managing task queues."""
[docs] @abstractmethod def add(self, task): # pragma: no cover """To be implemented in concrete subclasses.""" pass
[docs] @abstractmethod def remove(self, task): # pragma: no cover """To be implemented in concrete subclasses.""" pass
[docs] class TaskQueueException(Exception): """Dedicated exception for TaskQueue classes.""" pass
# conditional import try: import django_rq
[docs] class RQTaskQueueService(TaskQueueService): """ A subclass of TaskQueueService that manages tasks using RQ (Redis Queue). Attributes: queue (Queue): The default RQ queue. Methods: - add(task, at=None): Enqueues a task to be executed either immediately or at a specific time. - remove(task): Cancels a task if it is currently in the queue. """ def __init__(self): self.queue = django_rq.get_queue('default') self.scheduler = django_rq.get_scheduler('default', interval=60)
[docs] def add(self, task: Task): """ Add the task to the Redis queue. If the task already has a scheduled_job_id, the existing job will be cancelled before creating a new one to prevent duplicate scheduling. This ensures idempotency when re-scheduling tasks after restarts. Args: task: The task to be added. Returns: The job created for the task, either scheduled or enqueued for immediate execution. Raises: TaskQueueException: If there is an error while launching the task. """ from eztaskmanager.services import run_management_command if task.scheduling and task.scheduling_utc < timezone.now(): raise TaskQueueException(_("It is not possible to schedule tasks in the past")) # Prevent duplicate scheduling - cancel existing job if present if task.scheduled_job_id: logger.warning( f"Task '{task.name}' (ID: {task.id}) already has scheduled job " f"'{task.scheduled_job_id}'. Cancelling old job before creating new one." ) try: # Try to cancel the existing scheduled job self.scheduler.cancel(task.scheduled_job_id) logger.info( f"Successfully cancelled old scheduled job '{task.scheduled_job_id}' " f"for task '{task.name}' (ID: {task.id})" ) except Exception as e: # Job might not exist in Redis anymore (already executed or manually removed) # This is acceptable - we'll create a new one logger.debug( f"Could not cancel old scheduled job '{task.scheduled_job_id}' " f"for task '{task.name}' (ID: {task.id}): {e}. " f"Job may have already been removed." ) finally: # Clear the old job ID to start fresh task.scheduled_job_id = None try: if task.scheduling: if task.is_periodic: # schedule execution at a point in time, with periodicity rq_job = self.scheduler.schedule( task.scheduling_utc, run_management_command, [task.id], interval=task.interval_in_seconds, result_ttl=int(1.5 * task.interval_in_seconds) ) else: # schedule execution at a point in time, with periodicity rq_job = self.scheduler.enqueue_at( task.scheduling_utc, run_management_command, task.id ) task.scheduled_job_id = rq_job.id task.status = Task.STATUS_SCHEDULED job_id, task.cached_next_ride = self.fetch_job_with_next_time(task) task.save() else: # enqueue for immediate execution rq_job = self.queue.enqueue(run_management_command, task.id) return rq_job except Exception as e: raise TaskQueueException(_(f"Failed to add task: {e}")) from e
[docs] def fetch_job_with_next_time(self, task): """Fetch the next job in the queue, with its execution time.""" try: job_id, next_time = next( (j, next_time) for j, next_time in self.scheduler.get_jobs(with_times=True) if j.id == task.scheduled_job_id ) next_time = timezone.make_aware(next_time, timezone=timezone.timezone.utc) return job_id, next_time except StopIteration: return None, None
[docs] def remove(self, task): """Remove the job from the queue and updates the tasks' values.""" job, next_time = self.fetch_job_with_next_time(task) if job: self.scheduler.cancel(job) task.scheduled_job_id = None task.cached_next_ride = None task.status = Task.STATUS_IDLE task.save()
available_service = RQTaskQueueService except ImportError: try: from celery import Celery, shared_task class CeleryTaskQueueService(TaskQueueService): """A subclass of TaskQueueService that manages tasks using Celery.""" def __init__(self): self.app = Celery( 'tasks', broker='redis://redis:6379/1', backend='redis://redis:6379:2' ) def add(self, task: Task): """Add a task to the Celery queue.""" try: if task.scheduling: delay = task.scheduling - datetime.datetime.now() self.app.send_task( 'eztaskmanager.services.queues.execute_management_command', args=[task], countdown=delay.total_seconds() ) else: self.app.send_task('eztaskmanager.services.queues.execute_management_command', args=[task]) return True except Exception as e: print(f"Error while launching task: {e}") def remove(self, task): """Remove a job from the Celery queue.""" try: # With Celery, it's not straightforward to remove a task from the queue # The recommended way to stop a task is to revoke it self.app.control.revoke(task.id) except Exception as e: print(f"Error while halting task: {e}") @shared_task def execute_management_command(task): """Wrap the management command executor for Celery.""" from eztaskmanager.services import run_management_command return run_management_command(task) available_service = CeleryTaskQueueService except ImportError: raise ImportError("Both django_rq and Celery packages are not installed.")
[docs] def get_task_service(): """Fetch the correct queue service, based on settings.""" if EZTASKMANAGER_QUEUE_SERVICE_TYPE == 'RQ' or available_service == RQTaskQueueService: return RQTaskQueueService() else: return CeleryTaskQueueService()