feat(audit): add Audit Log module — coverage, metrics endpoint, indexes
- UserStatusView, EventModerationView, ReviewModerationView,
PartnerKYCReviewView: each state change now emits _audit_log()
inside the same transaction.atomic() block so the log stays
consistent with DB state on partial failure
- AuditLogMetricsView: GET /api/v1/rbac/audit-log/metrics/ returns
total/today/week/distinct_users/by_action_group; 60 s cache with
?nocache=1 bypass
- AuditLogListView: free-text search (Q over action/target/user),
page_size bounded to [1, 200]
- accounts.User.ALL_MODULES += 'audit-log';
StaffProfile.SCOPE_TO_MODULE['audit'] = 'audit-log'
- Migration 0005: composite indexes (action,-created_at) and
(target_type,target_id) on AuditLog
- admin_api/tests.py: 11 tests covering list shape, search,
page bounds, metrics shape+nocache, suspend/ban/reinstate
audit emission
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 12:39:38 +05:30
|
|
|
"""Tests for the Audit Log module (admin_api v1.12.0).
|
|
|
|
|
|
|
|
|
|
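Covers:
    * `AuditLogListView` — list + search + filter shape
    * `AuditLogMetricsView` — shape + `?nocache=1` bypass
    * `UserStatusView` — emits a row into `AuditLog` for every status change,
      inside the same transaction as the state change
    * `AdminLoginView` — emits a row into `AuditLog` for successful and
      failed logins
    * `EventCreateView` / `EventUpdateView` / `EventDeleteView` — emit a row
      into `AuditLog` for every create, update and delete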
|
|
|
|
|
|
|
|
|
|
Run with:
|
|
|
|
|
python manage.py test admin_api.tests
|
|
|
|
|
"""
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from django.core.cache import cache
|
|
|
|
|
from django.test import TestCase
|
|
|
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
|
|
|
|
|
|
from accounts.models import User
|
|
|
|
|
from admin_api.models import AuditLog
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Base — auth helper shared across cases
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _AuditTestBase(TestCase):
    """Shared fixture for the audit-log test cases.

    Creates one superuser admin per test class and, before every test,
    issues a fresh JWT access token exposed as ``self.auth`` — a dict of
    extra request headers meant to be splatted into test-client calls.
    """

    # Cache entry reset around every test; the metrics tests in this module
    # prime the same key with a fake payload.
    METRICS_CACHE_KEY = 'admin_api:audit_log:metrics:v1'

    @classmethod
    def setUpTestData(cls):
        """Create the admin account shared by every test in the class."""
        cls.admin = User.objects.create_user(
            username='audit.admin@eventifyplus.com',
            email='audit.admin@eventifyplus.com',
            password='irrelevant',
            role='admin',
        )
        cls.admin.is_superuser = True
        cls.admin.save()

    def setUp(self):
        """Reset the metrics cache entry and mint a bearer token for the admin."""
        # Django's TestCase rolls back the database between tests but leaves
        # the cache alone. Clear the entry before the test so cases stay
        # independent, and again afterwards so a payload primed by one test
        # cannot leak into later test cases or other test modules.
        cache.delete(self.METRICS_CACHE_KEY)
        self.addCleanup(cache.delete, self.METRICS_CACHE_KEY)
        access = str(RefreshToken.for_user(self.admin).access_token)
        self.auth = {'HTTP_AUTHORIZATION': f'Bearer {access}'}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AuditLogListView
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AuditLogListViewTests(_AuditTestBase):
    """Request-level checks for the audit-log list endpoint."""

    url = '/api/v1/rbac/audit-log/'

    def _record(self, **fields):
        """Insert an `AuditLog` row attributed to the shared admin user."""
        return AuditLog.objects.create(user=self.admin, **fields)

    def _get(self, params=None):
        """GET the list endpoint as the admin, with optional query params."""
        return self.client.get(self.url, params, **self.auth)

    def test_unauthenticated_returns_401(self):
        """A request without the bearer header is rejected."""
        response = self.client.get(self.url)
        self.assertEqual(response.status_code, 401)

    def test_authenticated_returns_paginated_shape(self):
        """The response carries the pagination envelope and the stored row."""
        self._record(
            action='user.suspended',
            target_type='user',
            target_id='42',
            details={'reason': 'spam'},
        )

        response = self._get()
        self.assertEqual(response.status_code, 200, response.content)

        payload = response.json()
        expected_keys = ('results', 'total', 'page', 'page_size', 'total_pages')
        for expected_key in expected_keys:
            self.assertIn(expected_key, payload)

        self.assertEqual(payload['total'], 1)
        self.assertEqual(payload['results'][0]['action'], 'user.suspended')
        self.assertEqual(payload['results'][0]['user']['email'], self.admin.email)

    def test_search_narrows_results(self):
        """`?search=` returns only the row whose action matches the term."""
        self._record(
            action='user.suspended',
            target_type='user',
            target_id='1',
            details={},
        )
        self._record(
            action='event.approved',
            target_type='event',
            target_id='1',
            details={},
        )

        response = self._get({'search': 'suspend'})
        self.assertEqual(response.status_code, 200)

        payload = response.json()
        self.assertEqual(payload['total'], 1)
        self.assertEqual(payload['results'][0]['action'], 'user.suspended')

    def test_page_size_is_bounded(self):
        """An oversized page_size is clamped to the 200-row upper bound."""
        response = self._get({'page_size': '999'})
        self.assertEqual(response.status_code, 200)
        reported_size = response.json()['page_size']
        self.assertEqual(reported_size, 200)

    def test_invalid_pagination_falls_back_to_defaults(self):
        """Non-numeric page/page_size values yield page 1 with 50 rows."""
        response = self._get({'page': 'x', 'page_size': 'y'})
        self.assertEqual(response.status_code, 200)

        payload = response.json()
        self.assertEqual(payload['page'], 1)
        self.assertEqual(payload['page_size'], 50)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AuditLogMetricsView
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AuditLogMetricsViewTests(_AuditTestBase):
    """Request-level checks for the audit-log metrics endpoint."""

    url = '/api/v1/rbac/audit-log/metrics/'

    def test_unauthenticated_returns_401(self):
        """A request without the bearer header is rejected."""
        response = self.client.get(self.url)
        self.assertEqual(response.status_code, 401)

    def test_returns_expected_shape(self):
        """The payload has every counter plus all six action groups."""
        fixture_rows = (
            ('event.approved', 'event', '7'),
            ('department.created', 'department', '3'),
        )
        for action, target_type, target_id in fixture_rows:
            AuditLog.objects.create(
                user=self.admin,
                action=action,
                target_type=target_type,
                target_id=target_id,
                details={},
            )

        response = self.client.get(self.url, **self.auth)
        self.assertEqual(response.status_code, 200, response.content)

        payload = response.json()
        counters = ('total', 'today', 'week', 'distinct_users', 'by_action_group')
        for counter in counters:
            self.assertIn(counter, payload)
        self.assertEqual(payload['total'], 2)
        self.assertEqual(payload['distinct_users'], 1)

        # Each group present so frontend tooltip can render all 6 rows.
        groups = ('create', 'update', 'delete', 'moderate', 'auth', 'other')
        for group in groups:
            self.assertIn(group, payload['by_action_group'])
        self.assertEqual(payload['by_action_group']['moderate'], 1)
        self.assertEqual(payload['by_action_group']['create'], 1)

    def test_nocache_bypasses_stale_cache(self):
        """`?nocache=1` ignores a primed cache entry and reports DB state."""
        # Prime cache with a fake payload.
        stale_payload = {
            'total': 999,
            'today': 0,
            'week': 0,
            'distinct_users': 0,
            'by_action_group': {
                'create': 0,
                'update': 0,
                'delete': 0,
                'moderate': 0,
                'auth': 0,
                'other': 0,
            },
        }
        cache.set('admin_api:audit_log:metrics:v1', stale_payload, 60)

        cached_response = self.client.get(self.url, **self.auth)
        self.assertEqual(cached_response.json()['total'], 999)

        fresh_response = self.client.get(self.url, {'nocache': '1'}, **self.auth)
        # No AuditLog rows were created in this test, so the real total is 0.
        self.assertEqual(fresh_response.json()['total'], 0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# UserStatusView — audit emission
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UserStatusAuditEmissionTests(_AuditTestBase):
    """Every status transition must leave a matching row in `AuditLog`.

    The endpoint is documented as wrapping the state change and the audit
    write in `transaction.atomic()` so the two cannot disagree. These tests
    guard against a new branch that forgets the audit call.
    """

    def _url(self, user_id: int) -> str:
        """Return the status endpoint path for *user_id*."""
        return f'/api/v1/users/{user_id}/status/'

    def _patch_status(self, payload):
        """PATCH *payload* as JSON to the target user's status endpoint."""
        return self.client.patch(
            self._url(self.target.id),
            data=payload,
            content_type='application/json',
            **self.auth,
        )

    def _audit_rows(self, action):
        """Return the `AuditLog` queryset for *action* on the target user."""
        return AuditLog.objects.filter(
            action=action,
            target_id=str(self.target.id),
        )

    def setUp(self):
        """Create the customer account the status changes are applied to."""
        super().setUp()
        self.target = User.objects.create_user(
            username='target@example.com',
            email='target@example.com',
            password='irrelevant',
            role='customer',
        )

    def test_suspend_emits_audit_row(self):
        """Suspending logs `user.suspended` and records the given reason."""
        response = self._patch_status({'action': 'suspend', 'reason': 'spam flood'})
        self.assertEqual(response.status_code, 200, response.content)

        entry = self._audit_rows('user.suspended').first()
        self.assertIsNotNone(entry, 'suspend did not emit audit log')
        self.assertEqual(entry.details.get('reason'), 'spam flood')

    def test_ban_emits_audit_row(self):
        """Banning logs a `user.banned` row for the target."""
        response = self._patch_status({'action': 'ban'})
        self.assertEqual(response.status_code, 200, response.content)

        was_logged = self._audit_rows('user.banned').exists()
        self.assertTrue(was_logged, 'ban did not emit audit log')

    def test_reinstate_emits_audit_row(self):
        """Reinstating a deactivated user logs a `user.reinstated` row."""
        self.target.is_active = False
        self.target.save()

        response = self._patch_status({'action': 'reinstate'})
        self.assertEqual(response.status_code, 200, response.content)

        was_logged = self._audit_rows('user.reinstated').exists()
        self.assertTrue(was_logged, 'reinstate did not emit audit log')
|
2026-04-21 13:42:02 +05:30
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# AdminLoginView — audit emission
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AuthAuditEmissionTests(_AuditTestBase):
    """Admin login attempts, successful or not, must be written to AuditLog."""

    url = '/api/v1/admin/auth/login/'

    def _login(self, password):
        """POST the admin's username with *password* to the login endpoint."""
        credentials = {'username': self.admin.username, 'password': password}
        return self.client.post(
            self.url,
            data=credentials,
            content_type='application/json',
        )

    def test_successful_login_emits_audit_row(self):
        """A valid login logs `auth.admin_login` with the username in details."""
        response = self._login('irrelevant')
        self.assertEqual(response.status_code, 200, response.content)

        entry = AuditLog.objects.filter(
            action='auth.admin_login',
            target_id=str(self.admin.id),
        ).first()
        self.assertIsNotNone(entry, 'successful login did not emit audit log')
        self.assertEqual(entry.details.get('username'), self.admin.username)

    def test_failed_login_emits_audit_row(self):
        """A wrong password returns 401 and logs `auth.admin_login_failed`."""
        response = self._login('wrong-password')
        self.assertEqual(response.status_code, 401, response.content)

        was_logged = AuditLog.objects.filter(action='auth.admin_login_failed').exists()
        self.assertTrue(was_logged, 'failed login did not emit audit log')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# EventCreateView / EventUpdateView / EventDeleteView — audit emission
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventCrudAuditTests(_AuditTestBase):
    """Creating, updating and deleting an event must each emit an audit row."""

    def setUp(self):
        """Create the event type that new events are attached to."""
        super().setUp()
        # Imported here, as in the original, rather than at module level.
        from events.models import EventType

        self.event_type = EventType.objects.create(event_type='Test Category')

    def _create_event_id(self):
        """Create an event through the API and return its id from the response."""
        payload = {'title': 'Test Event', 'eventType': self.event_type.id}
        response = self.client.post(
            '/api/v1/events/create/',
            data=payload,
            content_type='application/json',
            **self.auth,
        )
        self.assertEqual(response.status_code, 201, response.content)
        return response.json()['id']

    def _audit_rows(self, action, event_id):
        """Return the `AuditLog` queryset for *action* on the given event."""
        return AuditLog.objects.filter(action=action, target_id=str(event_id))

    def test_create_event_emits_audit_row(self):
        """Creating an event logs an `event.created` row for its id."""
        new_id = self._create_event_id()
        was_logged = self._audit_rows('event.created', new_id).exists()
        self.assertTrue(was_logged, 'event create did not emit audit log')

    def test_update_event_emits_audit_row(self):
        """Updating the title logs `event.updated` listing `title` as changed."""
        new_id = self._create_event_id()
        # Drop the rows written during creation so only the update is observed.
        AuditLog.objects.all().delete()

        response = self.client.patch(
            f'/api/v1/events/{new_id}/update/',
            data={'title': 'Updated Title'},
            content_type='application/json',
            **self.auth,
        )
        self.assertEqual(response.status_code, 200, response.content)

        entry = self._audit_rows('event.updated', new_id).first()
        self.assertIsNotNone(entry, 'event update did not emit audit log')
        changed = entry.details.get('changed_fields', [])
        self.assertIn('title', changed)

    def test_delete_event_emits_audit_row(self):
        """Deleting an event returns 204 and logs an `event.deleted` row."""
        new_id = self._create_event_id()
        # Drop the rows written during creation so only the delete is observed.
        AuditLog.objects.all().delete()

        response = self.client.delete(
            f'/api/v1/events/{new_id}/delete/',
            **self.auth,
        )
        self.assertEqual(response.status_code, 204)

        was_logged = self._audit_rows('event.deleted', new_id).exists()
        self.assertTrue(was_logged, 'event delete did not emit audit log')
|