Compare commits

..

4 Commits

Author SHA1 Message Date
core-devops 65831c839e fix(queue): re-fetch PR head before merge to detect stale SHA
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 2/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +2 — body-unfilled: comprehensive-testing, l
sop-checklist / na-declarations (pull_request) N/A: (none)
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 2s
cascade-list-drift-gate / check (pull_request) Failing after 3s
CI / Detect changes (pull_request) Successful in 4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 9s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
E2E Chat / detect-changes (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 4s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m10s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 3s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 54s
CI / Platform (Go) (pull_request) Successful in 4m48s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 54s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 4s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 3s
CI / Canvas (Next.js) (pull_request) Successful in 6m12s
gate-check-v3 / gate-check (pull_request) Successful in 3s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m0s
qa-review / approved (pull_request) Failing after 3s
security-review / approved (pull_request) Failing after 2s
sop-tier-check / tier-check (pull_request) Successful in 4s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m1s
CI / Python Lint & Test (pull_request) Successful in 6m37s
CI / all-required (pull_request) Successful in 6m38s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Successful in 1s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 1s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 2s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
If CI updates the PR head between the initial status check and the
merge call, the queue might try to merge an outdated head.  Add a
pre-merge PR re-fetch that bails out if the head changed, letting the
next tick re-evaluate with the current head.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 04:06:22 +00:00
core-devops d342149646 fix(queue): handle Gitea empty-body 200 on merge endpoint
CI / Canvas (Next.js) (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Waiting to run
cascade-list-drift-gate / check (pull_request) Waiting to run
CI / all-required (pull_request) Waiting to run
CI / Detect changes (pull_request) Waiting to run
CI / Platform (Go) (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
lint-required-no-paths / lint-required-no-paths (pull_request) Waiting to run
CI / Shellcheck (E2E scripts) (pull_request) Waiting to run
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Waiting to run
E2E API Smoke Test / detect-changes (pull_request) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
E2E Chat / detect-changes (pull_request) Waiting to run
E2E Chat / E2E Chat (pull_request) Blocked by required conditions
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / detect-changes (pull_request) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Waiting to run
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Waiting to run
Runtime PR-Built Compatibility / detect-changes (pull_request) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (pull_request) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Waiting to run
gate-check-v3 / gate-check (pull_request) Waiting to run
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
Gitea's /pulls/{n}/merge returns HTTP 200 with an empty body on success.
The api() wrapper tries to json.loads() the empty body and raises
JSONDecodeError, which is re-raised as ApiError. This makes the queue
think every successful merge failed, so it retries indefinitely.

Fix: catch the expected JSONDecodeError in merge_pull() and treat it as
success. Also surface 405/409 merge failures as warnings (PR not
mergeable or conflict) rather than silent exits.

Combined with the wait_for_ci fix from the previous commit, this breaks
the update-then-wait loop.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 04:05:37 +00:00
core-devops 54907ee852 fix(queue): wait for CI after update instead of immediate re-check
Block internal-flavored paths / Block forbidden paths (pull_request) Waiting to run
cascade-list-drift-gate / check (pull_request) Waiting to run
CI / all-required (pull_request) Waiting to run
CI / Detect changes (pull_request) Waiting to run
CI / Platform (Go) (pull_request) Waiting to run
CI / Canvas (Next.js) (pull_request) Waiting to run
CI / Shellcheck (E2E scripts) (pull_request) Waiting to run
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Waiting to run
E2E API Smoke Test / detect-changes (pull_request) Waiting to run
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
E2E Chat / detect-changes (pull_request) Waiting to run
E2E Chat / E2E Chat (pull_request) Blocked by required conditions
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / detect-changes (pull_request) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Waiting to run
lint-required-no-paths / lint-required-no-paths (pull_request) Waiting to run
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Waiting to run
Runtime PR-Built Compatibility / detect-changes (pull_request) Waiting to run
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (pull_request) Waiting to run
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Waiting to run
gate-check-v3 / gate-check (pull_request) Waiting to run
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
The queue was in an update-then-wait loop:
1. Queue updates PR → new CI run triggered on new head
2. Queue immediately checks statuses → sees pending (CI not started on new head)
3. Queue exits "wait"
4. Next tick: same cycle, CI never completes on any single head

Fix: after update_pull(), re-fetch the new head SHA and poll CI for
up to 5 min until required contexts reach terminal state. If CI
finishes within the window, merge on the same tick. If not, exit and
retry next tick.

Also adds `import time` required for the wait loop.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 04:03:11 +00:00
core-devops 99453c6a71 infra(ci): add concurrency blocks to 3 scheduled workflows
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 5/7 — missing: root-cause, no-backwards-compat
sop-checklist / na-declarations (pull_request) N/A: (none)
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 3s
CI / Detect changes (pull_request) Successful in 4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 8s
CI / Platform (Go) (pull_request) Successful in 4m24s
E2E API Smoke Test / detect-changes (pull_request) Successful in 6s
E2E Chat / detect-changes (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 5s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 2s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 5s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m7s
CI / Canvas (Next.js) (pull_request) Successful in 6m4s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m1s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 4s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m8s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
gate-check-v3 / gate-check (pull_request) Successful in 2s
sop-tier-check / tier-check (pull_request) Successful in 4s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 50s
CI / Python Lint & Test (pull_request) Successful in 6m28s
CI / all-required (pull_request) Successful in 6m22s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m9s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 3s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 1s
qa-review / approved (pull_request) N/A declared by core-devops; qa-review waived per sop-checklist config
security-review / approved (pull_request) N/A declared by core-devops; security-review waived per sop-checklist config
Add per-SHA concurrency groups with cancel-in-progress: true to
scheduled workflows missing concurrency blocks:

- gate-check-v3.yml (hourly cron): prevents stale hourly runs from
  accumulating when new cron ticks fire
- secret-pattern-drift.yml (daily 05:00 UTC): same
- weekly-platform-go.yml (Mondays 04:17 UTC): same

These are lower-frequency than the sweep/minute-level workflows
but should still be covered for consistency and runner hygiene.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 02:47:52 +00:00
13 changed files with 211 additions and 962 deletions
+89 -1
View File
@@ -23,6 +23,7 @@ import dataclasses
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
@@ -326,6 +327,43 @@ def update_pull(pr_number: int, *, dry_run: bool) -> None:
)
def wait_for_ci(
head_sha: str,
contexts: list[str],
*,
max_wait_seconds: int = 300,
poll_interval: int = 15,
) -> bool:
"""Poll CI statuses for head_sha until all required contexts are terminal.
Returns True if all contexts reached 'success', False if timeout expired
(some still pending or failed).
Background: after a queue-triggered PR update, CI re-runs on the new head.
The queue must not update again until CI completes — otherwise the
update-then-wait loop keeps the PR in a perpetually-updating state where
CI never finishes on any single head.
"""
deadline = time.time() + max_wait_seconds
while time.time() < deadline:
time.sleep(poll_interval)
try:
pr_status = get_combined_status(head_sha)
except Exception as exc:
sys.stderr.write(f"::warning::wait_for_ci: status fetch failed: {exc}\n")
continue
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
ok, bad = required_contexts_green(latest, contexts)
if ok:
sys.stderr.write(f"::notice::wait_for_ci: all contexts green after {int(time.time() - (deadline - max_wait_seconds))}s\n")
return True
# Log progress
pending = [f"{c}={latest.get(c, {}).get('status', 'missing')}" for c in contexts if latest.get(c, {}).get('status') != 'success']
sys.stderr.write(f"::notice::wait_for_ci: still waiting ({int(deadline - time.time())}s left): {', '.join(pending[:3])}\n")
sys.stderr.write(f"::warning::wait_for_ci: timeout after {max_wait_seconds}s; proceeding with merge check\n")
return False
def merge_pull(pr_number: int, *, dry_run: bool) -> None:
payload = {
"Do": "merge",
@@ -338,7 +376,24 @@ def merge_pull(pr_number: int, *, dry_run: bool) -> None:
print(f"::notice::merging PR #{pr_number}")
if dry_run:
return
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
# Gitea's merge endpoint returns HTTP 200 with an empty body on success.
# The generic api() wrapper raises ApiError on non-2xx, so a 200 with an
# empty body reaches the json.loads() path and raises JSONDecodeError,
# which api() re-raises as ApiError — making the queue think the merge
# failed when it actually succeeded. Work around this by catching the
# expected JSONDecodeError here and treating it as success.
try:
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
except ApiError as exc:
# Surface non-merge errors (5xx server errors, 403 forbidden, etc.)
if "merge" in str(exc).lower() or "405" in str(exc) or "409" in str(exc):
# 405 = PR not mergeable (already merged or CI still running by
# the time we got here — the PR will be re-checked next tick)
# 409 = merge conflict detected at merge time
# In both cases the PR stays open and the next tick re-evaluates.
sys.stderr.write(f"::warning::merge call returned: {exc}\n")
else:
raise
def process_once(*, dry_run: bool = False) -> int:
@@ -390,6 +445,32 @@ def process_once(*, dry_run: bool = False) -> int:
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
if decision.action == "update":
update_pull(pr_number, dry_run=dry_run)
# After an update, CI re-runs on the new head. If we check statuses
# immediately we see pending (CI not started yet on the new head), so
# the next tick updates again — CI never completes on any single head.
# Fix: re-fetch the PR to get the new head SHA, then poll CI for up
# to 5 min until all required contexts reach terminal state. If CI
# finishes in time, proceed to merge on the same tick.
if not dry_run:
updated_pr = get_pull(pr_number)
new_head = updated_pr.get("head", {}).get("sha", "")
if new_head and new_head != head_sha:
sys.stderr.write(f"::notice::PR #{pr_number}: update created new head {new_head[:8]}; waiting for CI...\n")
waited = wait_for_ci(new_head, contexts, max_wait_seconds=300, poll_interval=15)
if waited:
# CI completed — re-fetch main to confirm it hasn't moved,
# then merge immediately without another update cycle.
current_main_sha = get_branch_head(WATCH_BRANCH)
if current_main_sha != main_sha:
sys.stderr.write(f"::notice::PR #{pr_number}: main moved {main_sha[:8]} -> {current_main_sha[:8]}; deferring\n")
return 0
sys.stderr.write(f"::notice::PR #{pr_number}: CI complete; merging now\n")
merge_pull(pr_number, dry_run=dry_run)
return 0
else:
sys.stderr.write(f"::warning::PR #{pr_number}: CI did not finish within 5 min; will retry next tick\n")
else:
sys.stderr.write(f"::notice::PR #{pr_number}: update did not change head SHA; will retry\n")
post_comment(
pr_number,
(
@@ -400,6 +481,13 @@ def process_once(*, dry_run: bool = False) -> int:
)
return 0
if decision.ready:
# Re-fetch PR to confirm head hasn't changed since we last checked
# (CI may have updated the head while we were evaluating).
current_pr = get_pull(pr_number)
current_head = current_pr.get("head", {}).get("sha", "")
if current_head != head_sha:
print(f"::notice::PR #{pr_number} head changed {head_sha[:8]} -> {current_head[:8]}; re-evaluating")
return 0
latest_main_sha = get_branch_head(WATCH_BRANCH)
if latest_main_sha != main_sha:
print(
+7 -11
View File
@@ -576,14 +576,12 @@ def run_once(*, dry_run: bool = False) -> int:
f"{len(failed)} failed context(s)")
file_or_update_red(sha, failed, debug, dry_run=dry_run)
else:
if status.get("state") in ("success", "pending"):
# Close stale main-red issues when main has no failures.
# `pending` is included because Gitea combined-state can stay
# `pending` even when all observable individual statuses are
# successful (some jobs still running). The `is_red()` check
# already confirmed 0 failures — closing on `pending` prevents
# stale issues from persisting across cron ticks while
# long-running jobs finish.
# Green (or pending — pending is treated as not-red so we don't
# spam during the post-merge CI window). Close any stale issues
# from earlier SHAs only when we're actually green; pending
# means CI hasn't finished and the prior issue might still be
# accurate.
if status.get("state") == "success":
closed = close_open_red_issues_for_other_shas(sha, dry_run=dry_run)
if closed:
emit_loki_event(
@@ -591,10 +589,8 @@ def run_once(*, dry_run: bool = False) -> int:
[],
)
print(f"::notice::main is GREEN at {sha[:10]} on {WATCH_BRANCH} "
f"(closed {closed} stale issue(s), combined={status.get('state')})")
f"(closed {closed} stale issue(s))")
else:
print(f"::notice::main is RED/ERROR at {sha[:10]} on {WATCH_BRANCH} "
f"(combined state={status.get('state')!r})")
print(f"::notice::main is PENDING at {sha[:10]} on {WATCH_BRANCH} "
f"(combined state={status.get('state')!r}; no action)")
return 0
+85 -146
View File
@@ -68,7 +68,7 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
from typing import Any, Callable
# ---------------------------------------------------------------------------
@@ -102,7 +102,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
# ---------------------------------------------------------------------------
# Comment parsing — /sop-ack, /sop-revoke, and /sop-n/a
# Comment parsing — /sop-ack and /sop-revoke
# ---------------------------------------------------------------------------
# A directive must be on its own line. Permits leading whitespace.
@@ -113,7 +113,8 @@ _DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
# /sop-n/a <gate> [reason] — declare a qa/sec gate N/A.
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
@@ -348,78 +349,59 @@ def compute_ack_state(
}
# ---------------------------------------------------------------------------
# N/A-gate evaluation
# ---------------------------------------------------------------------------
def compute_na_state(
comments: list[dict[str, Any]],
pr_author: str,
na_gates: dict[str, dict[str, Any]],
team_membership_probe: "callable[[str, list[str]], list[str]]",
author: str,
na_gates: dict[str, Any],
probe: Callable[[str, list[str]], list[str]],
) -> dict[str, dict[str, Any]]:
"""Compute per-gate N/A declaration state.
"""Evaluate which N/A gates have a valid declaration from a team member.
Each comment is processed in chronological order. The most-recent
N/A directive per (commenter, gate) wins.
Returns a dict keyed by gate name:
{
"qa-review": {
"declared": True,
"declared_by": "core-qa-agent",
"reason": "CI/non-security-touching",
"valid": True, # non-author + in required team
"error": None, # error string if invalid
},
...
}
Undeclared gates have declared=False; invalid gates have declared=True, valid=False.
Returns dict[gate_name, dict] where each dict has:
declared: bool — at least one valid non-author team-member declared N/A
decl_ackers: list[str] — usernames who declared this gate N/A
rejected: dict with keys:
not_in_team: list[str] — users who tried but aren't in required teams
"""
# Step 1: collapse N/A directives per (commenter, gate) — most recent wins.
latest_na: dict[tuple[str, str], tuple[str, str]] = {}
# Build per-user latest N/A directive (most-recent wins per RFC#324).
latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note)
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
_, na_directives = parse_directives(body, {})
for _kind, gate, reason in na_directives:
if gate not in na_gates:
continue
latest_na[(user, gate)] = (gate, reason)
for kind, gate, note in parse_directives(body, {})[1]:
# [1] = na_directives only
if gate in na_gates:
latest_na[user] = (gate, note)
# Step 2: initialise all gates as undeclared.
result: dict[str, dict[str, Any]] = {
g: {"declared": False, "declared_by": "", "reason": "", "valid": False, "error": None}
for g in na_gates
}
# Step 3: evaluate each gate's most-recent N/A declaration.
for (user, gate), (gate_name, reason) in latest_na.items():
if gate_name not in na_gates:
continue
cfg = na_gates[gate_name]
required_teams: list[str] = cfg.get("required_teams", [])
entry: dict[str, Any] = {
"declared": True,
"declared_by": user,
"reason": reason,
"valid": False,
"error": None,
result: dict[str, dict[str, Any]] = {}
for gate, gate_cfg in na_gates.items():
result[gate] = {
"declared": False,
"decl_ackers": [],
"rejected": {"not_in_team": []},
}
# Authors cannot self-declare N/A (gate script enforces same rule).
if user == pr_author:
entry["error"] = "self-declare N/A rejected"
else:
# Probe team membership: is the declarer in any required team?
approved = team_membership_probe(f"na:{gate_name}", [user])
if user in approved:
entry["valid"] = True
decl_ackers: list[str] = []
not_in_team: list[str] = []
for user, (g, _note) in latest_na.items():
if g != gate:
continue
if user == author:
continue # authors cannot self-declare N/A
approved = probe(gate, [user])
if approved:
decl_ackers.append(user)
else:
# 403 from team API means token owner not in that team.
# Fail-closed: treat unknown membership as invalid.
entry["error"] = f"{user} not in required team {required_teams}"
result[gate_name] = entry
not_in_team.append(user)
result[gate]["declared"] = bool(decl_ackers)
result[gate]["decl_ackers"] = decl_ackers
result[gate]["rejected"]["not_in_team"] = not_in_team
return result
@@ -924,90 +906,6 @@ def main(argv: list[str] | None = None) -> int:
extra = " (" + "; ".join(extras) + ")" if extras else ""
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
# ----- N/A gate declarations (RFC#324 §N/A follow-up) -----
# sop-checklist.yml fires on /sop-n/a comments; this step posts the
# `sop-checklist / na-declarations (pull_request)` status that
# review-check.sh reads to waive the Gitea-APPROVE requirement.
na_gates: dict[str, Any] = cfg.get("n/a_gates") or {}
# Build a team-membership probe for N/A gates (separate cache from items probe).
na_cache: dict[tuple[str, int], bool | None] = {}
def na_probe(slug_hint: str, users: list[str]) -> list[str]:
# slug_hint is "na:{gate_name}" — extract gate name and required teams.
gate_name = slug_hint.removeprefix("na:")
gate_cfg = na_gates.get(gate_name, {})
team_names: list[str] = gate_cfg.get("required_teams", [])
# Resolve team names → ids.
team_ids: list[int] = []
for tn in team_names:
tid = client.resolve_team_id(args.owner, tn) # noqa: SLF001
if tid is None:
code, data = client._req( # noqa: SLF001
"GET", f"/orgs/{args.owner}/teams"
)
if code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == tn:
tid = t.get("id")
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
break
if tid is not None:
team_ids.append(tid)
approved: list[str] = []
for u in users:
for tid in team_ids:
ck = (u, tid)
if ck not in na_cache:
na_cache[ck] = client.is_team_member(tid, u) # noqa: SLF001
res = na_cache[ck]
if res is True:
approved.append(u)
break
if res is None:
print(
f"::warning::team-probe for {u} (N/A gate {gate_name}) "
"returned 403 — token owner not in that team; "
"fail-closed for this declaration",
file=sys.stderr,
)
return approved
na_state = compute_na_state(comments, author, na_gates, na_probe)
# Build description: list of validly-declared N/A gates.
na_approved_gates = [
g for g, entry in na_state.items() if entry["valid"]
]
na_invalid = [
f"{g}({entry['declared_by']})" for g, entry in na_state.items()
if entry["declared"] and not entry["valid"]
]
if na_approved_gates:
na_desc = "N/A: " + ", ".join(na_approved_gates)
elif na_invalid:
na_desc = "invalid N/A: " + ", ".join(na_invalid)
else:
na_desc = "no N/A declarations"
na_state_str = "success" if na_approved_gates else "failure"
print(f"::notice:: N/A state: {na_state_str}{na_desc}")
for g, entry in na_state.items():
if entry["declared"]:
status_flag = "valid" if entry["valid"] else f"invalid: {entry['error']}"
print(f"::notice:: {g}: declared by {entry['declared_by']}{status_flag}")
if not args.dry_run:
na_context = "sop-checklist / na-declarations (pull_request)"
client.post_status(
args.owner, args.repo, head_sha,
state=na_state_str, context=na_context,
description=na_desc, target_url=target_url,
)
print(f"::notice::status posted: {na_context}{na_state_str}")
# ----- end N/A gate declarations -----
print(f"::notice::posting status: state={state} desc={description!r}")
if args.dry_run:
@@ -1015,6 +913,8 @@ def main(argv: list[str] | None = None) -> int:
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
client.post_status(
args.owner, args.repo, head_sha,
state=state, context=args.status_context,
@@ -1022,6 +922,45 @@ def main(argv: list[str] | None = None) -> int:
)
print(f"::notice::status posted: {args.status_context}{state}")
# --- N/A gate status (RFC#324 §N/A follow-up) ---
# Post a separate status so review-check.sh can discover N/A declarations
# and waive the Gitea-approve requirement for that gate.
na_state: dict[str, dict[str, Any]] = {}
if na_gates:
na_state = compute_na_state(comments, author, na_gates, probe)
na_descs: list[str] = []
for gate, s in na_state.items():
if s["declared"]:
na_descs.append(gate)
decl = s["decl_ackers"]
rej = s["rejected"]["not_in_team"]
if decl:
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
if rej:
print(
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
file=sys.stderr,
)
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
na_status_state = "success" if na_descs else "pending"
# review-check.sh reads the description to discover which gates are N/A.
# Include the gate names so it can grep for them.
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
if not args.dry_run:
client.post_status(
args.owner, args.repo, head_sha,
state=na_status_state,
context="sop-checklist / na-declarations (pull_request)",
description=na_description,
target_url=target_url,
)
print(
f"::notice::na-declarations status → {na_status_state}: {na_description}"
)
# By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the
@@ -1,123 +0,0 @@
#!/usr/bin/env python3
"""Stub Gitea API for review-refire-status.sh test scenarios.
Reads $FIXTURE_STATE_DIR/scenario to decide what to return for each
endpoint the review-refire-status.sh script calls (and review-check.sh
which it invokes inline). Also reads $FIXTURE_STATE_DIR/review_check_rc
to control what review-check.sh exits with (PASS → 0, FAIL → 1).
Scenarios:
open — PR is open; review-check.sh runs and exits based on review_check_rc
closed — PR is closed; script exits 0 with no-op
review_check_rc file content:
PASS → review-check.sh exits 0 (success)
FAIL → review-check.sh exits 1 (failure)
Usage:
FIXTURE_STATE_DIR=/tmp/x python3 _review_refire_fixture.py 8080
"""
import http.server
import json
import os
import re
import sys
import urllib.parse
STATE_DIR = os.environ.get("FIXTURE_STATE_DIR", "/tmp")
def scenario() -> str:
p = os.path.join(STATE_DIR, "scenario")
if not os.path.isfile(p):
return "open"
with open(p).read() as f:
return f.read().strip()
def review_check_rc() -> int:
p = os.path.join(STATE_DIR, "review_check_rc")
if os.path.isfile(p):
content = open(p).read().strip()
return 0 if content == "PASS" else 1
return 0 # default: pass
class Handler(http.server.BaseHTTPRequestHandler):
def log_message(self, *args, **kwargs):
pass # keep stdout quiet
def _json(self, code: int, body: dict) -> None:
payload = json.dumps(body).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(payload)))
self.end_headers()
self.wfile.write(payload)
def _empty(self, code: int) -> None:
self.send_response(code)
self.send_header("Content-Length", "0")
self.end_headers()
def do_GET(self):
u = urllib.parse.urlparse(self.path)
path = u.path
sc = scenario()
# GET /repos/{owner}/{repo}/pulls/{pr_number}
m = re.match(r"^/api/v1/repos/([^/]+)/([^/]+)/pulls/(\d+)$", path)
if m:
return self._json(200, {
"number": int(m.group(3)),
"state": "closed" if sc == "closed" else "open",
"head": {"sha": "deadbeef0000111122223333444455556666"},
"base": {"ref": "main"},
"user": {"login": "feature-author"},
})
# GET /repos/{owner}/{repo}/pulls/{pr_number}/reviews
m = re.match(r"^/api/v1/repos/([^/]+)/([^/]+)/pulls/(\d+)/reviews$", path)
if m:
return self._json(200, [
{"state": "APPROVED", "dismissed": False,
"user": {"login": "qa-member"}, "commit_id": "abc1234"},
])
# GET /teams/{team_id}/members/{username}
m = re.match(r"^/api/v1/teams/(\d+)/members/([^/]+)$", path)
if m:
return self._empty(204) # member
self._json(404, {"path": path})
def do_POST(self):
u = urllib.parse.urlparse(self.path)
path = u.path
# POST /repos/{owner}/{repo}/statuses/{sha}
m = re.match(r"^/api/v1/repos/([^/]+)/([^/]+)/statuses/([a-f0-9]+)$", path)
if m:
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length) if length else b"{}"
try:
data = json.loads(body)
except Exception:
data = {}
# Echo back what was posted so test can verify
return self._json(200, {"posted": data})
self._json(404, {"path": path})
def main():
port = int(sys.argv[1])
srv = http.server.ThreadingHTTPServer(("127.0.0.1", port), Handler)
print(f"Fixture serving on port {port}", flush=True)
srv.serve_forever()
if __name__ == "__main__":
main()
@@ -1,152 +0,0 @@
#!/usr/bin/env bash
# Regression tests for .gitea/scripts/review-refire-status.sh.
#
# review-refire-status.sh fetches the PR head SHA, runs review-check.sh,
# and POSTs a status context based on review-check.sh's exit code.
#
# Test matrix:
# T3 — closed PR no-op (requires jq + HTTP fixture)
# T4 — GET /pulls/{N} non-200 → exits 1 (connection refused)
# T6 — missing required env → exits 1
# T7 — bash syntax check (bash -n passes)
# T8 — auth file security (mode 600 + Authorization header)
#
# T1/T2 (review-check success/failure) require jq and are run via
# the GitHub Actions CI job (jq installed via apt-get).
#
# Hostile self-review: MUST FAIL if script is absent.
set -euo pipefail
THIS_DIR="$(cd "$(dirname "$0")" && pwd)"
SCRIPT_DIR="$(cd "$THIS_DIR/.." && pwd)"
SCRIPT="$SCRIPT_DIR/review-refire-status.sh"
PASS=0
FAIL=0
FAILED_TESTS=""
pass() { echo " PASS: $1"; PASS=$((PASS+1)); }
fail() { echo " FAIL: $1"; FAIL=$((FAIL+1)); FAILED_TESTS="${FAILED_TESTS} $1"; }
if [ ! -f "$SCRIPT" ]; then
echo "FAIL: $SCRIPT does not exist"
exit 1
fi
# T7: bash syntax
t7_syntax() {
if bash -n "$SCRIPT" 2>&1; then
pass "T7 bash syntax"
else
fail "T7 bash syntax"
fi
}
# T6: missing required env
t6_missing_env() {
set +e
local out rc
out=$(GITEA_TOKEN="x" GITEA_HOST="git.example" REPO="o/r" PR_NUMBER="1" \
TEAM="qa" COMMENT_AUTHOR="alice" bash "$SCRIPT" 2>&1)
rc=$?
set -e
if [ "$rc" -ne 0 ]; then
pass "T6 missing GITEA_TOKEN exits non-zero (rc=$rc)"
else
fail "T6 missing GITEA_TOKEN should exit non-zero"
fi
}
# T4: connection refused (port nothing listening on)
t4_connection_refused() {
local port
port=$(python3 -c "import socket; s=socket.socket(); s.bind(('127.0.0.1',0)); print(s.getsockname()[1]); s.close()")
# Don't start a server on this port
set +e
local out rc
out=$(GITEA_TOKEN="test-token" \
GITEA_HOST="127.0.0.1:${port}" \
REPO="molecule-ai/molecule-core" \
PR_NUMBER="1" \
TEAM="qa" \
COMMENT_AUTHOR="alice" \
bash "$SCRIPT" 2>&1)
rc=$?
set -e
if [ "$rc" -ne 0 ]; then
pass "T4 connection refused exits non-zero (rc=$rc)"
else
fail "T4 connection refused should exit non-zero, got rc=0"
fi
}
# T8: auth file security
t8_auth_security() {
if grep -q 'chmod 600' "$SCRIPT"; then
pass "T8 auth file mode 600"
else
fail "T8 should chmod 600 on auth file"
fi
if grep -q 'Authorization.*token' "$SCRIPT"; then
pass "T8 Authorization header set"
else
fail "T8 should set Authorization header"
fi
}
# T3: closed PR no-op — requires jq (installed in CI via apt-get)
t3_closed_pr_noop() {
if ! command -v jq >/dev/null 2>&1; then
echo " SKIP T3: jq not available (run in CI to exercise)"
return
fi
local port fixture_dir out rc
port=$(python3 -c "import socket; s=socket.socket(); s.bind(('127.0.0.1',0)); print(s.getsockname()[1]); s.close()")
fixture_dir=$(mktemp -d)
echo "closed" > "${fixture_dir}/scenario"
export FIXTURE_STATE_DIR="$fixture_dir"
python3 "$THIS_DIR/_review_refire_fixture.py" "$port" &
local fixture_pid=$!
sleep 1
out=$(GITEA_TOKEN="test-token" \
GITEA_HOST="127.0.0.1:${port}" \
REPO="molecule-ai/molecule-core" \
PR_NUMBER="1" \
TEAM="qa" \
COMMENT_AUTHOR="alice" \
bash "$SCRIPT" 2>&1)
rc=$?
kill $fixture_pid 2>/dev/null || true
rm -rf "$fixture_dir"
if [ "$rc" -eq 0 ]; then
pass "T3 closed PR exits 0"
else
fail "T3 closed PR should exit 0, got rc=$rc. Output: ${out}"
fi
}
echo "Running review-refire-status.sh tests..."
echo "========================================"
t7_syntax
t6_missing_env
t4_connection_refused
t8_auth_security
t3_closed_pr_noop
echo ""
echo "========================================"
echo "PASS: $PASS FAIL: $FAIL"
if [ "$FAIL" -gt 0 ]; then
echo "Failed:$FAILED_TESTS"
exit 1
fi
echo "All tests passed."
exit 0
+4 -11
View File
@@ -575,29 +575,22 @@ class TestComputeNaState(unittest.TestCase):
comments = [_comment("bob", "/sop-n/a qa-review N/A: pure tooling change")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
self.assertTrue(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["declared_by"], "bob")
self.assertTrue(na_state["qa-review"]["valid"])
self.assertIsNone(na_state["qa-review"]["error"])
self.assertEqual(na_state["qa-review"]["decl_ackers"], ["bob"])
def test_na_declared_by_unauthorized_user_rejected(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("mallory", "/sop-n/a qa-review N/A: not real team")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: [])
# declared=True (mallory attempted N/A) but valid=False (not in required team).
self.assertTrue(na_state["qa-review"]["declared"])
self.assertFalse(na_state["qa-review"]["valid"])
self.assertIn("mallory", na_state["qa-review"]["error"])
self.assertFalse(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["rejected"]["not_in_team"], ["mallory"])
def test_author_cannot_self_declare_na(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("alice", "/sop-n/a qa-review N/A: I am the author")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
# declared=True (alice attempted N/A) but valid=False (self-declare rejected).
self.assertTrue(na_state["qa-review"]["declared"])
self.assertFalse(na_state["qa-review"]["valid"])
self.assertIn("self", na_state["qa-review"]["error"].lower())
self.assertFalse(na_state["qa-review"]["declared"])
def test_parse_directives_separates_na_from_ack(self):
directives, na_directives = sop.parse_directives(
+6
View File
@@ -32,6 +32,12 @@ on:
# iterating all open PRs when PR_NUMBER is empty.
workflow_dispatch:
# Cancel stale runs so the 8-runner pool stays available for PR jobs.
# Per-SHA group ensures push and cron runs at different SHAs don't cancel each other.
concurrency:
group: gate-check-v3-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
permissions:
# read: contents — for checkout (base ref, not PR head for security)
# read: pull-requests — for reading PR info via API
+2 -8
View File
@@ -70,10 +70,7 @@ jobs:
- name: Refire qa-review status
if: steps.classify.outputs.run_qa == 'true'
env:
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_TOKEN: ${{ secrets.RFC_324_TEAM_READ_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
@@ -90,10 +87,7 @@ jobs:
- name: Refire security-review status
if: steps.classify.outputs.run_security == 'true'
env:
# RFC_324_TEAM_READ_TOKEN is read-only (team membership read scope only).
# review-refire-status.sh POSTs to /statuses — requires write scope.
# SOP_TIER_CHECK_TOKEN carries write:repository + write:issue + read:organization.
GITEA_TOKEN: ${{ secrets.SOP_TIER_CHECK_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_TOKEN: ${{ secrets.RFC_324_TEAM_READ_TOKEN || secrets.GITHUB_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
PR_NUMBER: ${{ github.event.issue.number }}
@@ -1,63 +0,0 @@
name: review-refire-status-tests
# Regression tests for .gitea/scripts/review-refire-status.sh.
#
# review-refire-status.sh is load-bearing: it POSTs the qa-review and
# security-review slash-command status contexts to the PR head SHA.
# It calls review-check.sh, which requires jq.
#
# Design (mirrors review-check-tests.yml):
# - Bash test harness; custom assert framework (no bats dependency).
# - jq is required (used by review-check.sh which this script invokes).
# - continue-on-error: false — these tests must pass.
on:
push:
branches: [main, staging]
paths:
- '.gitea/scripts/review-refire-status.sh'
- '.gitea/scripts/tests/test_review_refire_status.sh'
- '.gitea/scripts/tests/_review_refire_fixture.py'
- '.gitea/workflows/review-refire-status-tests.yml'
pull_request:
branches: [main, staging]
paths:
- '.gitea/scripts/review-refire-status.sh'
- '.gitea/scripts/tests/test_review_refire_status.sh'
- '.gitea/scripts/tests/_review_refire_fixture.py'
- '.gitea/workflows/review-refire-status-tests.yml'
workflow_dispatch:
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
jobs:
# bp-exempt: regression tests for review-refire-status.sh; informational only, not a merge gate.
test:
name: review-refire-status.sh regression tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Install jq
# review-refire-status.sh invokes review-check.sh, which uses jq for
# JSON parsing. Gitea Actions runners do not bundle jq.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
run: |
if apt-get update -qq && apt-get install -y -qq jq; then
echo "::notice::jq installed via apt-get: $(jq --version)"
elif timeout 120 curl -sSL \
"https://github.com/jqlang/jq/releases/download/jq-1.7.1/jq-linux-amd64" \
-o /usr/local/bin/jq && chmod +x /usr/local/bin/jq; then
echo "::notice::jq binary downloaded: $(/usr/local/bin/jq --version)"
else
echo "::warning::jq install failed"
fi
- name: Run review-refire-status.sh regression suite
run: bash .gitea/scripts/tests/test_review_refire_status.sh
@@ -44,6 +44,12 @@ on:
- ".github/scripts/lint_secret_pattern_drift.py"
- ".githooks/pre-commit"
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
# Per-SHA group ensures push and scheduled runs at different SHAs don't cancel each other.
concurrency:
group: secret-pattern-drift-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
+5
View File
@@ -22,6 +22,11 @@ on:
- cron: '17 4 * * 1' # Mondays at 04:17 UTC
workflow_dispatch:
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
concurrency:
group: weekly-platform-go-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
permissions:
contents: read
statuses: write
+7 -48
View File
@@ -410,68 +410,27 @@ def test_auto_close_when_main_returns_to_green(wd_module, monkeypatch):
assert patch_calls[0][2] == {"state": "closed"}
def test_auto_close_on_main_pending_with_no_failures(wd_module, monkeypatch):
"""main pending at NEW_SHA but all individual statuses are successful
(some jobs still running, others done) → is_red() confirms 0 failures,
so close stale issues. The combined `pending` state alone is insufficient
to block closing when individual entries show no failures."""
stub = _make_stub_api({
("GET", "/repos/owner/repo/branches/main"): (200, _branches_response(SHA_GREEN)),
("GET", f"/repos/owner/repo/commits/{SHA_GREEN}/status"): (
200, _combined_status("pending", [
# Some still running, some already succeeded
{"context": "ci/test", "state": "pending"},
{"context": "ci/lint", "state": "success"},
]),
),
# Stale issue from old SHA
("GET", "/repos/owner/repo/issues"): (
200, [{"number": 7, "title": f"[main-red] owner/repo: {SHA_RED[:10]}"}],
),
("POST", "/repos/owner/repo/issues/7/comments"): (200, {}),
("PATCH", "/repos/owner/repo/issues/7"): (200, {}),
})
monkeypatch.setattr(wd_module, "api", stub)
wd_module.run_once(dry_run=False)
methods_paths = [(c[0], c[1]) for c in stub.calls]
assert ("GET", "/repos/owner/repo/issues") in methods_paths
assert ("POST", "/repos/owner/repo/issues/7/comments") in methods_paths
assert ("PATCH", "/repos/owner/repo/issues/7") in methods_paths
def test_auto_close_skips_when_main_pending_with_failures(wd_module, monkeypatch):
"""main pending at NEW_SHA but some individual statuses are failures
(CI partially broken, combined state lagging) → is_red() sees failures,
so close must NOT run. file_or_update_red handles the new failure
(opens an issue for the current SHA)."""
def test_auto_close_skips_when_main_pending(wd_module, monkeypatch):
"""main pending (CI still running) at NEW_SHA → leave old issue alone.
Pending could resolve to red, so closing prematurely would lose the
breadcrumb of the prior red."""
old_title = f"[main-red] owner/repo: {SHA_RED[:10]}"
stub = _make_stub_api({
("GET", "/repos/owner/repo/branches/main"): (200, _branches_response(SHA_GREEN)),
("GET", f"/repos/owner/repo/commits/{SHA_GREEN}/status"): (
200, _combined_status("pending", [
{"context": "ci/test", "state": "pending"},
{"context": "ci/lint", "state": "failure"}, # individual failure
]),
),
# file_or_update_red: search for existing issue for current SHA → none found
("GET", "/repos/owner/repo/issues"): (200, []),
# file_or_update_red: POST new issue
("POST", "/repos/owner/repo/issues"): (200, {"number": 8}),
# file_or_update_red: GET labels
("GET", "/repos/owner/repo/labels"): (200, [{"id": 9, "name": "tier:high"}]),
# file_or_update_red: POST label
("POST", "/repos/owner/repo/issues/8/labels"): (200, {}),
})
monkeypatch.setattr(wd_module, "api", stub)
wd_module.run_once(dry_run=False)
# No close-related calls — is_red() triggered file/update path instead
# No close-related calls
methods_paths = [(c[0], c[1]) for c in stub.calls]
assert ("PATCH", "/repos/owner/repo/issues/7") not in methods_paths
assert ("POST", "/repos/owner/repo/issues/7/comments") not in methods_paths
assert ("GET", "/repos/owner/repo/issues/7") not in methods_paths
assert ("GET", "/repos/owner/repo/issues") not in methods_paths
# --------------------------------------------------------------------------
-399
View File
@@ -1,399 +0,0 @@
"""Tests for `.gitea/scripts/sop-checklist.py`.
Covers the N/A declarations feature (`compute_na_state`) and the
`parse_directives` helper that powers both ack/revoke and N/A parsing.
Run:
python3 -m pytest tests/test_sop_checklist.py -v
Dependencies: stdlib + pytest. No network. No live Gitea calls.
"""
from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
# --------------------------------------------------------------------------
# Module-import fixture
# --------------------------------------------------------------------------
SCRIPT_PATH = (
Path(__file__).resolve().parent.parent
/ ".gitea"
/ "scripts"
/ "sop-checklist.py"
)
@pytest.fixture(scope="module")
def sc_module():
"""Import sop-checklist.py as a module under a known env."""
env = {
"GITEA_TOKEN": "test-token",
"GITEA_HOST": "git.example.test",
"REPO": "owner/repo",
"DRY_RUN": "0",
}
with mock.patch.dict(os.environ, env, clear=False):
spec = importlib.util.spec_from_file_location(
"sop_checklist", SCRIPT_PATH
)
m = importlib.util.module_from_spec(spec)
spec.loader.exec_module(m)
# Force-set module globals captured at import time.
m.GITEA_TOKEN = env["GITEA_TOKEN"]
m.GITEA_HOST = env["GITEA_HOST"]
m.REPO = env["REPO"]
yield m
# --------------------------------------------------------------------------
# parse_directives
# --------------------------------------------------------------------------
class TestParseDirectives:
def test_empty_body(self, sc_module):
directives, na_directives = sc_module.parse_directives("", {})
assert directives == []
assert na_directives == []
def test_ack_directive(self, sc_module):
directives, na_directives = sc_module.parse_directives(
"/sop-ack qa-review", {}
)
assert directives == [("sop-ack", "qa-review", "")]
assert na_directives == []
def test_ack_directive_with_note(self, sc_module):
directives, na_directives = sc_module.parse_directives(
"/sop-ack qa-review CI-only change", {}
)
assert directives == [("sop-ack", "qa-review", "CI-only change")]
assert na_directives == []
def test_revoke_directive(self, sc_module):
directives, na_directives = sc_module.parse_directives(
"/sop-revoke qa-review missed edge case", {}
)
assert directives == [("sop-revoke", "qa-review", "missed edge case")]
assert na_directives == []
def test_na_directive_no_reason(self, sc_module):
"""Bare /sop-n/a <gate> is valid."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a qa-review", {}
)
assert na_directives == [("qa-review", "")]
def test_na_directive_with_reason(self, sc_module):
"""Full /sop-n/a <gate> <reason> is parsed correctly."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a qa-review CI/non-security-touching", {}
)
assert na_directives == [("qa-review", "CI/non-security-touching")]
def test_na_directive_multiple_gates(self, sc_module):
body = (
"/sop-n/a qa-review CI-only\n"
"/sop-n/a security-review no-op change\n"
)
_, na_directives = sc_module.parse_directives(body, {})
assert na_directives == [
("qa-review", "CI-only"),
("security-review", "no-op change"),
]
def test_na_directive_requires_space_after_gate(self, sc_module):
"""Without a space, /sop-n/a is not matched (the regex requires \t+)."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a", {}
)
assert na_directives == []
def test_na_directive_invalid_gate_characters(self, sc_module):
r"""Gate names must match [A-Za-z0-9_\-]+. A dot in the gate name
causes the entire line not to match (no backtracking to partial match)."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a qa.review some reason", {}
)
# The regex cannot match qa.review as a valid gate (dot is invalid).
# Since the (?:[ \t]+(.*))? is optional, the whole pattern fails and
# no directive is extracted.
assert na_directives == []
def test_mixed_directives(self, sc_module):
"""ack + na directives can appear in the same comment."""
body = (
"/sop-ack sop-checklist all boxes ticked\n"
"/sop-n/a qa-review no code changes\n"
)
directives, na_directives = sc_module.parse_directives(body, {})
assert directives == [("sop-ack", "sop-checklist", "all boxes ticked")]
assert na_directives == [("qa-review", "no code changes")]
def test_na_directive_leading_whitespace(self, sc_module):
"""Indent is permitted — regex matches [ \t]*."""
_, na_directives = sc_module.parse_directives(
" /sop-n/a qa-review indented", {}
)
assert na_directives == [("qa-review", "indented")]
def test_na_directive_tab_indent(self, sc_module):
_, na_directives = sc_module.parse_directives(
"\t/sop-n/a qa-review tab-indented", {}
)
assert na_directives == [("qa-review", "tab-indented")]
def test_na_directive_trailing_whitespace_stripped(self, sc_module):
"""Trailing spaces after the reason are stripped."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a qa-review reason \n", {}
)
assert na_directives == [("qa-review", "reason")]
def test_na_directive_multiple_on_same_line(self, sc_module):
"""MULTILINE anchors at line start (^), but greedy (.*) in the reason
group captures everything after the first space, including the second
/sop-n/a on the same line."""
_, na_directives = sc_module.parse_directives(
"/sop-n/a qa-review /sop-n/a security-review", {}
)
# reason = "/sop-n/a security-review" (greedy capture)
assert na_directives == [("qa-review", "/sop-n/a security-review")]
def test_ack_numeric_alias(self, sc_module):
"""Numeric alias e.g. /sop-ack 1 resolves via numeric_aliases."""
directives, _ = sc_module.parse_directives(
"/sop-ack 1", {1: "qa-review"}
)
assert directives == [("sop-ack", "qa-review", "")]
# --------------------------------------------------------------------------
# compute_na_state
# --------------------------------------------------------------------------
class TestComputeNaState:
def _make_comment(
self,
body: str,
user_login: str,
created_at: str = "2024-01-01T00:00:00Z",
) -> dict:
return {
"id": 0,
"body": body,
"user": {"login": user_login},
"created_at": created_at,
}
def _na_gates(self) -> dict:
return {
"qa-review": {
"required_teams": ["qa"],
},
"security-review": {
"required_teams": ["security"],
},
"sop-checklist": {
"required_teams": ["engineering"],
},
}
# ----- empty / no declarations -----
def test_no_comments(self, sc_module):
result = sc_module.compute_na_state([], "author", self._na_gates(), lambda *a: [])
for gate in ["qa-review", "security-review"]:
assert result[gate]["declared"] is False
assert result[gate]["valid"] is False
assert result[gate]["error"] is None
def test_non_na_comment_ignored(self, sc_module):
comments = [self._make_comment("/sop-ack qa-review", "helper")]
result = sc_module.compute_na_state(comments, "author", self._na_gates(), lambda *a: [])
assert result["qa-review"]["declared"] is False
def test_unknown_gate_ignored(self, sc_module):
comments = [self._make_comment("/sop-n/a unknown-gate reason", "helper")]
result = sc_module.compute_na_state(comments, "author", self._na_gates(), lambda *a: [])
assert result["qa-review"]["declared"] is False
# ----- valid declarations -----
def test_valid_na_declaration(self, sc_module):
"""Declarer in required team → valid=True, declared=True."""
comments = [self._make_comment("/sop-n/a qa-review CI-only change", "qa-member")]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
assert result["qa-review"]["declared_by"] == "qa-member"
assert result["qa-review"]["reason"] == "CI-only change"
assert result["qa-review"]["valid"] is True
assert result["qa-review"]["error"] is None
def test_valid_na_multiple_gates(self, sc_module):
comments = [
self._make_comment("/sop-n/a qa-review CI-only", "qa-member"),
self._make_comment("/sop-n/a security-review no-op", "sec-member"),
]
probe = lambda slug, users: (
["qa-member"] if "qa" in slug else ["sec-member"]
)
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["valid"] is True
assert result["security-review"]["valid"] is True
def test_na_without_reason(self, sc_module):
"""N/A with no reason is still valid if team membership checks out."""
comments = [self._make_comment("/sop-n/a qa-review", "qa-member")]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
assert result["qa-review"]["reason"] == ""
assert result["qa-review"]["valid"] is True
# ----- invalid declarations -----
def test_self_declare_rejected(self, sc_module):
"""Authors cannot self-declare N/A."""
comments = [self._make_comment("/sop-n/a qa-review self-declared", "author")]
probe = lambda _slug, users: ["author"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
assert result["qa-review"]["valid"] is False
assert result["qa-review"]["error"] == "self-declare N/A rejected"
def test_not_in_required_team(self, sc_module):
"""Declarer not in required team → valid=False, error set."""
comments = [self._make_comment("/sop-n/a qa-review CI-only", "random-user")]
probe = lambda _slug, users: [] # nobody approved
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
assert result["qa-review"]["valid"] is False
assert "not in required team" in result["qa-review"]["error"]
def test_probe_returns_empty_not_member(self, sc_module):
"""Probe returns empty list even though the team exists."""
comments = [self._make_comment("/sop-n/a security-review no-op", "not-sec")]
probe = lambda _slug, users: []
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["security-review"]["valid"] is False
assert "not in required team" in result["security-review"]["error"]
# ----- chronological ordering -----
def test_most_recent_na_wins_per_user(self, sc_module):
"""For the same user+gate, the most-recent comment wins."""
comments = [
self._make_comment("/sop-n/a qa-review old reason", "qa-member",
created_at="2024-01-01T00:00:00Z"),
self._make_comment("/sop-n/a qa-review newer reason", "qa-member",
created_at="2024-01-02T00:00:00Z"),
]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["reason"] == "newer reason"
def test_different_users_same_gate_both_declared(self, sc_module):
"""Two different declarers for the same gate — both recorded."""
comments = [
self._make_comment("/sop-n/a qa-review first declarer", "qa-member-1",
created_at="2024-01-01T00:00:00Z"),
self._make_comment("/sop-n/a qa-review second declarer", "qa-member-2",
created_at="2024-01-02T00:00:00Z"),
]
probe = lambda _slug, users: ["qa-member-1", "qa-member-2"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
# Most-recent declaration wins (qa-member-2 is newer).
assert result["qa-review"]["declared_by"] == "qa-member-2"
assert result["qa-review"]["reason"] == "second declarer"
assert result["qa-review"]["valid"] is True
# ----- gate key is exact -----
def test_unknown_gate_in_na_directive_ignored(self, sc_module):
"""A gate name not in na_gates config is silently ignored."""
comments = [
self._make_comment("/sop-n/a qa-review valid", "qa-member"),
self._make_comment("/sop-n/a totally-unknown-gate reason", "qa-member"),
]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
# unknown-gate is not in result keys — only configured gates appear.
assert "totally-unknown-gate" not in result
# ----- empty comments list -----
def test_comment_with_no_user_field(self, sc_module):
"""A comment dict with no user.login is skipped (no crash)."""
comments = [{"id": 1, "body": "/sop-n/a qa-review", "user": {}}]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is False
def test_comment_with_empty_body(self, sc_module):
"""A comment with empty body is handled gracefully."""
comments = [{"id": 1, "body": "", "user": {"login": "qa-member"}}]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is False
# ----- probe slug format -----
def test_probe_called_with_na_prefix(self, sc_module):
"""The team membership probe is called with slug='na:<gate>'."""
calls = []
def track_probe(slug, users):
calls.append((slug, users))
return []
comments = [self._make_comment("/sop-n/a qa-review reason", "qa-member")]
sc_module.compute_na_state(
comments, "author", self._na_gates(), track_probe
)
assert ("na:qa-review", ["qa-member"]) in calls
# ----- undeclared gate remains False -----
def test_undeclared_gate_preserved(self, sc_module):
"""A gate with no declaration stays declared=False even when
other gates are declared."""
comments = [
self._make_comment("/sop-n/a qa-review CI-only", "qa-member"),
]
probe = lambda _slug, users: ["qa-member"]
result = sc_module.compute_na_state(
comments, "author", self._na_gates(), probe
)
assert result["qa-review"]["declared"] is True
assert result["security-review"]["declared"] is False
assert result["sop-checklist"]["declared"] is False