Files
mgmt/ldap-gateway/tests/test_directory_search.py
scott f4bd03dc52 Add read-only LDAP gateway microservice backed by Keycloak
New ldap-gateway/ service (Twisted + ldaptor) that exposes Keycloak users
and groups over LDAP v3 for legacy apps and Linux hosts (SSSD/NSS).

Design (see ldap-gateway/SPEC.md):
- Reads directly from the KC Admin API with an in-memory TTL cache +
  background refresh; fully stateless, no DB or queue.
- Service-bind read-only: only configured service accounts may bind; no
  end-user password auth. Writes return unwillingToPerform.
- Serves POSIX + inetOrgPerson entries (uid/gid/home, memberOf, group
  memberUid/member). UID/GID from a KC custom attribute else derived from
  the stable KC UUID.
- Pluggable IdentityResolver (LDAP_IDENTITY_SOURCE): username today, a
  cert_cn strategy stubbed for the future cert-CN-as-identity direction.

Build/deploy:
- build.sh builds and (optionally) pushes mgmt-ldap-gateway alongside
  backend/frontend.
- Helm: ldap-gateway deployment/service/configmap, gated by
  ldapGateway.enabled (off by default), reusing the shared KC secret.

Verified end to end against a live Keycloak: service bind, user/group
search, anonymous + bad-password denial, write rejection; container image
builds, runs as non-root, warms from KC, and serves searches. 125 tests
pass; ruff clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 10:37:20 -07:00

255 lines
8.9 KiB
Python

"""Tests for :mod:`ldap_gateway.directory` — tree build + search behaviour.
Builds a tree from a synthetic :class:`~ldap_gateway.cache.DirectorySnapshot` (no
network) and asserts the DIT shape, search results, group membership, exclusion of
filtered users, and the empty-then-swapped behaviour of :class:`DirectoryHolder`.
ldaptor's in-memory ``search``/``lookup``/``subtree`` fire their Deferreds and invoke
their callbacks synchronously for an in-memory tree, so we collect results via a
callback and read them straight away without pytest-twisted.
"""
from __future__ import annotations
from typing import Any
from ldap_gateway.cache import DirectorySnapshot
from ldap_gateway.directory import DirectoryHolder, build_tree
from ldap_gateway.identity import get_identity_resolver
from .conftest import make_config
# ---------------------------------------------------------------------------
# Synthetic KC fixtures + ldaptor sync helpers
# ---------------------------------------------------------------------------
def _user(
user_id: str,
username: str,
*,
first: str = "",
last: str = "",
email: str = "",
enabled: bool = True,
attributes: dict[str, list[str]] | None = None,
) -> dict[str, Any]:
return {
"id": user_id,
"username": username,
"firstName": first,
"lastName": last,
"email": email,
"enabled": enabled,
"attributes": attributes or {},
}
def _group(group_id: str, name: str) -> dict[str, Any]:
return {"id": group_id, "name": name, "attributes": {}}
def _make_snapshot() -> DirectorySnapshot:
    """Return the synthetic snapshot shared by the tests in this module.

    It holds four users (alice, bob, a disabled carol, and a
    ``service-account-app`` user) and two groups: ``ACME-bitbucket-read`` with
    alice, bob and carol as members, and ``ACME-empty`` with none.
    """
    alice = _user("u-alice", "alice", first="Alice", last="Anderson", email="a@x.com")
    bob = _user("u-bob", "bob", first="Bob", last="Baker", email="b@x.com")
    # Disabled user -> excluded by default.
    carol = _user("u-carol", "carol", first="Carol", last="C", enabled=False)
    # Service-account user -> excluded by default.
    svc = _user("u-svc", "service-account-app", first="Svc", last="Acct")

    devs = _group("g-devs", "ACME-bitbucket-read")
    empties = _group("g-empty", "ACME-empty")

    all_users = [alice, bob, carol, svc]
    all_groups = [devs, empties]
    membership = {
        # devs contains alice + bob, plus the excluded carol (must be dropped).
        "g-devs": [alice, bob, carol],
        "g-empty": [],
    }
    return DirectorySnapshot(
        users=all_users,
        groups=all_groups,
        members_by_group=membership,
    )
def _search(entry, **kwargs) -> list:
"""Run an ldaptor in-memory search synchronously, returning matched entries."""
found: list = []
entry.search(callback=found.append, **kwargs)
return found
def _lookup(root, dn: str):
"""Look up a DN synchronously; return the entry or ``None`` if not found."""
box: list = []
d = root.lookup(dn)
d.addCallbacks(box.append, lambda _f: None)
return box[0] if box else None
def _values(entry, attr: str) -> set[str]:
"""Return the value set of an attribute on an entry (empty set if absent)."""
if attr not in entry:
return set()
return {str(v) for v in entry.get(attr)}
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_root_and_ou_structure() -> None:
    """The built tree has the expected root entry and the three top-level OUs."""
    config = make_config()
    resolver = get_identity_resolver(config)
    snapshot = _make_snapshot()
    root = build_tree(snapshot, config, resolver)

    # Root entry: DN, object classes and naming attributes.
    assert root.dn.getText() == "dc=mgmt,dc=example,dc=com"
    root_classes = _values(root, "objectClass")
    assert root_classes.issuperset({"dcObject", "organization"})
    for naming_attr in ("dc", "o"):
        assert _values(root, naming_attr) == {"mgmt"}

    # Each organisational unit hangs directly off the base DN.
    base = config.base_dn
    for ou_rdn in ("ou=people", "ou=groups", "ou=services"):
        assert _lookup(root, f"{ou_rdn},{base}") is not None
def test_user_entries_exist_at_expected_dns() -> None:
    """Included users appear under ``ou=people`` with the expected attributes."""
    config = make_config()
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)
    people = f"ou=people,{config.base_dn}"

    alice = _lookup(root, f"uid=alice,{people}")
    assert alice is not None

    exact_values = {
        "uid": {"alice"},
        "cn": {"Alice Anderson"},
        "sn": {"Anderson"},
        "mail": {"a@x.com"},
        "entryUUID": {"u-alice"},
    }
    for attr, expected in exact_values.items():
        assert _values(alice, attr) == expected

    # POSIX attrs always present.
    uid_numbers = _values(alice, "uidNumber")
    assert uid_numbers
    assert _values(alice, "homeDirectory") == {"/home/alice"}
    # userPassword must never be served.
    assert "userPassword" not in alice

    bob = _lookup(root, f"uid=bob,{people}")
    assert bob is not None
def test_search_by_uid_filter_returns_user() -> None:
    """A ``(uid=alice)`` search from the root matches exactly alice's entry."""
    config = make_config()
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)

    matched_dns = set()
    for entry in _search(root, filterText="(uid=alice)"):
        matched_dns.add(entry.dn.getText())

    assert matched_dns == {"uid=alice,ou=people,dc=mgmt,dc=example,dc=com"}
def test_excluded_users_absent() -> None:
    """The disabled user and the service-account user are not in the tree."""
    config = make_config()
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)
    base = config.base_dn

    # First the disabled user, then the service-account user: neither may be
    # reachable by direct DN lookup or by a uid search.
    for excluded_uid in ("carol", "service-account-app"):
        assert _lookup(root, f"uid={excluded_uid},ou=people,{base}") is None
        assert _search(root, filterText=f"(uid={excluded_uid})") == []
def test_group_membership_uses_member_and_memberUid() -> None:
    """A populated group exposes ``memberUid``/``member``, and users get ``memberOf``."""
    config = make_config()
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)
    base = config.base_dn
    group_dn = f"cn=ACME-bitbucket-read,ou=groups,{base}"

    devs = _lookup(root, group_dn)
    assert devs is not None

    # Only the included members (alice, bob); excluded carol dropped.
    included = ("alice", "bob")
    assert _values(devs, "memberUid") == set(included)
    assert _values(devs, "member") == {f"uid={uid},ou=people,{base}" for uid in included}

    # Non-empty group gains groupOfNames.
    group_classes = _values(devs, "objectClass")
    assert group_classes.issuperset({"posixGroup", "groupOfNames"})

    # alice's memberOf reflects the group.
    alice = _lookup(root, f"uid=alice,ou=people,{base}")
    assert group_dn in _values(alice, "memberOf")
def test_empty_group_has_no_groupOfNames() -> None:
    """A group without members is a plain ``posixGroup`` with no member attributes."""
    config = make_config()
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)

    empty_group = _lookup(root, f"cn=ACME-empty,ou=groups,{config.base_dn}")
    assert empty_group is not None
    assert _values(empty_group, "objectClass") == {"top", "posixGroup"}
    for membership_attr in ("member", "memberUid"):
        assert membership_attr not in empty_group
def test_service_account_entry_present_without_password() -> None:
    """A configured service account gets an entry that never exposes its password.

    The leak check inspects every value of every attribute for the password as
    a substring.  Plain set membership would only catch a value exactly equal
    to the password and would miss one that merely embeds it.
    """
    svc_dn = "cn=svc-ro,ou=services,dc=mgmt,dc=example,dc=com"
    secret = "topsecret"
    config = make_config(service_accounts={svc_dn: secret})
    resolver = get_identity_resolver(config)
    root = build_tree(_make_snapshot(), config, resolver)
    svc = _lookup(root, svc_dn)
    assert svc is not None
    assert _values(svc, "cn") == {"svc-ro"}
    assert "userPassword" not in svc
    # The configured password must not leak into any attribute, whether as a
    # whole value or embedded inside one.
    for attr in svc.keys():
        leaked = [value for value in _values(svc, attr) if secret in value]
        assert not leaked, f"service-account password leaked via attribute {attr!r}"
def test_directory_holder_starts_empty_then_reflects_swap() -> None:
    """``DirectoryHolder`` serves an empty tree first and a new tree after ``swap``."""
    config = make_config()
    resolver = get_identity_resolver(config)
    holder = DirectoryHolder(config, resolver)
    base = config.base_dn

    # Empty-but-valid before any refresh: root + OUs exist, no people.
    initial_tree = holder.tree
    assert initial_tree.dn.getText() == base
    assert _lookup(initial_tree, f"ou=people,{base}") is not None
    assert _search(initial_tree, filterText="(uid=alice)") == []

    # After a swap the new tree is reflected.
    holder.swap(_make_snapshot())
    current_tree = holder.tree
    assert current_tree is not initial_tree
    assert _lookup(current_tree, f"uid=alice,ou=people,{base}") is not None

    bob_matches = _search(current_tree, filterText="(uid=bob)")
    bob_dns = {entry.dn.getText() for entry in bob_matches}
    assert bob_dns == {f"uid=bob,ou=people,{base}"}
def test_skips_user_when_resolver_returns_none() -> None:
    """With the ``cert_cn`` source, a user lacking the cert attribute is left out."""
    # cert_cn resolver returns None for users lacking the cert attribute.
    config = make_config(identity_source="cert_cn", cert_cn_attribute="certCN")
    resolver = get_identity_resolver(config)
    base = config.base_dn

    dave = _user("u-dave", "dave", first="Dave", attributes={"certCN": ["dave.cn"]})
    erin = _user("u-erin", "erin", first="Erin")
    snapshot = DirectorySnapshot(users=[dave, erin], groups=[], members_by_group={})
    root = build_tree(snapshot, config, resolver)

    # cert_cn resolver uses cn= as the RDN attribute.
    dave_entry = _lookup(root, f"cn=dave.cn,ou=people,{base}")
    assert dave_entry is not None

    # erin had no certCN attribute -> skipped entirely.
    erin_matches = _search(root, filterText="(uid=erin)")
    assert erin_matches == []