import re
from io import StringIO
from django.apps import apps
from django.core.management import load_command_class
from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from eztaskmanager.settings import EZTASKMANAGER_N_REPORTS_INLINE
[docs]
class AppCommand(models.Model):
"""An application command representation."""
name = models.CharField(max_length=100)
app_name = models.CharField(max_length=100)
active = models.BooleanField(default=True)
[docs]
def get_command_class(self):
"""Get the command class."""
return load_command_class(app_name=self.app_name, name=self.name)
@property
def help_text(self):
"""Get the command help text."""
output = StringIO()
command_class = self.get_command_class()
command_class.create_parser("", self.name).print_help(file=output)
return output.getvalue()
def __str__(self):
"""Return the string representation of the app command."""
return f"{self.app_name}: {self.name}"
class Meta:
"""Django model options."""
verbose_name = _("Command")
verbose_name_plural = _("Commands")
[docs]
class LaunchReport(models.Model):
"""A report of a task execution with log."""
RESULT_NO = ""
RESULT_OK = "ok"
RESULT_FAILED = "failed"
RESULT_ERRORS = "errors"
RESULT_WARNINGS = "warnings"
RESULT_CHOICES = (
(RESULT_NO, "---"),
(RESULT_OK, "OK"),
(RESULT_FAILED, "FAILED"),
(RESULT_ERRORS, "ERRORS"),
(RESULT_WARNINGS, "WARNINGS"),
)
task = models.ForeignKey("Task", on_delete=models.CASCADE)
invocation_result = models.CharField(
max_length=20, choices=RESULT_CHOICES, default=RESULT_NO
)
invocation_datetime = models.DateTimeField(auto_now_add=True)
[docs]
@classmethod
def get_notification_handlers(cls):
"""Get the list of notification handlers to send the report to."""
return apps.get_app_config('eztaskmanager').notification_handlers
[docs]
def get_log_lines(self):
"""Format the log entries, here is an example."""
log_lines = [
f"{log.timestamp} - {log.level} - {log.message}"
for log in self.logs.order_by('timestamp')
]
return log_lines
[docs]
def read_log_lines(self, offset: int):
"""
Use an offset to read lines of the llog related to the report (self) starting from the offset.
:param: offset lines to start from
:return: 2-tuple (list, int)
- list of lines of log records from offset
- the number of total lines
"""
log_lines = [
f"{log.timestamp} - {log.level} - {log.message}"
for log in self.logs.all()
]
return log_lines[offset:], len(log_lines)
[docs]
def log_tail(self, n_lines=10):
"""Return the last lines of the logs of a launch_report."""
# Get the related logs
logs = self.logs.order_by('-timestamp')[:n_lines]
total_logs = self.logs.count()
hidden_lines = total_logs - n_lines
report_lines = []
if hidden_lines > 0:
report_lines.append(f"{hidden_lines} lines hidden ...")
for log in reversed(logs):
# Format the log entries as you like, here is an example
log_line = f"{log.timestamp} - {log.level} - {log.message}"
report_lines.append(log_line)
report = "\n".join(report_lines)
return report
@property
def n_log_lines(self):
"""Return the number of log lines for this report."""
return self.logs.count()
@property
def n_log_errors(self):
"""Return the number of errors in this report."""
return self.logs.filter(level="ERROR").count()
@property
def n_log_warnings(self):
"""Return the number of warnings in this report."""
return self.logs.filter(level="WARNING").count()
[docs]
def delete(self, *args, **kwargs):
"""Refresh the task cache after deleting this report."""
task = self.task
# call the parent delete method
super().delete(*args, **kwargs)
# re-calculate cache
task.compute_cache()
def __str__(self):
"""Return the string representation of the app command."""
return (
f"LaunchReport {self.task.name} {self.invocation_result}"
f" {self.invocation_datetime}"
)
[docs]
class Log(models.Model):
"""The log generated by a report."""
launch_report = models.ForeignKey(
'LaunchReport',
on_delete=models.CASCADE,
related_name='logs'
)
timestamp = models.DateTimeField(auto_now_add=True)
level = models.CharField(max_length=10)
message = models.TextField()
class Meta:
ordering = ['timestamp']
def __str__(self):
return f'Log {self.id}: {self.level} at {self.timestamp}'
[docs]
class TaskCategory(models.Model):
"""A task category, used to group tasks when numbers go up."""
name = models.CharField(max_length=255)
def __str__(self):
"""Return the string representation of the task category."""
return self.name
class Meta:
"""Django model options."""
verbose_name = _("Task category")
verbose_name_plural = _("Tasks categories")
[docs]
class Task(models.Model):
"""
A command related task.
Represents a management command with a defined set of arguments (
"""
REPETITION_PERIOD_MINUTE = "minute"
REPETITION_PERIOD_HOUR = "hour"
REPETITION_PERIOD_DAY = "day"
REPETITION_PERIOD_WEEK = "week"
REPETITION_PERIOD_MONTH = "month"
REPETITION_PERIOD_CHOICES = (
(REPETITION_PERIOD_MINUTE, "MINUTE"),
(REPETITION_PERIOD_HOUR, "HOUR"),
(REPETITION_PERIOD_DAY, "DAY"),
(REPETITION_PERIOD_MONTH, "MONTH"),
)
STATUS_IDLE = "idle"
STATUS_SPOOLED = "spooled"
STATUS_SCHEDULED = "scheduled"
STATUS_STARTED = "started"
STATUS_CHOICES = (
(STATUS_IDLE, "IDLE"),
(STATUS_SPOOLED, "SPOOLED"),
(STATUS_SCHEDULED, "SCHEDULED"),
(STATUS_STARTED, "STARTED"),
)
name = models.CharField(max_length=255)
command = models.ForeignKey(
AppCommand, on_delete=models.CASCADE, limit_choices_to={"active": True}
)
arguments = models.TextField(
blank=True,
help_text=_(
'Separate arguments with a comma ","'
'and parameters with a blank space " ". '
"eg: -f, --secondarg param1 param2, --thirdarg=pippo, --thirdarg"
),
)
category = models.ForeignKey(
TaskCategory,
on_delete=models.DO_NOTHING,
blank=True,
null=True,
help_text=_("Choose a category for this task"),
)
status = models.CharField(
max_length=20, choices=STATUS_CHOICES, default=STATUS_IDLE, editable=False
)
scheduling = models.DateTimeField(
blank=True, null=True,
verbose_name=_("Initial scheduling")
)
@property
def scheduling_utc(self):
"""Sho the scheduling time, in UTC."""
if self.scheduling:
return self.scheduling.astimezone(timezone.timezone.utc)
else:
return None
repetition_period = models.CharField(
max_length=20, choices=REPETITION_PERIOD_CHOICES, blank=True
)
repetition_rate = models.PositiveSmallIntegerField(blank=True, null=True)
@property
def is_periodic(self):
"""A periodic task is such only if both repetition period and rate are set."""
return self.repetition_period is not None and self.repetition_rate is not None
note = models.TextField(
blank=True, null=True, help_text=_("A note on how this task is used.")
)
scheduled_job_id = models.CharField(
max_length=64,
blank=True, null=True,
help_text=_("A unique identifier for the scheduled job, if any")
)
cached_last_invocation_datetime = models.DateTimeField(
blank=True, null=True, verbose_name=_("Last datetime")
)
cached_last_invocation_result = models.CharField(
max_length=20,
choices=LaunchReport.RESULT_CHOICES,
blank=True,
null=True,
verbose_name=_("Last result"),
)
cached_last_invocation_n_errors = models.PositiveIntegerField(
null=True, blank=True, verbose_name=_("Errors")
)
cached_last_invocation_n_warnings = models.PositiveIntegerField(
null=True, blank=True, verbose_name=_("Warnings")
)
cached_next_ride = models.DateTimeField(
blank=True, null=True, verbose_name=_("Next execution time"),
)
@property
def interval_in_seconds(self):
"""
Returns the interval in seconds based on the repetition period and rate.
Returns:
int: The interval in seconds.
Example:
# Create an instance of the class
obj = MyClass()
# Set the repetition period and rate
obj.repetition_period = 'day'
obj.repetition_rate = 2
# Calculate the interval in seconds
result = obj.interval_in_seconds() # Returns 2 * 24 * 60 * 60
"""
period_to_seconds = {
self.REPETITION_PERIOD_MINUTE: 60,
self.REPETITION_PERIOD_HOUR: 60 * 60,
self.REPETITION_PERIOD_DAY: 24 * 60 * 60,
self.REPETITION_PERIOD_WEEK: 7 * 24 * 60 * 60,
self.REPETITION_PERIOD_MONTH: 30 * 24 * 60 * 60
}
return period_to_seconds[self.repetition_period] * self.repetition_rate
@property
def _args_dict(self):
"""
This method returns a dictionary containing arguments and their corresponding parameters.
It parses the 'arguments' attribute of the instance and splits it into individual arguments
* using a comma as a delimiter. Each argument is then further split into chunks using whitespace or
an equals sign as a separator. The first chunk is considered the argument name, while
* the second chunk, if present, is considered the parameter. The resulting arguments and parameters
are stored in the dictionary 'res'.
Parameters:
- None
Returns:
- res (dict): A dictionary containing argument-parameter pairs.
If the 'arguments' attribute is empty or consists of whitespace characters only,
an empty dictionary is returned.
Example usage:
```
instance = MyClass()
arguments = "arg1, arg2=param2, arg3 = param3"
result = instance._args_dict() # { 'arg1': None, 'arg2': 'param2', 'arg3': 'param3' }
```
"""
res = {}
if not self.arguments or self.arguments.strip() == "":
return res
args = re.split(r"\s*,\s*", self.arguments)
for arg in args:
arg_chunks = [x for x in re.split(r"\s*=\s*|\s+", arg) if x]
argument = arg_chunks[0]
params = arg_chunks[1:] if len(arg_chunks) > 1 else None
if params and len(params) > 1:
params = ' '.join(params)
elif params:
params = params[0]
res[argument] = params
return res
@property
def args(self):
"""Get the task args."""
return [f"{x}" for x, y in self._args_dict.items() if not y]
@property
def options(self):
"""Get the task options."""
return {
f"{x}".strip("-").replace("-", "_"): f"{y}"
for x, y in self._args_dict.items()
if y
}
@property
def complete_args(self):
"""
Returns a list containing all the non-null values from the dictionary of arguments.
Get all task args in order to avoid problems with required options.
:return: A list containing non-null argument values.
As suggested here:
https://stackoverflow.com/questions/32036562/call-command-argument-is-required
"""
return list(
filter(
lambda x: x is not None,
(
item
for sublist in [(k, v) for k, v in self._args_dict.items()]
for item in sublist
),
)
)
[docs]
def compute_cache(self):
"""Compute cached values for this task."""
reports = self.launchreport_set.order_by('-invocation_datetime')
if reports.exists():
latest_report = reports[0] # get the latest execution report
self.cached_last_invocation_datetime = latest_report.invocation_datetime
self.cached_last_invocation_result = latest_report.invocation_result
self.cached_last_invocation_n_errors = latest_report.n_log_errors
self.cached_last_invocation_n_warnings = latest_report.n_log_warnings
# self.cached_next_ride = latest_report.next_ride
else:
# no reports, set all cached values to None
self.cached_last_invocation_datetime = None
self.cached_last_invocation_result = None
self.cached_last_invocation_n_errors = None
self.cached_last_invocation_n_warnings = None
self.cached_next_ride = None
self.save()
[docs]
def prune_reports(self, n: int = EZTASKMANAGER_N_REPORTS_INLINE):
"""Delete all Task's LaunchReports except latest `n`."""
if n:
last_n_reports_ids = (
LaunchReport.objects.filter(task=self)
.order_by("-id")[:n]
.values_list("id", flat=True)
)
LaunchReport.objects.filter(task=self).exclude(
pk__in=list(last_n_reports_ids)
).delete()
self.compute_cache()
def __str__(self):
"""Return the string representation of the task."""
return f"{self.name} ({self.status})"
class Meta:
"""Django model options."""
verbose_name = _("Task")
verbose_name_plural = _("Tasks")