Files
Sicherhaven 66e41ba647 feat(audit): extend audit coverage to all admin interactions (v1.13.0)
- _audit_log helper: optional user= kwarg for login-time calls
- AdminLoginView: auth.admin_login / auth.admin_login_failed
- PartnerStatusView: partner.status_changed (atomic)
- PartnerOnboardView: partner.onboarded
- PartnerStaffCreateView: partner.staff.created
- EventCreateView/UpdateView/DeleteView: event.created/updated/deleted (atomic)
- EventPrimaryImageView: event.primary_image_changed
- SettlementReleaseView: settlement.released (atomic)
- ReviewDeleteView: review.deleted (atomic)
- LeadUpdateView: lead.updated
- PaymentGatewaySettingsView: gateway.created/updated/deleted
- tests: AuthAuditEmissionTests + EventCrudAuditTests (16 total, all green)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 13:42:02 +05:30

326 lines
12 KiB
Python

"""Tests for the Audit Log module (admin_api v1.12.0).
Covers:
* `AuditLogListView` — list + search + filter shape
* `AuditLogMetricsView` — shape + `?nocache=1` bypass
* `UserStatusView` — emits a row into `AuditLog` for every status change,
inside the same transaction as the state change
Run with:
python manage.py test admin_api.tests
"""
from __future__ import annotations
from django.core.cache import cache
from django.test import TestCase
from rest_framework_simplejwt.tokens import RefreshToken
from accounts.models import User
from admin_api.models import AuditLog
# ---------------------------------------------------------------------------
# Base — auth helper shared across cases
# ---------------------------------------------------------------------------
class _AuditTestBase(TestCase):
"""Gives each subclass an admin user + pre-issued JWT."""
@classmethod
def setUpTestData(cls):
cls.admin = User.objects.create_user(
username='audit.admin@eventifyplus.com',
email='audit.admin@eventifyplus.com',
password='irrelevant',
role='admin',
)
cls.admin.is_superuser = True
cls.admin.save()
def setUp(self):
# Metrics view caches by key; reset to keep cases independent.
cache.delete('admin_api:audit_log:metrics:v1')
access = str(RefreshToken.for_user(self.admin).access_token)
self.auth = {'HTTP_AUTHORIZATION': f'Bearer {access}'}
# ---------------------------------------------------------------------------
# AuditLogListView
# ---------------------------------------------------------------------------
class AuditLogListViewTests(_AuditTestBase):
url = '/api/v1/rbac/audit-log/'
def test_unauthenticated_returns_401(self):
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 401)
def test_authenticated_returns_paginated_shape(self):
AuditLog.objects.create(
user=self.admin,
action='user.suspended',
target_type='user',
target_id='42',
details={'reason': 'spam'},
)
resp = self.client.get(self.url, **self.auth)
self.assertEqual(resp.status_code, 200, resp.content)
body = resp.json()
for key in ('results', 'total', 'page', 'page_size', 'total_pages'):
self.assertIn(key, body)
self.assertEqual(body['total'], 1)
self.assertEqual(body['results'][0]['action'], 'user.suspended')
self.assertEqual(body['results'][0]['user']['email'], self.admin.email)
def test_search_narrows_results(self):
AuditLog.objects.create(
user=self.admin, action='user.suspended',
target_type='user', target_id='1', details={},
)
AuditLog.objects.create(
user=self.admin, action='event.approved',
target_type='event', target_id='1', details={},
)
resp = self.client.get(self.url, {'search': 'suspend'}, **self.auth)
self.assertEqual(resp.status_code, 200)
body = resp.json()
self.assertEqual(body['total'], 1)
self.assertEqual(body['results'][0]['action'], 'user.suspended')
def test_page_size_is_bounded(self):
# page_size=999 must be clamped to the 200-row upper bound.
resp = self.client.get(self.url, {'page_size': '999'}, **self.auth)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()['page_size'], 200)
def test_invalid_pagination_falls_back_to_defaults(self):
resp = self.client.get(self.url, {'page': 'x', 'page_size': 'y'}, **self.auth)
self.assertEqual(resp.status_code, 200)
body = resp.json()
self.assertEqual(body['page'], 1)
self.assertEqual(body['page_size'], 50)
# ---------------------------------------------------------------------------
# AuditLogMetricsView
# ---------------------------------------------------------------------------
class AuditLogMetricsViewTests(_AuditTestBase):
url = '/api/v1/rbac/audit-log/metrics/'
def test_unauthenticated_returns_401(self):
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 401)
def test_returns_expected_shape(self):
AuditLog.objects.create(
user=self.admin, action='event.approved',
target_type='event', target_id='7', details={},
)
AuditLog.objects.create(
user=self.admin, action='department.created',
target_type='department', target_id='3', details={},
)
resp = self.client.get(self.url, **self.auth)
self.assertEqual(resp.status_code, 200, resp.content)
body = resp.json()
for key in ('total', 'today', 'week', 'distinct_users', 'by_action_group'):
self.assertIn(key, body)
self.assertEqual(body['total'], 2)
self.assertEqual(body['distinct_users'], 1)
# Each group present so frontend tooltip can render all 6 rows.
for group in ('create', 'update', 'delete', 'moderate', 'auth', 'other'):
self.assertIn(group, body['by_action_group'])
self.assertEqual(body['by_action_group']['moderate'], 1)
self.assertEqual(body['by_action_group']['create'], 1)
def test_nocache_bypasses_stale_cache(self):
# Prime cache with a fake payload.
cache.set(
'admin_api:audit_log:metrics:v1',
{
'total': 999,
'today': 0, 'week': 0, 'distinct_users': 0,
'by_action_group': {
'create': 0, 'update': 0, 'delete': 0,
'moderate': 0, 'auth': 0, 'other': 0,
},
},
60,
)
resp_cached = self.client.get(self.url, **self.auth)
self.assertEqual(resp_cached.json()['total'], 999)
resp_fresh = self.client.get(self.url, {'nocache': '1'}, **self.auth)
self.assertEqual(resp_fresh.json()['total'], 0) # real DB state
# ---------------------------------------------------------------------------
# UserStatusView — audit emission
# ---------------------------------------------------------------------------
class UserStatusAuditEmissionTests(_AuditTestBase):
"""Each status transition must leave a matching row in `AuditLog`.
The endpoint wraps the state change + audit log in `transaction.atomic()`
so the two can never disagree. These assertions catch regressions where a
new branch forgets the audit call.
"""
def _url(self, user_id: int) -> str:
return f'/api/v1/users/{user_id}/status/'
def setUp(self):
super().setUp()
self.target = User.objects.create_user(
username='target@example.com',
email='target@example.com',
password='irrelevant',
role='customer',
)
def test_suspend_emits_audit_row(self):
resp = self.client.patch(
self._url(self.target.id),
data={'action': 'suspend', 'reason': 'spam flood'},
content_type='application/json',
**self.auth,
)
self.assertEqual(resp.status_code, 200, resp.content)
log = AuditLog.objects.filter(
action='user.suspended', target_id=str(self.target.id),
).first()
self.assertIsNotNone(log, 'suspend did not emit audit log')
self.assertEqual(log.details.get('reason'), 'spam flood')
def test_ban_emits_audit_row(self):
resp = self.client.patch(
self._url(self.target.id),
data={'action': 'ban'},
content_type='application/json',
**self.auth,
)
self.assertEqual(resp.status_code, 200, resp.content)
self.assertTrue(
AuditLog.objects.filter(
action='user.banned', target_id=str(self.target.id),
).exists(),
'ban did not emit audit log',
)
def test_reinstate_emits_audit_row(self):
self.target.is_active = False
self.target.save()
resp = self.client.patch(
self._url(self.target.id),
data={'action': 'reinstate'},
content_type='application/json',
**self.auth,
)
self.assertEqual(resp.status_code, 200, resp.content)
self.assertTrue(
AuditLog.objects.filter(
action='user.reinstated', target_id=str(self.target.id),
).exists(),
'reinstate did not emit audit log',
)
# ---------------------------------------------------------------------------
# AdminLoginView — audit emission
# ---------------------------------------------------------------------------
class AuthAuditEmissionTests(_AuditTestBase):
"""Successful and failed logins must leave matching rows in AuditLog."""
url = '/api/v1/admin/auth/login/'
def test_successful_login_emits_audit_row(self):
resp = self.client.post(
self.url,
data={'username': self.admin.username, 'password': 'irrelevant'},
content_type='application/json',
)
self.assertEqual(resp.status_code, 200, resp.content)
log = AuditLog.objects.filter(
action='auth.admin_login', target_id=str(self.admin.id),
).first()
self.assertIsNotNone(log, 'successful login did not emit audit log')
self.assertEqual(log.details.get('username'), self.admin.username)
def test_failed_login_emits_audit_row(self):
resp = self.client.post(
self.url,
data={'username': self.admin.username, 'password': 'wrong-password'},
content_type='application/json',
)
self.assertEqual(resp.status_code, 401, resp.content)
self.assertTrue(
AuditLog.objects.filter(action='auth.admin_login_failed').exists(),
'failed login did not emit audit log',
)
# ---------------------------------------------------------------------------
# EventCreateView / EventUpdateView / EventDeleteView — audit emission
# ---------------------------------------------------------------------------
class EventCrudAuditTests(_AuditTestBase):
"""Event CRUD operations must emit matching audit rows."""
def setUp(self):
super().setUp()
from events.models import EventType
self.event_type = EventType.objects.create(event_type='Test Category')
def _create_event_id(self):
resp = self.client.post(
'/api/v1/events/create/',
data={'title': 'Test Event', 'eventType': self.event_type.id},
content_type='application/json',
**self.auth,
)
self.assertEqual(resp.status_code, 201, resp.content)
return resp.json()['id']
def test_create_event_emits_audit_row(self):
event_id = self._create_event_id()
self.assertTrue(
AuditLog.objects.filter(action='event.created', target_id=str(event_id)).exists(),
'event create did not emit audit log',
)
def test_update_event_emits_audit_row(self):
event_id = self._create_event_id()
AuditLog.objects.all().delete()
resp = self.client.patch(
f'/api/v1/events/{event_id}/update/',
data={'title': 'Updated Title'},
content_type='application/json',
**self.auth,
)
self.assertEqual(resp.status_code, 200, resp.content)
log = AuditLog.objects.filter(action='event.updated', target_id=str(event_id)).first()
self.assertIsNotNone(log, 'event update did not emit audit log')
self.assertIn('title', log.details.get('changed_fields', []))
def test_delete_event_emits_audit_row(self):
event_id = self._create_event_id()
AuditLog.objects.all().delete()
resp = self.client.delete(
f'/api/v1/events/{event_id}/delete/',
**self.auth,
)
self.assertEqual(resp.status_code, 204)
self.assertTrue(
AuditLog.objects.filter(action='event.deleted', target_id=str(event_id)).exists(),
'event delete did not emit audit log',
)