"""Tests for the Audit Log module (admin_api v1.12.0). Covers: * `AuditLogListView` — list + search + filter shape * `AuditLogMetricsView` — shape + `?nocache=1` bypass * `UserStatusView` — emits a row into `AuditLog` for every status change, inside the same transaction as the state change Run with: python manage.py test admin_api.tests """ from __future__ import annotations from django.core.cache import cache from django.test import TestCase from rest_framework_simplejwt.tokens import RefreshToken from accounts.models import User from admin_api.models import AuditLog # --------------------------------------------------------------------------- # Base — auth helper shared across cases # --------------------------------------------------------------------------- class _AuditTestBase(TestCase): """Gives each subclass an admin user + pre-issued JWT.""" @classmethod def setUpTestData(cls): cls.admin = User.objects.create_user( username='audit.admin@eventifyplus.com', email='audit.admin@eventifyplus.com', password='irrelevant', role='admin', ) cls.admin.is_superuser = True cls.admin.save() def setUp(self): # Metrics view caches by key; reset to keep cases independent. cache.delete('admin_api:audit_log:metrics:v1') access = str(RefreshToken.for_user(self.admin).access_token) self.auth = {'HTTP_AUTHORIZATION': f'Bearer {access}'} # --------------------------------------------------------------------------- # AuditLogListView # --------------------------------------------------------------------------- class AuditLogListViewTests(_AuditTestBase): url = '/api/v1/rbac/audit-log/' def test_unauthenticated_returns_401(self): resp = self.client.get(self.url) self.assertEqual(resp.status_code, 401) def test_authenticated_returns_paginated_shape(self): AuditLog.objects.create( user=self.admin, action='user.suspended', target_type='user', target_id='42', details={'reason': 'spam'}, ) resp = self.client.get(self.url, **self.auth) self.assertEqual(resp.status_code, 200, resp.content) body = resp.json() for key in ('results', 'total', 'page', 'page_size', 'total_pages'): self.assertIn(key, body) self.assertEqual(body['total'], 1) self.assertEqual(body['results'][0]['action'], 'user.suspended') self.assertEqual(body['results'][0]['user']['email'], self.admin.email) def test_search_narrows_results(self): AuditLog.objects.create( user=self.admin, action='user.suspended', target_type='user', target_id='1', details={}, ) AuditLog.objects.create( user=self.admin, action='event.approved', target_type='event', target_id='1', details={}, ) resp = self.client.get(self.url, {'search': 'suspend'}, **self.auth) self.assertEqual(resp.status_code, 200) body = resp.json() self.assertEqual(body['total'], 1) self.assertEqual(body['results'][0]['action'], 'user.suspended') def test_page_size_is_bounded(self): # page_size=999 must be clamped to the 200-row upper bound. resp = self.client.get(self.url, {'page_size': '999'}, **self.auth) self.assertEqual(resp.status_code, 200) self.assertEqual(resp.json()['page_size'], 200) def test_invalid_pagination_falls_back_to_defaults(self): resp = self.client.get(self.url, {'page': 'x', 'page_size': 'y'}, **self.auth) self.assertEqual(resp.status_code, 200) body = resp.json() self.assertEqual(body['page'], 1) self.assertEqual(body['page_size'], 50) # --------------------------------------------------------------------------- # AuditLogMetricsView # --------------------------------------------------------------------------- class AuditLogMetricsViewTests(_AuditTestBase): url = '/api/v1/rbac/audit-log/metrics/' def test_unauthenticated_returns_401(self): resp = self.client.get(self.url) self.assertEqual(resp.status_code, 401) def test_returns_expected_shape(self): AuditLog.objects.create( user=self.admin, action='event.approved', target_type='event', target_id='7', details={}, ) AuditLog.objects.create( user=self.admin, action='department.created', target_type='department', target_id='3', details={}, ) resp = self.client.get(self.url, **self.auth) self.assertEqual(resp.status_code, 200, resp.content) body = resp.json() for key in ('total', 'today', 'week', 'distinct_users', 'by_action_group'): self.assertIn(key, body) self.assertEqual(body['total'], 2) self.assertEqual(body['distinct_users'], 1) # Each group present so frontend tooltip can render all 6 rows. for group in ('create', 'update', 'delete', 'moderate', 'auth', 'other'): self.assertIn(group, body['by_action_group']) self.assertEqual(body['by_action_group']['moderate'], 1) self.assertEqual(body['by_action_group']['create'], 1) def test_nocache_bypasses_stale_cache(self): # Prime cache with a fake payload. cache.set( 'admin_api:audit_log:metrics:v1', { 'total': 999, 'today': 0, 'week': 0, 'distinct_users': 0, 'by_action_group': { 'create': 0, 'update': 0, 'delete': 0, 'moderate': 0, 'auth': 0, 'other': 0, }, }, 60, ) resp_cached = self.client.get(self.url, **self.auth) self.assertEqual(resp_cached.json()['total'], 999) resp_fresh = self.client.get(self.url, {'nocache': '1'}, **self.auth) self.assertEqual(resp_fresh.json()['total'], 0) # real DB state # --------------------------------------------------------------------------- # UserStatusView — audit emission # --------------------------------------------------------------------------- class UserStatusAuditEmissionTests(_AuditTestBase): """Each status transition must leave a matching row in `AuditLog`. The endpoint wraps the state change + audit log in `transaction.atomic()` so the two can never disagree. These assertions catch regressions where a new branch forgets the audit call. """ def _url(self, user_id: int) -> str: return f'/api/v1/users/{user_id}/status/' def setUp(self): super().setUp() self.target = User.objects.create_user( username='target@example.com', email='target@example.com', password='irrelevant', role='customer', ) def test_suspend_emits_audit_row(self): resp = self.client.patch( self._url(self.target.id), data={'action': 'suspend', 'reason': 'spam flood'}, content_type='application/json', **self.auth, ) self.assertEqual(resp.status_code, 200, resp.content) log = AuditLog.objects.filter( action='user.suspended', target_id=str(self.target.id), ).first() self.assertIsNotNone(log, 'suspend did not emit audit log') self.assertEqual(log.details.get('reason'), 'spam flood') def test_ban_emits_audit_row(self): resp = self.client.patch( self._url(self.target.id), data={'action': 'ban'}, content_type='application/json', **self.auth, ) self.assertEqual(resp.status_code, 200, resp.content) self.assertTrue( AuditLog.objects.filter( action='user.banned', target_id=str(self.target.id), ).exists(), 'ban did not emit audit log', ) def test_reinstate_emits_audit_row(self): self.target.is_active = False self.target.save() resp = self.client.patch( self._url(self.target.id), data={'action': 'reinstate'}, content_type='application/json', **self.auth, ) self.assertEqual(resp.status_code, 200, resp.content) self.assertTrue( AuditLog.objects.filter( action='user.reinstated', target_id=str(self.target.id), ).exists(), 'reinstate did not emit audit log', ) # --------------------------------------------------------------------------- # AdminLoginView — audit emission # --------------------------------------------------------------------------- class AuthAuditEmissionTests(_AuditTestBase): """Successful and failed logins must leave matching rows in AuditLog.""" url = '/api/v1/admin/auth/login/' def test_successful_login_emits_audit_row(self): resp = self.client.post( self.url, data={'username': self.admin.username, 'password': 'irrelevant'}, content_type='application/json', ) self.assertEqual(resp.status_code, 200, resp.content) log = AuditLog.objects.filter( action='auth.admin_login', target_id=str(self.admin.id), ).first() self.assertIsNotNone(log, 'successful login did not emit audit log') self.assertEqual(log.details.get('username'), self.admin.username) def test_failed_login_emits_audit_row(self): resp = self.client.post( self.url, data={'username': self.admin.username, 'password': 'wrong-password'}, content_type='application/json', ) self.assertEqual(resp.status_code, 401, resp.content) self.assertTrue( AuditLog.objects.filter(action='auth.admin_login_failed').exists(), 'failed login did not emit audit log', ) # --------------------------------------------------------------------------- # EventCreateView / EventUpdateView / EventDeleteView — audit emission # --------------------------------------------------------------------------- class EventCrudAuditTests(_AuditTestBase): """Event CRUD operations must emit matching audit rows.""" def setUp(self): super().setUp() from events.models import EventType self.event_type = EventType.objects.create(event_type='Test Category') def _create_event_id(self): resp = self.client.post( '/api/v1/events/create/', data={'title': 'Test Event', 'eventType': self.event_type.id}, content_type='application/json', **self.auth, ) self.assertEqual(resp.status_code, 201, resp.content) return resp.json()['id'] def test_create_event_emits_audit_row(self): event_id = self._create_event_id() self.assertTrue( AuditLog.objects.filter(action='event.created', target_id=str(event_id)).exists(), 'event create did not emit audit log', ) def test_update_event_emits_audit_row(self): event_id = self._create_event_id() AuditLog.objects.all().delete() resp = self.client.patch( f'/api/v1/events/{event_id}/update/', data={'title': 'Updated Title'}, content_type='application/json', **self.auth, ) self.assertEqual(resp.status_code, 200, resp.content) log = AuditLog.objects.filter(action='event.updated', target_id=str(event_id)).first() self.assertIsNotNone(log, 'event update did not emit audit log') self.assertIn('title', log.details.get('changed_fields', [])) def test_delete_event_emits_audit_row(self): event_id = self._create_event_id() AuditLog.objects.all().delete() resp = self.client.delete( f'/api/v1/events/{event_id}/delete/', **self.auth, ) self.assertEqual(resp.status_code, 204) self.assertTrue( AuditLog.objects.filter(action='event.deleted', target_id=str(event_id)).exists(), 'event delete did not emit audit log', )