Files
eventify_backend/admin_api/tests.py

232 lines
8.4 KiB
Python
Raw Normal View History

"""Tests for the Audit Log module (admin_api v1.12.0).

Covers:
  * `AuditLogListView` list + search + filter shape
  * `AuditLogMetricsView` shape + `?nocache=1` bypass
  * `UserStatusView` emits a row into `AuditLog` for every status change,
    inside the same transaction as the state change

Run with:
    python manage.py test admin_api.tests
"""
from __future__ import annotations
from django.core.cache import cache
from django.test import TestCase
from rest_framework_simplejwt.tokens import RefreshToken
from accounts.models import User
from admin_api.models import AuditLog
# ---------------------------------------------------------------------------
# Base — auth helper shared across cases
# ---------------------------------------------------------------------------
class _AuditTestBase(TestCase):
    """Gives each subclass an admin user + pre-issued JWT.

    Attributes set for subclasses:
      * ``cls.admin`` -- superuser account created once per test class.
      * ``self.auth`` -- keyword arguments for the Django test client that
        carry a ``Bearer`` access token issued for ``cls.admin``.
    """

    # Cache key the metrics view is expected to use; cleared around every
    # test so a primed or recomputed payload never crosses test boundaries.
    _METRICS_CACHE_KEY = 'admin_api:audit_log:metrics:v1'

    @classmethod
    def setUpTestData(cls):
        """Create the shared admin account once for the whole test class."""
        cls.admin = User.objects.create_user(
            username='audit.admin@eventifyplus.com',
            email='audit.admin@eventifyplus.com',
            password='irrelevant',
            role='admin',
        )
        cls.admin.is_superuser = True
        cls.admin.save()

    def setUp(self):
        """Reset the metrics cache and mint a fresh access token."""
        # Metrics view caches by key; reset to keep cases independent.
        cache.delete(self._METRICS_CACHE_KEY)
        # Also clear it after the test: the cache backend is not rolled back
        # with the test transaction, so an entry left by the last case would
        # otherwise remain visible to test classes that run later.
        self.addCleanup(cache.delete, self._METRICS_CACHE_KEY)
        access = str(RefreshToken.for_user(self.admin).access_token)
        self.auth = {'HTTP_AUTHORIZATION': f'Bearer {access}'}
# ---------------------------------------------------------------------------
# AuditLogListView
# ---------------------------------------------------------------------------
class AuditLogListViewTests(_AuditTestBase):
    """Checks the audit-log list endpoint: auth gate, response envelope,
    search narrowing, and handling of pagination parameters."""

    url = '/api/v1/rbac/audit-log/'

    def test_unauthenticated_returns_401(self):
        """A request without the bearer header is answered with 401."""
        response = self.client.get(self.url)
        self.assertEqual(response.status_code, 401)

    def test_authenticated_returns_paginated_shape(self):
        """A single stored row comes back inside the pagination envelope."""
        row_fields = {
            'user': self.admin,
            'action': 'user.suspended',
            'target_type': 'user',
            'target_id': '42',
            'details': {'reason': 'spam'},
        }
        AuditLog.objects.create(**row_fields)

        response = self.client.get(self.url, **self.auth)
        self.assertEqual(response.status_code, 200, response.content)

        payload = response.json()
        envelope_keys = ('results', 'total', 'page', 'page_size', 'total_pages')
        for name in envelope_keys:
            self.assertIn(name, payload)
        self.assertEqual(payload['total'], 1)

        first = payload['results'][0]
        self.assertEqual(first['action'], 'user.suspended')
        self.assertEqual(first['user']['email'], self.admin.email)

    def test_search_narrows_results(self):
        """``?search=suspend`` returns only the matching one of two rows."""
        seeded = (
            ('user.suspended', 'user'),
            ('event.approved', 'event'),
        )
        for action, target_type in seeded:
            AuditLog.objects.create(
                user=self.admin, action=action,
                target_type=target_type, target_id='1', details={},
            )

        query = {'search': 'suspend'}
        response = self.client.get(self.url, query, **self.auth)
        self.assertEqual(response.status_code, 200)

        payload = response.json()
        self.assertEqual(payload['total'], 1)
        self.assertEqual(payload['results'][0]['action'], 'user.suspended')

    def test_page_size_is_bounded(self):
        """An oversized ``page_size`` is reported back as 200."""
        # page_size=999 must be clamped to the 200-row upper bound.
        query = {'page_size': '999'}
        response = self.client.get(self.url, query, **self.auth)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json()['page_size'], 200)

    def test_invalid_pagination_falls_back_to_defaults(self):
        """Non-numeric paging values yield page 1 with a page size of 50."""
        query = {'page': 'x', 'page_size': 'y'}
        response = self.client.get(self.url, query, **self.auth)
        self.assertEqual(response.status_code, 200)

        payload = response.json()
        self.assertEqual(payload['page'], 1)
        self.assertEqual(payload['page_size'], 50)
# ---------------------------------------------------------------------------
# AuditLogMetricsView
# ---------------------------------------------------------------------------
class AuditLogMetricsViewTests(_AuditTestBase):
    """Checks the audit-log metrics endpoint: auth gate, payload shape,
    and the ``?nocache=1`` cache bypass."""

    url = '/api/v1/rbac/audit-log/metrics/'

    def test_unauthenticated_returns_401(self):
        """A request without the bearer header is answered with 401."""
        resp = self.client.get(self.url)
        self.assertEqual(resp.status_code, 401)

    def test_returns_expected_shape(self):
        """Two rows by one user yield the expected counters and groups."""
        AuditLog.objects.create(
            user=self.admin, action='event.approved',
            target_type='event', target_id='7', details={},
        )
        AuditLog.objects.create(
            user=self.admin, action='department.created',
            target_type='department', target_id='3', details={},
        )
        resp = self.client.get(self.url, **self.auth)
        self.assertEqual(resp.status_code, 200, resp.content)
        body = resp.json()
        for key in ('total', 'today', 'week', 'distinct_users', 'by_action_group'):
            self.assertIn(key, body)
        self.assertEqual(body['total'], 2)
        self.assertEqual(body['distinct_users'], 1)
        # Each group present so frontend tooltip can render all 6 rows.
        for group in ('create', 'update', 'delete', 'moderate', 'auth', 'other'):
            self.assertIn(group, body['by_action_group'])
        # 'event.approved' is expected under "moderate" and
        # 'department.created' under "create".
        self.assertEqual(body['by_action_group']['moderate'], 1)
        self.assertEqual(body['by_action_group']['create'], 1)

    def test_nocache_bypasses_stale_cache(self):
        """A primed cache entry is served normally but skipped with
        ``?nocache=1``, which must reflect the (empty) database instead."""
        # Prime cache with a fake payload.
        cache.set(
            'admin_api:audit_log:metrics:v1',
            {
                'total': 999,
                'today': 0, 'week': 0, 'distinct_users': 0,
                'by_action_group': {
                    'create': 0, 'update': 0, 'delete': 0,
                    'moderate': 0, 'auth': 0, 'other': 0,
                },
            },
            60,
        )
        resp_cached = self.client.get(self.url, **self.auth)
        # Assert the status first (as the other cases do) so an auth or
        # server failure is reported with the response body rather than as
        # a KeyError on the parsed JSON.
        self.assertEqual(resp_cached.status_code, 200, resp_cached.content)
        self.assertEqual(resp_cached.json()['total'], 999)
        resp_fresh = self.client.get(self.url, {'nocache': '1'}, **self.auth)
        self.assertEqual(resp_fresh.status_code, 200, resp_fresh.content)
        self.assertEqual(resp_fresh.json()['total'], 0)  # real DB state
# ---------------------------------------------------------------------------
# UserStatusView — audit emission
# ---------------------------------------------------------------------------
class UserStatusAuditEmissionTests(_AuditTestBase):
    """Every status transition has to produce a matching `AuditLog` row.

    The endpoint is described as performing the state change and the audit
    write inside one `transaction.atomic()` block so they cannot diverge.
    These cases guard against a newly added branch that skips the audit
    call.
    """

    def _url(self, user_id: int) -> str:
        """Build the status endpoint path for ``user_id``."""
        return f'/api/v1/users/{user_id}/status/'

    def setUp(self):
        """Extend the base fixture with a customer account to act upon."""
        super().setUp()
        self.target = User.objects.create_user(
            username='target@example.com',
            email='target@example.com',
            password='irrelevant',
            role='customer',
        )

    def _patch_status(self, payload):
        """PATCH ``payload`` as JSON to the target's status endpoint."""
        return self.client.patch(
            self._url(self.target.id),
            data=payload,
            content_type='application/json',
            **self.auth,
        )

    def _audit_rows(self, action):
        """Return the `AuditLog` queryset for ``action`` on the target."""
        return AuditLog.objects.filter(
            action=action, target_id=str(self.target.id),
        )

    def test_suspend_emits_audit_row(self):
        """Suspending records 'user.suspended' along with the reason."""
        response = self._patch_status(
            {'action': 'suspend', 'reason': 'spam flood'},
        )
        self.assertEqual(response.status_code, 200, response.content)

        entry = self._audit_rows('user.suspended').first()
        self.assertIsNotNone(entry, 'suspend did not emit audit log')
        self.assertEqual(entry.details.get('reason'), 'spam flood')

    def test_ban_emits_audit_row(self):
        """Banning records a 'user.banned' row."""
        response = self._patch_status({'action': 'ban'})
        self.assertEqual(response.status_code, 200, response.content)

        emitted = self._audit_rows('user.banned').exists()
        self.assertTrue(emitted, 'ban did not emit audit log')

    def test_reinstate_emits_audit_row(self):
        """Reinstating a deactivated user records 'user.reinstated'."""
        # Start from an inactive account so there is something to reinstate.
        self.target.is_active = False
        self.target.save()

        response = self._patch_status({'action': 'reinstate'})
        self.assertEqual(response.status_code, 200, response.content)

        emitted = self._audit_rows('user.reinstated').exists()
        self.assertTrue(emitted, 'reinstate did not emit audit log')