Files
mgmt/backend/app/users/routes.py
scott 79e71f0244 Add Keycloak Admin API client with route integration and admin discrepancy stub
Introduces a full Keycloak REST API client (app/keycloak.py) that syncs
user CRUD, project groups, and group membership to Keycloak. Operates in
disabled mode when KEYCLOAK_URL is unset. All local DB operations succeed
first; Keycloak failures are logged and surfaced as warnings, never blocking.
Users are disabled in Keycloak on delete, never removed. Adds a Coming Soon
discrepancy management section to the admin panel.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 16:47:32 -07:00

226 lines
7.2 KiB
Python

import logging
from datetime import datetime, timezone
from flask import request, jsonify
from app.auth.decorators import login_required
from app.audit import log_audit, AuditLog
from app.extensions import db
from app.keycloak import keycloak, KeycloakError
from app.users import users_bp
from app.users.models import User, ProjectUser
logger = logging.getLogger(__name__)
# ── User CRUD ──────────────────────────────────────────────────
@users_bp.route("", methods=["GET"])
@login_required
def list_users():
query = User.query
search = request.args.get("search", "").strip()
if search:
pattern = f"%{search}%"
query = query.filter(
db.or_(
User.name.ilike(pattern),
User.email.ilike(pattern),
)
)
sort_field = request.args.get("sort", "name")
order = request.args.get("order", "asc")
sortable = {
"name": User.name,
"email": User.email,
"created_at": User.created_at,
}
col = sortable.get(sort_field, User.name)
if order == "desc":
col = col.desc()
query = query.order_by(col)
users = query.all()
return jsonify({"users": [u.to_dict(include_projects=True) for u in users]}), 200
@users_bp.route("", methods=["POST"])
@login_required
def create_user():
data = request.get_json()
if not data:
return jsonify({"error": "Missing JSON body"}), 400
name = data.get("name", "").strip()
email = data.get("email", "").strip()
if not name or not email:
return jsonify({"error": "Fields 'name' and 'email' are required"}), 400
if User.query.filter_by(email=email).first():
return jsonify({"error": f"User with email '{email}' already exists"}), 400
user = User(
name=name,
email=email,
keycloak_id=data.get("keycloak_id"),
)
db.session.add(user)
db.session.commit()
log_audit("user", user.id, "created", {"name": user.name, "email": user.email})
# Sync to Keycloak (non-blocking — local user is already persisted)
kc_warning = None
if not user.keycloak_id:
try:
kc_id = keycloak.create_user(email=user.email, name=user.name)
if kc_id:
user.keycloak_id = kc_id
db.session.commit()
except KeycloakError as exc:
logger.warning("Keycloak sync failed on user create: %s", exc)
kc_warning = f"User created locally but Keycloak sync failed: {exc}"
result = user.to_dict(include_projects=True)
if kc_warning:
result["keycloak_warning"] = kc_warning
return jsonify(result), 201
@users_bp.route("/<int:user_id>", methods=["GET"])
@login_required
def get_user(user_id):
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "User not found"}), 404
return jsonify(user.to_dict(include_projects=True)), 200
@users_bp.route("/<int:user_id>", methods=["PUT"])
@login_required
def update_user(user_id):
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "User not found"}), 404
data = request.get_json()
if not data:
return jsonify({"error": "Missing JSON body"}), 400
changes = {}
for field in ("name", "email", "keycloak_id"):
if field in data:
old_val = getattr(user, field)
new_val = data[field]
setattr(user, field, new_val)
if old_val != new_val:
changes[field] = {"old": old_val, "new": new_val}
if "active" in data:
old_active = user.active
new_active = bool(data["active"])
user.active = new_active
if old_active != new_active:
changes["active"] = {"old": old_active, "new": new_active}
user.updated_at = datetime.now(timezone.utc)
db.session.commit()
log_audit("user", user.id, "updated", changes)
# Sync changes to Keycloak
kc_warning = None
if user.keycloak_id and changes:
try:
keycloak.update_user(
keycloak_id=user.keycloak_id,
email=user.email,
name=user.name,
enabled=user.active,
)
except KeycloakError as exc:
logger.warning("Keycloak sync failed on user update: %s", exc)
kc_warning = f"User updated locally but Keycloak sync failed: {exc}"
result = user.to_dict(include_projects=True)
if kc_warning:
result["keycloak_warning"] = kc_warning
return jsonify(result), 200
@users_bp.route("/<int:user_id>", methods=["DELETE"])
@login_required
def delete_user(user_id):
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "User not found"}), 404
user_name = user.name
keycloak_id = user.keycloak_id
# Remove all project memberships first
ProjectUser.query.filter_by(user_id=user_id).delete()
db.session.delete(user)
db.session.commit()
log_audit("user", user_id, "deleted", {"name": user_name})
# Disable in Keycloak (never delete — local data must persist)
kc_warning = None
if keycloak_id:
try:
keycloak.disable_user(keycloak_id)
except KeycloakError as exc:
logger.warning("Keycloak sync failed on user delete: %s", exc)
kc_warning = f"User deleted locally but Keycloak disable failed: {exc}"
result = {"message": f"User '{user_name}' deleted"}
if kc_warning:
result["keycloak_warning"] = kc_warning
return jsonify(result), 200
# ── User audit history ─────────────────────────────────────────
@users_bp.route("/<int:user_id>/history", methods=["GET"])
@login_required
def user_history(user_id):
"""Return audit log entries relevant to this user."""
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "User not found"}), 404
# Get entries where this user is the subject (entity_type=user, entity_id=user_id)
# or where the user is mentioned in details (project membership changes)
direct_entries = AuditLog.query.filter(
AuditLog.entity_type == "user",
AuditLog.entity_id == user_id,
)
# Also get project_user entries that reference this user
membership_entries = AuditLog.query.filter(
AuditLog.entity_type == "project_user",
AuditLog.details.contains(f'"user_id": {user_id}'),
)
entries = direct_entries.union(membership_entries).order_by(AuditLog.created_at.desc()).limit(50).all()
return jsonify({"entries": [e.to_dict() for e in entries]}), 200
# ── Project membership endpoints ───────────────────────────────
@users_bp.route("/<int:user_id>/projects", methods=["GET"])
@login_required
def user_projects(user_id):
"""List all project memberships for a user."""
user = db.session.get(User, user_id)
if not user:
return jsonify({"error": "User not found"}), 404
memberships = ProjectUser.query.filter_by(user_id=user_id).all()
return jsonify({"memberships": [m.to_dict() for m in memberships]}), 200