154 lines
5.7 KiB
Python
154 lines
5.7 KiB
Python
|
|
"""Dispatch due ``NotificationSchedule`` jobs.
|
||
|
|
|
||
|
|
Host cron invokes this every ~15 minutes via ``docker exec``. The command
|
||
|
|
walks all active schedules, evaluates their cron expression against
|
||
|
|
``last_run_at`` using ``croniter``, and fires any that are due. A row-level
|
||
|
|
``select_for_update(skip_locked=True)`` prevents duplicate sends if two cron
|
||
|
|
ticks race or the container is restarted mid-run.
|
||
|
|
|
||
|
|
Evaluation timezone is **Asia/Kolkata** to match
|
||
|
|
``notifications/emails.py::_upcoming_week_bounds`` — the same wall-clock week
|
||
|
|
used in the outgoing email body.
|
||
|
|
|
||
|
|
Flags:
|
||
|
|
--schedule-id <id> Fire exactly one schedule, ignoring cron check.
|
||
|
|
--dry-run Resolve due schedules and report them without calling render_and_send; sends nothing.
|
||
|
|
"""
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
|
||
|
|
try:
|
||
|
|
from zoneinfo import ZoneInfo
|
||
|
|
except ImportError: # pragma: no cover — py<3.9
|
||
|
|
from backports.zoneinfo import ZoneInfo # type: ignore
|
||
|
|
|
||
|
|
from croniter import croniter
|
||
|
|
from django.core.management.base import BaseCommand, CommandError
|
||
|
|
from django.db import transaction
|
||
|
|
from django.utils import timezone
|
||
|
|
|
||
|
|
from eventify_logger.services import log
|
||
|
|
from notifications.emails import BUILDERS, render_and_send
|
||
|
|
from notifications.models import NotificationSchedule
|
||
|
|
|
||
|
|
|
||
|
|
# Timezone in which cron expressions are evaluated (see the module docstring).
IST = ZoneInfo('Asia/Kolkata')
|
||
|
|
|
||
|
|
|
||
|
|
def _is_due(schedule: NotificationSchedule, now_ist: datetime) -> bool:
    """Decide whether ``schedule`` should fire at ``now_ist``.

    The cron expression is iterated forward from a starting point: the
    schedule's ``last_run_at`` converted to IST, or ``now_ist`` minus 365
    days when the schedule has never run. The schedule is due when the first
    fire time after that starting point is not later than ``now_ist``.

    A schedule whose cron expression ``croniter`` rejects is never due.
    """
    expression = schedule.cron_expression
    if not croniter.is_valid(expression):
        return False

    previous_run = schedule.last_run_at
    if previous_run is None:
        # Never ran: look back a full year for the first fire time.
        start = now_ist - timedelta(days=365)
    else:
        start = previous_run.astimezone(IST)

    return croniter(expression, start).get_next(datetime) <= now_ist
|
||
|
|
|
||
|
|
|
||
|
|
class Command(BaseCommand):
    """Management command that fires due ``NotificationSchedule`` email jobs.

    Each candidate schedule is processed in its own transaction while holding
    a ``select_for_update(skip_locked=True)`` row lock, so a schedule that a
    concurrent run has already locked is skipped rather than waited on.

    ``last_run_at`` is only advanced after ``render_and_send`` returns without
    raising; a schedule whose send raised keeps its previous ``last_run_at``.
    """

    help = 'Dispatch due NotificationSchedule email jobs.'

    def add_arguments(self, parser):
        """Register the ``--schedule-id`` and ``--dry-run`` flags."""
        parser.add_argument(
            '--schedule-id', type=int, default=None,
            help='Force-run a single schedule by ID, ignoring cron check.',
        )
        parser.add_argument(
            '--dry-run', action='store_true',
            help='Render and log but do not send or persist last_run_at.',
        )

    def handle(self, *args, **opts):
        """Evaluate every active schedule and fire the ones that are due.

        Reads ``schedule_id`` and ``dry_run`` from ``opts``. When
        ``schedule_id`` is given, only that schedule is considered and the
        cron check is bypassed. Writes a per-schedule line and a final
        summary to stdout, and logs the summary counters.
        """
        schedule_id = opts.get('schedule_id')
        dry_run = opts.get('dry_run', False)
        forced = schedule_id is not None

        # One timestamp for the whole run so every schedule is judged
        # against the same instant.
        now_ist = datetime.now(IST)
        qs = NotificationSchedule.objects.filter(is_active=True)
        if forced:
            qs = qs.filter(id=schedule_id)

        candidate_ids = list(qs.values_list('id', flat=True))
        if not candidate_ids:
            self.stdout.write('No active schedules to evaluate.')
            return

        counts = {'fired': 0, 'skipped': 0, 'errored': 0}
        for sid in candidate_ids:
            outcome = self._dispatch_one(
                sid, now_ist=now_ist, forced=forced, dry_run=dry_run,
            )
            counts[outcome] += 1

        fired = counts['fired']
        skipped = counts['skipped']
        errored = counts['errored']

        summary = f'Done. fired={fired} skipped={skipped} errored={errored}'
        self.stdout.write(summary)
        log('info', 'send_scheduled_notifications complete', logger_data={
            'fired': fired, 'skipped': skipped, 'errored': errored,
            'dry_run': dry_run, 'forced_id': schedule_id,
        })

    def _dispatch_one(self, sid, *, now_ist, forced, dry_run):
        """Process one schedule inside its own transaction.

        Returns one of ``'fired'``, ``'skipped'`` or ``'errored'`` so the
        caller can tally outcomes. With ``dry_run`` set, nothing is sent and
        nothing is written to the database.
        """
        with transaction.atomic():
            schedule = (
                NotificationSchedule.objects
                .select_for_update(skip_locked=True)
                .filter(id=sid, is_active=True)
                .first()
            )
            # No row: it is locked by another transaction (skip_locked), or
            # it no longer matches id/is_active since the candidate listing.
            if schedule is None:
                return 'skipped'

            if not forced and not _is_due(schedule, now_ist):
                return 'skipped'

            if schedule.notification_type not in BUILDERS:
                message = (
                    f'No builder registered for {schedule.notification_type!r}'
                )
                if dry_run:
                    # A dry run must leave the row untouched: report the
                    # problem instead of persisting an error status.
                    self.stdout.write(
                        f'[dry-run] schedule {schedule.id} '
                        f'({schedule.name}) would error: {message}'
                    )
                else:
                    self._record_failure(schedule, message)
                return 'errored'

            if dry_run:
                self.stdout.write(
                    f'[dry-run] would fire schedule {schedule.id} '
                    f'({schedule.name}) type={schedule.notification_type}'
                )
                return 'fired'

            try:
                recipient_count = render_and_send(schedule)
            except Exception as exc:  # noqa: BLE001 — wide catch, store msg
                log('error', 'notification dispatch failed', logger_data={
                    'schedule_id': schedule.id,
                    'schedule_name': schedule.name,
                    'error': str(exc),
                })
                # NOTE(review): this handler runs inside transaction.atomic().
                # If render_and_send failed on a database error, the save
                # below may itself raise because the transaction is already
                # broken — consider a nested atomic() savepoint around the
                # call once its DB side effects have been checked.
                self._record_failure(schedule, str(exc)[:2000])
                return 'errored'

            schedule.last_run_at = timezone.now()
            schedule.last_status = NotificationSchedule.STATUS_SUCCESS
            schedule.last_error = ''
            schedule.save(update_fields=[
                'last_run_at', 'last_status', 'last_error', 'updated_at',
            ])
            self.stdout.write(
                f'Fired schedule {schedule.id} ({schedule.name}) '
                f'→ {recipient_count} recipient(s)'
            )
            return 'fired'

    @staticmethod
    def _record_failure(schedule, message):
        """Persist an error status and ``message`` on ``schedule``.

        ``last_run_at`` is deliberately left unchanged.
        """
        schedule.last_status = NotificationSchedule.STATUS_ERROR
        schedule.last_error = message
        schedule.save(update_fields=['last_status', 'last_error', 'updated_at'])
|