Source code for eztaskmanager.services

import logging
from typing import Optional

from django.core.management import call_command

from eztaskmanager.models import LaunchReport, Task
from eztaskmanager.services.logger import (DatabaseLogHandler,
                                           verbosity2loglevel)
from eztaskmanager.services.notifications import emit_notifications
from eztaskmanager.services.queues import get_task_service


def run_management_command(task_id: int) -> None:
    """
    Execute the management command associated with a task.

    Loads the Task, creates a LaunchReport for this execution, runs the
    command through ``call_command``, stores the outcome on both the report
    and the task, and finally emits notifications for the report.

    If no Task with the given id exists, an error is logged and nothing
    else happens.

    :param task_id: The id of the task object representing the management
        command to be executed.
    :type task_id: int
    :return: None
    """
    local_logger = logging.getLogger("eztaskmanager.services.logger")

    # Task re-hydration: bail out early when the id does not resolve.
    try:
        task: Task = Task.objects.get(id=task_id)
    except Task.DoesNotExist:
        local_logger.error(f"Task with id {task_id} not found")
        return

    local_logger.setLevel(verbosity2loglevel(int(task.options.get('verbosity', 1))))

    service = get_task_service()
    report = LaunchReport(task=task)
    report.save()
    task.prune_reports()

    result = LaunchReport.RESULT_OK

    # Attach a DatabaseLogHandler bound to this report, unless one is present.
    # NOTE(review): handlers are never removed here, so a DatabaseLogHandler
    # left by an earlier run in the same process would be reused while still
    # bound to that earlier report id - confirm whether workers are long-lived.
    if not any(isinstance(handler, DatabaseLogHandler) for handler in local_logger.handlers):
        local_logger.addHandler(DatabaseLogHandler(report.id))

    # Attach a formatted StreamHandler, unless one is already present.
    if not any(isinstance(handler, logging.StreamHandler) for handler in local_logger.handlers):
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s',
                '%m-%d-%Y %H:%M:%S',
            )
        )
        local_logger.addHandler(stream_handler)

    local_logger.info('Starting')
    # Remember the status so it can be restored once the command has run.
    task_original_status = task.status
    task.status = Task.STATUS_STARTED
    task.save()

    # Execute the command; any exception marks the run as failed instead of
    # propagating, so the report and the task are always updated below.
    try:
        call_command(task.command.name, *task.complete_args, launch_report_id=report.id)
    except Exception as e:
        result = LaunchReport.RESULT_FAILED
        local_logger.error(f"EXCEPTION raised: {e}")
    finally:
        local_logger.info('Finished')

    # A run that did not raise is still downgraded when the report counted
    # logged errors or warnings.
    if result != LaunchReport.RESULT_FAILED:
        if report.n_log_errors:
            result = LaunchReport.RESULT_ERRORS
        elif report.n_log_warnings:
            result = LaunchReport.RESULT_WARNINGS

    report.invocation_result = result
    report.save()

    # Cache the outcome of this invocation on the task itself.
    task.cached_last_invocation_result = report.invocation_result
    task.cached_last_invocation_n_errors = report.n_log_errors
    task.cached_last_invocation_n_warnings = report.n_log_warnings
    task.cached_last_invocation_datetime = report.invocation_datetime
    task.status = task_original_status
    _, task.cached_next_ride = service.fetch_job_with_next_time(task)

    # a non-periodic task is set back to IDLE and its scheduled job id set to None
    if task.status == Task.STATUS_SCHEDULED and not task.is_periodic:
        task.status = Task.STATUS_IDLE
        task.scheduled_job_id = None
    task.save()

    # Finally, emit notifications. This stays best-effort (a failure must not
    # break the run), but it is logged instead of silently discarded. The
    # module logger is used rather than local_logger so the message is not
    # routed through the handlers attached above.
    try:
        emit_notifications(report)
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to emit notifications for launch report %s", report.id
        )