fix(scripts): implement /sop-n/a N/A declarations + ci-drift polling sentinel #1192

Closed
core-devops wants to merge 1 commits from infra/sop-n-a-plus-drift-fix into staging
2 changed files with 258 additions and 31 deletions
+51 -18
View File
@@ -234,20 +234,37 @@ def ci_job_names(ci_doc: dict) -> set[str]:
return names
def sentinel_needs(ci_doc: dict) -> set[str]:
def sentinel_needs(ci_doc: dict) -> tuple[set[str], bool]:
"""Return (needs_set, is_polling).
is_polling is True when the sentinel has no `needs:` key — indicating
it is a polling-based aggregator (polls /statuses/{sha} rather than
using GHA `needs:` dependencies). F1/F1b checks are skipped for
polling sentinels because `jobs - needs` would always fire false
positives: a polling sentinel intentionally gates all CI jobs
without listing them in `needs:`.
"""
sentinel = ci_doc.get("jobs", {}).get(SENTINEL_JOB)
if not isinstance(sentinel, dict):
sys.stderr.write(
f"::error::sentinel job '{SENTINEL_JOB}' not found in {CI_WORKFLOW_PATH}\n"
)
sys.exit(3)
needs = sentinel.get("needs", [])
if isinstance(needs, str):
needs = [needs]
if not isinstance(needs, list):
# A polling sentinel explicitly omits `needs:` and uses a run: step
# that independently polls statuses. A `needs:`-based sentinel always
# has that key (possibly empty). We check for presence of the key,
# not just whether the list is empty — an empty list is a legitimate
# (if unusual) dependency sentinel.
sentinel_keys = sentinel if isinstance(sentinel, dict) else {}
has_needs_key = "needs" in sentinel_keys
needs_raw = sentinel_keys.get("needs", [])
if isinstance(needs_raw, str):
needs_raw = [needs_raw]
if not isinstance(needs_raw, list):
sys.stderr.write("::error::sentinel `needs:` is neither list nor string\n")
sys.exit(3)
return set(needs)
is_polling = not has_needs_key
return set(needs_raw), is_polling
def required_checks_env(audit_doc: dict) -> set[str]:
@@ -328,7 +345,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
needs, is_polling_sentinel = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc)
# Protection
@@ -369,6 +386,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
"branch": branch,
"ci_jobs": sorted(jobs),
"sentinel_needs": sorted(needs),
"is_polling_sentinel": is_polling_sentinel,
"protection_contexts_skipped": True,
"protection_http_status": http_status,
"audit_env_checks": sorted(env_set),
@@ -383,23 +401,37 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
sys.exit(4)
contexts = set(protection.get("status_check_contexts") or [])
# ----- F1: job exists in CI but not under sentinel.needs -----
missing_from_needs = sorted(jobs - needs)
if missing_from_needs:
# ----- Polling sentinel: skip F1/F1b (no `needs:` — gates jobs directly) -----
if is_polling_sentinel:
findings.append(
"F1 — jobs in ci.yml NOT under sentinel `needs:` (sentinel doesn't gate them):\n"
+ "\n".join(f" - {n}" for n in missing_from_needs)
"NOTE — sentinel is polling-based (no `needs:` key). "
"F1/F1b checks skipped: a polling sentinel gates all CI jobs "
"without listing them in `needs:`. This is architecturally "
"correct — no drift. Close this issue if the finding is only "
"the F1 NOTE above."
)
# ----- F1: job exists in CI but not under sentinel.needs -----
# Skipped for polling sentinels: they gate all jobs without `needs:`.
if not is_polling_sentinel:
missing_from_needs = sorted(jobs - needs)
if missing_from_needs:
findings.append(
"F1 — jobs in ci.yml NOT under sentinel `needs:` (sentinel doesn't gate them):\n"
+ "\n".join(f" - {n}" for n in missing_from_needs)
)
# ----- F1b: needs lists a job that doesn't exist (typo) -----
# Compare against jobs_all (incl. event-gated jobs); a typo is a
# typo regardless of `if:` gating.
stale_needs = sorted(needs - jobs_all)
if stale_needs:
findings.append(
"F1b — sentinel `needs:` lists jobs NOT present in ci.yml (typo or removed job):\n"
+ "\n".join(f" - {n}" for n in stale_needs)
)
# Skipped for polling sentinels: they have no `needs:`.
if not is_polling_sentinel:
stale_needs = sorted(needs - jobs_all)
if stale_needs:
findings.append(
"F1b — sentinel `needs:` lists jobs NOT present in ci.yml (typo or removed job):\n"
+ "\n".join(f" - {n}" for n in stale_needs)
)
# ----- F2: protection context has no emitting job -----
# Compute the contexts the CI YAML actually produces. The sentinel
@@ -445,6 +477,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
"branch": branch,
"ci_jobs": sorted(jobs),
"sentinel_needs": sorted(needs),
"is_polling_sentinel": is_polling_sentinel,
"protection_contexts": sorted(contexts),
"audit_env_checks": sorted(env_set),
"expected_contexts": sorted(emitted_contexts),
+207 -13
View File
@@ -102,7 +102,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
# ---------------------------------------------------------------------------
# Comment parsing — /sop-ack and /sop-revoke
# Comment parsing — /sop-ack, /sop-revoke, and /sop-n/a
# ---------------------------------------------------------------------------
# A directive must be on its own line. Permits leading whitespace.
@@ -114,21 +114,33 @@ _DIRECTIVE_RE = re.compile(
re.MULTILINE,
)
# /sop-n/a <gate> [reason] — declare a qa/sec gate N/A.
# Gate names: qa-review, security-review (match review-check.sh context names).
_NA_DIRECTIVE_RE = re.compile(
r"^[ \t]*/sop-n/a[ \t]+([A-Za-z0-9_\-]+)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> list[tuple[str, str, str]]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
Returns a list of (kind, canonical_slug, note) tuples where:
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
Returns (directives, na_directives) where:
directives is a list of (kind, canonical_slug, note) tuples
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
na_directives is a list of (gate_name, reason) tuples
gate_name is "qa-review" or "security-review" (raw from comment)
reason is the free-text after the gate name (may be "")
"""
out: list[tuple[str, str, str]] = []
na_out: list[tuple[str, str, str]] = []
if not comment_body:
return out
return out, na_out
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
@@ -159,7 +171,11 @@ def parse_directives(
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
out.append((kind, canonical, note_from_group))
return out
for m in _NA_DIRECTIVE_RE.finditer(comment_body):
gate_raw = (m.group(1) or "").strip()
reason = (m.group(2) or "").strip()
na_out.append((gate_raw.lower(), reason))
return out, na_out
# ---------------------------------------------------------------------------
@@ -249,7 +265,8 @@ def compute_ack_state(
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases):
directives, _na = parse_directives(body, numeric_aliases)
for kind, slug, _note in directives:
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
@@ -301,6 +318,82 @@ def compute_ack_state(
}
def compute_na_state(
comments: list[dict[str, Any]],
pr_author: str,
na_gates: dict[str, dict[str, Any]],
team_membership_probe: "callable[[str, list[str]], list[str]]",
) -> dict[str, dict[str, Any]]:
"""Compute per-gate N/A declaration state.
Each comment is processed in chronological order. The most-recent
N/A directive per (commenter, gate) wins.
Returns a dict keyed by gate name:
{
"qa-review": {
"declared": True,
"declared_by": "core-qa-agent",
"reason": "CI/non-security-touching",
"valid": True, # non-author + in required team
"error": None, # error string if invalid
},
...
}
Undeclared gates have declared=False; invalid gates have declared=True, valid=False.
"""
# Step 1: collapse N/A directives per (commenter, gate) — most recent wins.
latest_na: dict[tuple[str, str], tuple[str, str]] = {}
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
_, na_directives = parse_directives(body, {})
for gate, reason in na_directives:
if gate not in na_gates:
continue
latest_na[(user, gate)] = (gate, reason)
# Step 2: initialise all gates as undeclared.
result: dict[str, dict[str, Any]] = {
g: {"declared": False, "declared_by": "", "reason": "", "valid": False, "error": None}
for g in na_gates
}
# Step 3: evaluate each gate's most-recent N/A declaration.
for (user, gate), (gate_name, reason) in latest_na.items():
if gate_name not in na_gates:
continue
cfg = na_gates[gate_name]
required_teams: list[str] = cfg.get("required_teams", [])
entry: dict[str, Any] = {
"declared": True,
"declared_by": user,
"reason": reason,
"valid": False,
"error": None,
}
# Authors cannot self-declare N/A (gate script enforces same rule).
if user == pr_author:
entry["error"] = "self-declare N/A rejected"
else:
# Probe team membership: is the declarer in any required team?
approved = team_membership_probe(f"na:{gate_name}", [user])
if user in approved:
entry["valid"] = True
else:
# 403 from team API means token owner not in that team.
# Fail-closed: treat unknown membership as invalid.
entry["error"] = f"{user} not in required team {required_teams}"
result[gate_name] = entry
return result
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
@@ -460,10 +553,29 @@ def _load_config_minimal(path: str) -> dict[str, Any]:
tier_failure_mode), top-level list of maps (items:), and within an
item map: scalars + lists of scalars. Does NOT support nested lists,
YAML anchors, multi-doc, or flow style.
Key names containing '/' (e.g. n/a_gates) are handled by using
rpartition(':') — splitting at the LAST colon so embedded colons
in the key are preserved.
"""
with open(path) as f:
lines = f.readlines()
return _parse_minimal_yaml(lines)
# Preprocess: for lines at indent 0 that contain '/' before ':',
# use rpartition so the key keeps the '/'. e.g.
# "n/a_gates:" → key="n/a_gates", val=""
# "n/a_gates: value" → key="n/a_gates", val="value"
processed: list[str] = []
for raw in lines:
stripped = raw.rstrip("\n")
indent = len(stripped) - len(stripped.lstrip(" "))
content = stripped.lstrip(" ")
if indent == 0 and "/" in content and ":" in content:
# Use rpartition so the last ':' is the key-value separator.
key, _, val = content.rpartition(":")
processed.append(" " * indent + key.strip() + ": " + val.strip())
else:
processed.append(stripped)
return _parse_minimal_yaml(processed)
def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]: # noqa: C901
@@ -800,15 +912,97 @@ def main(argv: list[str] | None = None) -> int:
extra = " (" + "; ".join(extras) + ")" if extras else ""
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
# ----- N/A gate declarations (RFC#324 §N/A follow-up) -----
# sop-checklist.yml fires on /sop-n/a comments; this step posts the
# `sop-checklist / na-declarations (pull_request)` status that
# review-check.sh reads to waive the Gitea-APPROVE requirement.
na_gates: dict[str, Any] = cfg.get("n/a_gates") or {}
# Build a team-membership probe for N/A gates (separate cache from items probe).
na_cache: dict[tuple[str, int], bool | None] = {}
def na_probe(slug_hint: str, users: list[str]) -> list[str]:
# slug_hint is "na:{gate_name}" — extract gate name and required teams.
gate_name = slug_hint.removeprefix("na:")
gate_cfg = na_gates.get(gate_name, {})
team_names: list[str] = gate_cfg.get("required_teams", [])
# Resolve team names → ids.
team_ids: list[int] = []
for tn in team_names:
tid = client.resolve_team_id(args.owner, tn) # noqa: SLF001
if tid is None:
code, data = client._req( # noqa: SLF001
"GET", f"/orgs/{args.owner}/teams"
)
if code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == tn:
tid = t.get("id")
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
break
if tid is not None:
team_ids.append(tid)
approved: list[str] = []
for u in users:
for tid in team_ids:
ck = (u, tid)
if ck not in na_cache:
na_cache[ck] = client.is_team_member(tid, u) # noqa: SLF001
res = na_cache[ck]
if res is True:
approved.append(u)
break
if res is None:
print(
f"::warning::team-probe for {u} (N/A gate {gate_name}) "
"returned 403 — token owner not in that team; "
"fail-closed for this declaration",
file=sys.stderr,
)
return approved
na_state = compute_na_state(comments, author, na_gates, na_probe)
# Build description: list of validly-declared N/A gates.
na_approved_gates = [
g for g, entry in na_state.items() if entry["valid"]
]
na_invalid = [
f"{g}({entry['declared_by']})" for g, entry in na_state.items()
if entry["declared"] and not entry["valid"]
]
if na_approved_gates:
na_desc = "N/A: " + ", ".join(na_approved_gates)
elif na_invalid:
na_desc = "invalid N/A: " + ", ".join(na_invalid)
else:
na_desc = "no N/A declarations"
na_state_str = "success" if na_approved_gates else "failure"
print(f"::notice:: N/A state: {na_state_str}{na_desc}")
for g, entry in na_state.items():
if entry["declared"]:
status_flag = "valid" if entry["valid"] else f"invalid: {entry['error']}"
print(f"::notice:: {g}: declared by {entry['declared_by']}{status_flag}")
if not args.dry_run:
na_context = "sop-checklist / na-declarations (pull_request)"
client.post_status(
args.owner, args.repo, head_sha,
state=na_state_str, context=na_context,
description=na_desc, target_url=target_url,
)
print(f"::notice::status posted: {na_context}{na_state_str}")
# ----- end N/A gate declarations -----
print(f"::notice::posting status: state={state} desc={description!r}")
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
if args.dry_run:
print("::notice::--dry-run: not posting status")
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
client.post_status(
args.owner, args.repo, head_sha,
state=state, context=args.status_context,