Files
mgmt/backend/app/dashboard/routes.py
scott dca541be61 add audit log page, multi-select project assignment, and status banner
- Full audit log page (VELA-only) with search, filters, and pagination
- Sidebar nav item gated on global_role (VELA users)
- User detail "Add to Project" now supports multi-select with checkboxes
- Status banner system: admin-configurable messages in topbar with
  info/warning/critical types, cycling, dismiss, and expand for long text
- Banner management UI in admin settings page
- Fix table overflow in audit log with min-w-0 on app shell

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 16:48:35 -07:00

230 lines
8.3 KiB
Python

from datetime import date, datetime, timedelta, timezone
from flask import jsonify, request, session as flask_session
from sqlalchemy import func
from app.auth.decorators import login_required, _get_user_permissions
from app.extensions import db
from app.dashboard import dashboard_bp
from app.projects.models import Project
from app.licenses.models import License
from app.certs.models import Cert
from app.audit import AuditLog
from app.users.models import User, UserPermission
from app.settings.models import Setting
def _user_project_ids():
"""Return set of accessible project IDs for current user, or None for all."""
username = flask_session.get("user", "")
perms = _get_user_permissions(username)
if perms["global_role"]:
return None
ids = set()
for key in perms["projects"]:
p = Project.query.filter_by(key=key).first()
if p:
ids.add(p.id)
return ids
@dashboard_bp.route("/stats", methods=["GET"])
@login_required
def stats():
today = date.today()
license_days = Setting.get_int("license_expiry_threshold_days") or 30
cert_days = Setting.get_int("cert_expiry_threshold_days") or 30
license_threshold = today + timedelta(days=license_days)
cert_threshold = today + timedelta(days=cert_days)
proj_ids = _user_project_ids()
# Scoped project queries (licenses scoped to projects; certs are global)
proj_q = Project.query
lic_q = License.query
cert_q = Cert.query
if proj_ids is not None:
proj_q = proj_q.filter(Project.id.in_(proj_ids))
lic_q = lic_q.filter(db.or_(License.project_id.in_(proj_ids), License.project_id.is_(None)))
projects_count = proj_q.count()
onboarding_projects = proj_q.filter(Project.status == "onboarding").count()
licenses_count = lic_q.count()
certs_count = cert_q.count()
# Expiring licenses (within threshold, excluding archived)
expiring_licenses_30d = lic_q.filter(
License.archived == False,
License.expiration_date.isnot(None),
License.expiration_date >= today,
License.expiration_date <= license_threshold,
).count()
# Expiring certs (within threshold, excluding archived)
cert_threshold_dt = datetime.combine(cert_threshold, datetime.min.time()).replace(tzinfo=timezone.utc)
now_dt = datetime.now(timezone.utc)
expiring_certs_30d = cert_q.filter(
Cert.archived == False,
Cert.not_valid_after.isnot(None),
Cert.not_valid_after >= now_dt,
Cert.not_valid_after <= cert_threshold_dt,
).count()
# Expired counts (excluding archived)
expired_licenses = lic_q.filter(
License.archived == False,
License.expiration_date.isnot(None),
License.expiration_date < today,
).count()
expired_certs = cert_q.filter(
Cert.archived == False,
Cert.not_valid_after.isnot(None),
Cert.not_valid_after < now_dt,
).count()
# User sponsorship stats — uses KC sponsor attribute via unsponsored_since
total_users = User.query.count()
# "Unsponsored" = users with unsponsored_since set (sponsor was cleared)
unsponsored_users = User.query.filter(User.unsponsored_since.isnot(None)).count()
# "Sponsored" = total minus unsponsored (users with a sponsor attribute set)
sponsored_users = total_users - unsponsored_users
return jsonify({
"projects_count": projects_count,
"onboarding_projects": onboarding_projects,
"total_users": total_users,
"sponsored_users": sponsored_users,
"unsponsored_users": unsponsored_users,
"licenses_count": licenses_count,
"certs_count": certs_count,
"expiring_licenses_30d": expiring_licenses_30d,
"expiring_certs_30d": expiring_certs_30d,
"expired_licenses": expired_licenses,
"expired_certs": expired_certs,
}), 200
@dashboard_bp.route("/expiring", methods=["GET"])
@login_required
def expiring():
today = date.today()
license_days = Setting.get_int("license_expiry_threshold_days") or 30
cert_days = Setting.get_int("cert_expiry_threshold_days") or 30
license_threshold = today + timedelta(days=license_days)
cert_threshold = today + timedelta(days=cert_days)
cert_threshold_dt = datetime.combine(cert_threshold, datetime.min.time()).replace(tzinfo=timezone.utc)
now_dt = datetime.now(timezone.utc)
proj_ids = _user_project_ids()
lic_q = License.query.filter(
License.archived == False,
License.expiration_date.isnot(None),
License.expiration_date >= today,
License.expiration_date <= license_threshold,
)
cert_q = Cert.query.filter(
Cert.archived == False,
Cert.not_valid_after.isnot(None),
Cert.not_valid_after >= now_dt,
Cert.not_valid_after <= cert_threshold_dt,
)
if proj_ids is not None:
lic_q = lic_q.filter(db.or_(License.project_id.in_(proj_ids), License.project_id.is_(None)))
licenses = lic_q.order_by(License.expiration_date.asc()).limit(3).all()
certs = cert_q.order_by(Cert.not_valid_after.asc()).limit(3).all()
# Next expiring item outside the "expiring soon" window (for empty-state message)
next_lic_q = License.query.filter(
License.archived == False,
License.expiration_date.isnot(None),
License.expiration_date > license_threshold,
)
next_cert_q = Cert.query.filter(
Cert.archived == False,
Cert.not_valid_after.isnot(None),
Cert.not_valid_after > cert_threshold_dt,
)
if proj_ids is not None:
next_lic_q = next_lic_q.filter(db.or_(License.project_id.in_(proj_ids), License.project_id.is_(None)))
next_lic = next_lic_q.order_by(License.expiration_date.asc()).first()
next_cert = next_cert_q.order_by(Cert.not_valid_after.asc()).first()
return jsonify({
"licenses": [l.to_dict() for l in licenses],
"certs": [c.to_dict() for c in certs],
"next_license_expiry": next_lic.expiration_date.isoformat() if next_lic else None,
"next_cert_expiry": next_cert.not_valid_after.isoformat() if next_cert else None,
}), 200
@dashboard_bp.route("/audit-log", methods=["GET"])
@login_required
def audit_log():
username = flask_session.get("user", "")
perms = _get_user_permissions(username)
if not perms["global_role"]:
return jsonify({"error": "Forbidden"}), 403
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 50, type=int)
search = request.args.get("search", "").strip()
entity_type = request.args.get("entity_type", "").strip()
action = request.args.get("action", "").strip()
user_filter = request.args.get("user", "").strip()
query = AuditLog.query.order_by(AuditLog.created_at.desc())
if search:
like = f"%{search}%"
query = query.filter(db.or_(
AuditLog.user.ilike(like),
AuditLog.entity_type.ilike(like),
AuditLog.action.ilike(like),
AuditLog.details.ilike(like),
))
if entity_type:
query = query.filter(AuditLog.entity_type == entity_type)
if action:
query = query.filter(AuditLog.action == action)
if user_filter:
query = query.filter(AuditLog.user.ilike(f"%{user_filter}%"))
total = query.count()
entries = query.offset((page - 1) * per_page).limit(per_page).all()
return jsonify({
"entries": [e.to_dict() for e in entries],
"total": total,
"page": page,
"per_page": per_page,
}), 200
@dashboard_bp.route("/activity", methods=["GET"])
@login_required
def activity():
proj_ids = _user_project_ids()
query = AuditLog.query.order_by(AuditLog.created_at.desc())
if proj_ids is not None:
# Filter to entries about accessible projects
accessible_keys = [
p.key for p in Project.query.filter(Project.id.in_(proj_ids)).all()
] if proj_ids else []
if accessible_keys:
# Include project entries and project_user entries for accessible projects
project_filter = db.or_(
AuditLog.entity_type.in_(["project", "project_user"]),
AuditLog.entity_type.in_(["user", "license", "cert", "feedback"]),
)
query = query.filter(project_filter)
entries = query.limit(10).all()
return jsonify({"entries": [e.to_dict() for e in entries]}), 200