Compare commits

..

2 Commits

Author SHA1 Message Date
core-fe f27097a5c8 test(canvas): add MemoryTab tests (42 cases)
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 18s
Harness Replays / detect-changes (pull_request) Successful in 14s
CI / Detect changes (pull_request) Successful in 36s
E2E API Smoke Test / detect-changes (pull_request) Successful in 37s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 34s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 38s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 15s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 31s
qa-review / approved (pull_request) Failing after 18s
gate-check-v3 / gate-check (pull_request) Successful in 29s
security-review / approved (pull_request) Failing after 17s
sop-tier-check / tier-check (pull_request) Successful in 23s
CI / Platform (Go) (pull_request) Successful in 6s
Harness Replays / Harness Replays (pull_request) Successful in 5s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Python Lint & Test (pull_request) Successful in 6s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 8s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 6s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 7m48s
audit-force-merge / audit (pull_request) Has been skipped
CI / Canvas (Next.js) (pull_request) Successful in 10m37s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 2s
Cover awareness dashboard expand/collapse, iframe with workspaceId in URL,
status grid, KV memory list, expand/collapse entries, add/edit/delete
memory entries, JSON parsing, TTL support, 409 conflict retry hint,
error states, and refresh.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-12 03:46:50 +00:00
core-fe 4980982aea chore: retrigger CI after rebase to main 2026-05-12 03:46:50 +00:00
30 changed files with 568 additions and 6579 deletions
-404
View File
@@ -1,404 +0,0 @@
#!/usr/bin/env python3
"""lint-required-no-paths — structural enforcement of
`feedback_path_filtered_workflow_cant_be_required`.
For every workflow whose status-check context appears in
`branch_protections/<branch>.status_check_contexts`, assert that the
workflow's `on:` block has NO `paths:` and NO `paths-ignore:` filter.
A required-check workflow with a paths filter silently degrades the
merge gate:
- If the PR's diff doesn't match the `paths:` glob, the workflow
never fires.
- Gitea (1.22.6) reports the required context as `pending` (never as
`skipped == success`), so the PR cannot merge.
- For a docs-only PR against `paths: ['**.go']`, the PR is
blocked forever — no human action can produce a green.
The class was previously prevented only by reviewer vigilance + the
saved memory `feedback_path_filtered_workflow_cant_be_required`. This
script makes it a hard CI gate so a future PR adding `paths:` to a
required workflow fails fast at PR time, not after merge when the next
docs PR wedges main.
The lint runs as `.gitea/workflows/lint-required-no-paths.yml` on every
PR. The lint workflow ITSELF must not have a paths-filter (otherwise it
could be circumvented by a paths-non-matching PR) — that's enforced by
self-reference and by the workflow's own `on:` block deliberately
omitting filters.
Sources of truth:
- `branch_protections/<branch>` `status_check_contexts` (the merge gate)
- `.gitea/workflows/*.yml` `name:` + `on:` (the workflow set)
Context-format note (Gitea 1.22.6):
Status-check contexts are formatted `{workflow_name} / {job_name_or_key} ({event})`.
We parse the workflow_name prefix and walk `.gitea/workflows/*.yml` for
a file whose `name:` attr matches. (The filename is NOT the source of
truth; `name:` is, because Gitea formats the context from `name:`.)
Exit codes:
0 — no required workflow has a paths/paths-ignore filter (clean) OR
branch_protections endpoint returned 403/404 (token-scope issue;
surfaced via ::error:: but non-fatal so a missing scope doesn't
red-X every PR — fix the token, not the lint).
1 — at least one required workflow has a paths/paths-ignore filter
(the gate-degrading defect class).
2 — env contract violation (missing GITEA_TOKEN/HOST/REPO/BRANCH).
3 — workflows directory missing or workflow YAML unparseable.
4 — protection response shape unexpected (non-dict body on 2xx).
Auth note: `GET /repos/.../branch_protections/{branch}` requires
repo-admin role in Gitea 1.22.6. The workflow-default `GITHUB_TOKEN`
is non-admin; we re-use `DRIFT_BOT_TOKEN` (same persona that powers
ci-required-drift.yml). If `DRIFT_BOT_TOKEN` is unavailable in a future
context, the script falls through gracefully (exit 0 + ::error::).
"""
from __future__ import annotations
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
import yaml # PyYAML 6.0.2 — installed by the workflow before this runs.
# --------------------------------------------------------------------------
# Environment
# --------------------------------------------------------------------------
def _env(key: str, *, required: bool = True, default: str | None = None) -> str:
val = os.environ.get(key, default)
if required and not val:
sys.stderr.write(f"::error::missing required env var: {key}\n")
sys.exit(2)
return val or ""
GITEA_TOKEN = _env("GITEA_TOKEN", required=False)
GITEA_HOST = _env("GITEA_HOST", required=False)
REPO = _env("REPO", required=False)
BRANCH = _env("BRANCH", required=False, default="main")
WORKFLOWS_DIR = _env(
"WORKFLOWS_DIR", required=False, default=".gitea/workflows"
)
OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "")
API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
def _require_runtime_env() -> None:
"""Enforce env contract — called from `run()` only. Tests import
individual functions without setting the full env contract."""
for key in ("GITEA_TOKEN", "GITEA_HOST", "REPO", "BRANCH"):
if not os.environ.get(key):
sys.stderr.write(f"::error::missing required env var: {key}\n")
sys.exit(2)
# --------------------------------------------------------------------------
# Tiny HTTP helper (mirrors ci-required-drift.py contract:
# raise on non-2xx and on JSON-decode-fail when JSON expected, per
# `feedback_api_helper_must_raise_not_return_dict`).
# --------------------------------------------------------------------------
class ApiError(RuntimeError):
"""Raised when a Gitea API call cannot be trusted to have succeeded."""
def api(
method: str,
path: str,
*,
body: dict | None = None,
query: dict[str, str] | None = None,
expect_json: bool = True,
) -> tuple[int, Any]:
url = f"{API}{path}"
if query:
url = f"{url}?{urllib.parse.urlencode(query)}"
data = None
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Accept": "application/json",
}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read()
status = resp.status
except urllib.error.HTTPError as e:
raw = e.read()
status = e.code
if not (200 <= status < 300):
snippet = raw[:500].decode("utf-8", errors="replace") if raw else ""
raise ApiError(f"{method} {path} → HTTP {status}: {snippet}")
if not raw:
return status, None
try:
return status, json.loads(raw)
except json.JSONDecodeError as e:
if expect_json:
raise ApiError(
f"{method} {path} → HTTP {status} but body is not JSON: {e}"
) from e
return status, {"_raw": raw.decode("utf-8", errors="replace")}
# --------------------------------------------------------------------------
# Status-check context parser
# --------------------------------------------------------------------------
# Format: "<workflow_name> / <job_name_or_key> (<event>)"
# Examples observed on molecule-core/main:
# "Secret scan / Scan diff for credential-shaped strings (pull_request)"
# "sop-tier-check / tier-check (pull_request)"
#
# Split strategy: peel off the trailing ` (<event>)` first, then split
# the leading `<workflow> / <rest>` on the FIRST ` / ` (workflow names
# come from `name:` attrs which conventionally don't embed ' / '; job
# names CAN, so we keep the rest of the slash-divided text as the job
# name). This matches Gitea's `name: ` semantics.
_CONTEXT_RE = re.compile(r"^(?P<workflow>.+?) / (?P<job>.+) \((?P<event>[^)]+)\)$")
def parse_context(ctx: str) -> tuple[str, str, str] | None:
"""Parse `<workflow> / <job> (<event>)` → (workflow, job, event) or None."""
if not ctx:
return None
m = _CONTEXT_RE.match(ctx)
if not m:
return None
return m.group("workflow"), m.group("job"), m.group("event")
# --------------------------------------------------------------------------
# workflow-name → file resolution
# --------------------------------------------------------------------------
def _iter_workflow_files() -> list[Path]:
d = Path(WORKFLOWS_DIR)
if not d.is_dir():
sys.stderr.write(f"::error::workflows directory not found: {d}\n")
sys.exit(3)
# `.yml` and `.yaml` — Gitea accepts both (rarely used `.yaml`, but
# don't silently miss it if a future port uses it).
return sorted(list(d.glob("*.yml")) + list(d.glob("*.yaml")))
def resolve_workflow_file(workflow_name: str) -> Path | None:
"""Find the YAML file whose `name:` attr matches `workflow_name`.
Returns None if no match. Filename is NOT used as a fallback —
Gitea's context format uses `name:`, so a `name:`-less workflow
won't even appear in the protection list. (A YAML with no `name:`
would default the context to the file basename, but our protection
contexts on molecule-core are all `name:`-derived; we trust the
same.)
"""
for f in _iter_workflow_files():
try:
doc = yaml.safe_load(f.read_text(encoding="utf-8"))
except yaml.YAMLError as e:
sys.stderr.write(f"::error::YAML parse error in {f}: {e}\n")
sys.exit(3)
if isinstance(doc, dict) and doc.get("name") == workflow_name:
return f
return None
# --------------------------------------------------------------------------
# paths-filter detection
# --------------------------------------------------------------------------
# Triggers that accept `paths:` / `paths-ignore:` (per GitHub Actions /
# Gitea Actions docs): pull_request, pull_request_target, push.
# We don't enumerate — any sub-key named `paths` or `paths-ignore`
# inside an event mapping is flagged.
_PATHS_KEYS = ("paths", "paths-ignore")
def detect_paths_filters(workflow_path: Path) -> list[str]:
"""Walk the workflow's `on:` block and return a list of findings, one
per offending `paths`/`paths-ignore` key.
Returns:
Empty list if the workflow has no paths/paths-ignore filter
anywhere in its `on:` block. Otherwise, a list of human-readable
strings naming the event and filter key + the filter contents.
"""
try:
doc = yaml.safe_load(workflow_path.read_text(encoding="utf-8"))
except yaml.YAMLError as e:
sys.stderr.write(f"::error::YAML parse error in {workflow_path}: {e}\n")
sys.exit(3)
if not isinstance(doc, dict):
return []
on_block = doc.get("on") or doc.get(True) # PyYAML 6 quirk: `on:`
# under default constructor sometimes becomes the bool key `True`
# because YAML 1.1 treats `on` as a boolean. Tolerate both.
if on_block is None:
return []
findings: list[str] = []
# Shape A: `on: pull_request` (string shorthand) — cannot carry filters.
if isinstance(on_block, str):
return []
# Shape B: `on: [pull_request, push]` (list shorthand) — cannot carry filters.
if isinstance(on_block, list):
return []
# Shape C: `on: { event: { ... } }` — the standard mapping case.
if isinstance(on_block, dict):
# Defensive: top-level malformed `on.paths` (someone wrote
# `on: { paths: ['x'] }` thinking it's a workflow-level filter).
# This is invalid syntax, but if present, flag it — it might
# not block the workflow from registering (Gitea may ignore the
# unknown key) and would create a false sense of "filter exists"
# the lint should still surface.
for k in _PATHS_KEYS:
if k in on_block:
v = on_block[k]
findings.append(
f"top-level `on.{k}` filter (malformed but present): {v!r}"
)
for event, event_body in on_block.items():
if event in _PATHS_KEYS:
continue # already handled above
if not isinstance(event_body, dict):
# `pull_request: null` / `pull_request: [opened]` shapes —
# no place for a paths filter to live; skip.
continue
for k in _PATHS_KEYS:
if k in event_body:
v = event_body[k]
findings.append(
f"`on.{event}.{k}` filter present: {v!r}"
)
return findings
# --------------------------------------------------------------------------
# Driver
# --------------------------------------------------------------------------
def run() -> int:
"""Main lint entrypoint. Returns the process exit code.
Exit semantics (see module docstring for full table):
0 — clean (no offending paths-filter on any required workflow),
OR protection unreadable (403/404) — surfaced as ::error::
but treated as non-fatal so token-scope issues don't red-X
every PR.
1 — at least one required workflow carries a paths/paths-ignore
filter — the regression class this lint exists to prevent.
"""
_require_runtime_env()
protection_path = f"/repos/{OWNER}/{NAME}/branch_protections/{BRANCH}"
try:
_, protection = api("GET", protection_path)
except ApiError as e:
msg = str(e)
m = re.search(r"HTTP (\d{3})", msg)
http_status = int(m.group(1)) if m else None
if http_status in (403, 404):
sys.stderr.write(
f"::error::GET {protection_path} returned HTTP {http_status}"
f"DRIFT_BOT_TOKEN lacks repo-admin scope (Gitea 1.22.6 "
f"requires it for this endpoint) OR branch '{BRANCH}' has "
f"no protection configured. Cannot enumerate required "
f"checks; skipping lint with exit 0 to avoid red-X on "
f"every PR. Fix: grant repo-admin to mc-drift-bot.\n"
)
return 0
raise
if not isinstance(protection, dict):
sys.stderr.write(
f"::error::protection response for {BRANCH} not a JSON object\n"
)
return 4
contexts: list[str] = list(protection.get("status_check_contexts") or [])
if not contexts:
print(
f"::notice::branch_protections/{BRANCH} has 0 required "
f"status_check_contexts; nothing to lint. (no required contexts)"
)
return 0
print(f"::notice::Linting {len(contexts)} required context(s) for paths-filter regressions:")
for c in contexts:
print(f" - {c}")
offenders: list[tuple[str, Path, list[str]]] = []
unresolved: list[str] = []
for ctx in contexts:
parsed = parse_context(ctx)
if parsed is None:
print(
f"::warning::could not parse context '{ctx}' "
f"(expected `<workflow> / <job> (<event>)`); skipping"
)
unresolved.append(ctx)
continue
workflow_name, _job, _event = parsed
wf_path = resolve_workflow_file(workflow_name)
if wf_path is None:
print(
f"::warning::no workflow file in {WORKFLOWS_DIR} has "
f"`name: {workflow_name}` (required context '{ctx}'); "
f"skipping paths-filter check. "
f"(orphaned-context detection is ci-required-drift's job.)"
)
unresolved.append(ctx)
continue
findings = detect_paths_filters(wf_path)
if findings:
offenders.append((workflow_name, wf_path, findings))
else:
print(f"::notice::OK {wf_path.name} ({workflow_name}) — no paths filter")
if offenders:
print("")
print(f"::error::Found {len(offenders)} required workflow(s) with paths/paths-ignore filters:")
for workflow_name, wf_path, findings in offenders:
for finding in findings:
# ::error file=... lets Gitea Actions surface a per-file
# annotation in the PR UI (when annotations are wired).
print(
f"::error file={wf_path}::Required workflow "
f"'{workflow_name}' ({wf_path.name}) has a paths "
f"filter that would degrade the merge gate to a "
f"silent indefinite pending: {finding}. "
f"See feedback_path_filtered_workflow_cant_be_required. "
f"Fix: remove the filter and instead gate per-step "
f"inside the job with `if: contains(steps.changed.outputs.files, ...)` "
f"or refactor to a single-job-with-per-step-if shape."
)
return 1
print("")
print(
f"::notice::OK — all {len(contexts) - len(unresolved)} resolvable "
f"required workflow(s) clean (no paths/paths-ignore filters)."
)
if unresolved:
print(
f"::notice::{len(unresolved)} required context(s) were not "
f"resolved to a workflow file (warn-not-fail); see warnings above."
)
return 0
if __name__ == "__main__":
sys.exit(run())
-369
View File
@@ -1,369 +0,0 @@
#!/usr/bin/env python3
"""lint-workflow-yaml — catch Gitea-1.22.6-hostile workflow YAML shapes.
This script enforces six structural rules that have historically caused
silent CI failures on Gitea Actions (1.22.6) — workflows that the server's
YAML parser rejects with `[W] ignore invalid workflow ...` and registers
for zero events, or shape conventions that produce ambiguous status
contexts. Each rule maps to a documented incident in saved memory.
Rules (4 fatal + 1 fatal cross-file + 1 heuristic-warn):
1. `workflow_dispatch.inputs:` block — Gitea 1.22.6 mis-parses the
`inputs` keys as sibling event types and rejects the whole file.
Memory: feedback_gitea_workflow_dispatch_inputs_unsupported.
Origin: 2026-05-11 PyPI freeze (publish-runtime).
2. `on: workflow_run:` event — not enumerated in Gitea 1.22.6's
supported event list (verified via modules/actions/workflows.go
enumeration; task #81). Workflow registers, fires for 0 events.
3. `name:` containing `/` — breaks the
`<workflow> / <job> (<event>)` commit-status context convention;
downstream parsers (sop-tier-check, status-reaper) tokenize on `/`.
4. `name:` collision across files — Gitea routes commit-status updates
by `name` and behavior on collision is undefined (status-reaper
rev1 fail-loud).
5. Cross-repo `uses: org/repo/path@ref` — blocked while
`[actions].DEFAULT_ACTIONS_URL=github` is the server default;
resolves to github.com/<org-suspended>/... and 404s.
Memory: feedback_gitea_cross_repo_uses_blocked. Cross-link: task #109.
6. (HEURISTIC, warn-not-fail) Steps reference `https://api.github.com`
or `https://github.com/.../releases/download` without a
workflow-level `env.GITHUB_SERVER_URL` set to the Gitea instance.
Memory: feedback_act_runner_github_server_url.
Per `feedback_smoke_test_vendor_truth_not_shape_match`: fixtures used to
validate this lint must mirror real Gitea 1.22.6 YAML semantics, not
Python yaml-parser quirks. The test suite at tests/test_lint_workflow_yaml.py
includes a vendor-truth fixture (the exact publish-runtime regression).
Usage:
python3 .gitea/scripts/lint-workflow-yaml.py
Lint every `*.yml` in `.gitea/workflows/`.
python3 .gitea/scripts/lint-workflow-yaml.py --workflow-dir <path>
Lint a custom directory (used by tests/test_lint_workflow_yaml.py).
Exit codes:
0 — clean OR only heuristic-warnings emitted.
1 — at least one fatal rule (1-5) violated.
2 — YAML parse error or argv usage error.
"""
from __future__ import annotations
import argparse
import collections
import glob
import os
import re
import sys
from pathlib import Path
from typing import Any, Iterable
try:
import yaml
except ImportError:
print("::error::PyYAML is required. Install with: pip install PyYAML", file=sys.stderr)
sys.exit(2)
# YAML quirk: bare `on:` at the top level parses to the Python `True`
# (because `on` is a YAML 1.1 boolean alias). Handle both keys.
def _get_on(d: dict) -> Any:
if not isinstance(d, dict):
return None
if "on" in d:
return d["on"]
if True in d:
return d[True]
return None
# ---------------------------------------------------------------------------
# Rule 1 — workflow_dispatch.inputs block (Gitea 1.22.6 parser rejects)
# ---------------------------------------------------------------------------
def check_workflow_dispatch_inputs(filename: str, doc: Any) -> list[str]:
"""Return per-violation error lines if `workflow_dispatch.inputs` is set."""
errors: list[str] = []
on = _get_on(doc)
if not isinstance(on, dict):
return errors
wd = on.get("workflow_dispatch")
if isinstance(wd, dict) and wd.get("inputs"):
errors.append(
f"::error file={filename}::Rule 1 (FATAL): "
f"`on.workflow_dispatch.inputs:` block detected. Gitea 1.22.6 "
f"silently rejects the entire workflow with `[W] ignore invalid "
f"workflow: unknown on type: map[...]`. Drop the `inputs:` block "
f"and derive parameters from tag name / env / external query. "
f"Memory: feedback_gitea_workflow_dispatch_inputs_unsupported."
)
return errors
# ---------------------------------------------------------------------------
# Rule 2 — on: workflow_run (not supported on Gitea 1.22.6)
# ---------------------------------------------------------------------------
def check_workflow_run_event(filename: str, doc: Any) -> list[str]:
"""Return per-violation error lines if `on: workflow_run:` is used."""
errors: list[str] = []
on = _get_on(doc)
if isinstance(on, dict) and "workflow_run" in on:
errors.append(
f"::error file={filename}::Rule 2 (FATAL): `on: workflow_run:` "
f"event used. Gitea 1.22.6 does NOT support `workflow_run` "
f"(verified via modules/actions/workflows.go enumeration; "
f"task #81). Workflow will fire for zero events. Use a "
f"`schedule:` cron OR a `push:` trigger with `paths:` filter "
f"on the upstream workflow file as the cross-workflow gate."
)
elif isinstance(on, list) and "workflow_run" in on:
errors.append(
f"::error file={filename}::Rule 2 (FATAL): `on: workflow_run` "
f"in event list. Not supported on Gitea 1.22.6 — task #81."
)
return errors
# ---------------------------------------------------------------------------
# Rule 3 — name: contains "/" (breaks status-context tokenization)
# ---------------------------------------------------------------------------
def check_name_with_slash(filename: str, doc: Any) -> list[str]:
"""Return per-violation error lines if workflow `name:` contains a slash."""
errors: list[str] = []
if not isinstance(doc, dict):
return errors
name = doc.get("name")
if isinstance(name, str) and "/" in name:
errors.append(
f"::error file={filename}::Rule 3 (FATAL): workflow `name: "
f"{name!r}` contains `/`. The commit-status context convention "
f"is `<workflow> / <job> (<event>)`; embedding `/` in the "
f"workflow name makes downstream parsers (sop-tier-check, "
f"status-reaper) tokenize ambiguously. Rename to use `-` or "
f"` ` instead."
)
return errors
# ---------------------------------------------------------------------------
# Rule 4 — cross-file name collision
# ---------------------------------------------------------------------------
def check_name_collision_across_files(
docs_by_file: dict[str, Any],
) -> list[str]:
"""Return per-collision error lines if two files share the same `name:`."""
errors: list[str] = []
by_name: dict[str, list[str]] = collections.defaultdict(list)
for filename, doc in docs_by_file.items():
if isinstance(doc, dict):
n = doc.get("name")
if isinstance(n, str) and n:
by_name[n].append(filename)
for n, files in sorted(by_name.items()):
if len(files) > 1:
errors.append(
f"::error::Rule 4 (FATAL): workflow `name: {n!r}` collision "
f"across {len(files)} files: {files}. Gitea routes "
f"commit-status updates by `name`; collision yields "
f"undefined behavior. Give each workflow a unique `name:`."
)
return errors
# ---------------------------------------------------------------------------
# Rule 5 — cross-repo `uses: org/repo/path@ref`
# ---------------------------------------------------------------------------
# `uses: <foo>@<ref>` — match the value form Gitea/act actually parse.
# We need to distinguish:
# - `actions/checkout@<sha>` OK (bare org/repo@ref, no subpath)
# - `./.gitea/actions/foo` OK (local path)
# - `docker://image:tag` OK (docker-image form)
# - `molecule-ai/molecule-ci/.gitea/actions/audit-force-merge@main` BAD
USES_CROSS_REPO_RE = re.compile(
r"""^
(?P<owner>[A-Za-z0-9_.\-]+)
/
(?P<repo>[A-Za-z0-9_.\-]+)
/ # mandatory subpath separator => cross-repo composite/reusable
(?P<path>[^@\s]+)
@
(?P<ref>\S+)
$""",
re.VERBOSE,
)
def _iter_uses(doc: Any) -> Iterable[str]:
"""Yield every `uses:` string from job steps in a workflow document."""
if not isinstance(doc, dict):
return
jobs = doc.get("jobs")
if not isinstance(jobs, dict):
return
for job in jobs.values():
if not isinstance(job, dict):
continue
# reusable workflow: `uses:` at the job level
if isinstance(job.get("uses"), str):
yield job["uses"]
steps = job.get("steps")
if not isinstance(steps, list):
continue
for step in steps:
if isinstance(step, dict) and isinstance(step.get("uses"), str):
yield step["uses"]
def check_cross_repo_uses(filename: str, doc: Any) -> list[str]:
"""Return per-violation error lines for cross-repo `uses:` references."""
errors: list[str] = []
for uses in _iter_uses(doc):
# Skip docker:// and local ./
if uses.startswith(("docker://", "./", "../")):
continue
m = USES_CROSS_REPO_RE.match(uses.strip())
if m:
errors.append(
f"::error file={filename}::Rule 5 (FATAL): cross-repo "
f"`uses: {uses}` detected. Gitea 1.22.6 with "
f"`[actions].DEFAULT_ACTIONS_URL=github` resolves this to "
f"github.com/{m.group('owner')}/{m.group('repo')} which "
f"404s (org suspended 2026-05-06). Inline the shared bash "
f"into `.gitea/scripts/` until task #109 (actions mirror) "
f"ships. Memory: feedback_gitea_cross_repo_uses_blocked."
)
return errors
# ---------------------------------------------------------------------------
# Rule 6 — heuristic: github.com/api refs without workflow-level
# GITHUB_SERVER_URL (WARN-not-FAIL per halt-condition 3)
# ---------------------------------------------------------------------------
# Match `https://api.github.com/...` (API call) — that's the actionable
# pattern. We intentionally do NOT match `https://github.com/.../releases/
# download/...` (jq-release pin) nor `https://github.com/${{ github.repository
# }}` (OCI label) because those are documented benign references on current
# main and would 100% false-positive (3 hits, per Phase 1 audit).
GITHUB_API_REF_RE = re.compile(
r"https://api\.github\.com\b|https://github\.com/api/",
re.IGNORECASE,
)
def _has_workflow_level_server_url(doc: Any) -> bool:
if not isinstance(doc, dict):
return False
env = doc.get("env")
if isinstance(env, dict) and "GITHUB_SERVER_URL" in env:
return True
return False
def check_github_server_url_missing(filename: str, doc: Any, raw: str) -> list[str]:
"""Return warn-lines (NOT errors) if api.github.com is referenced without
workflow-level GITHUB_SERVER_URL. Heuristic — false-positives possible.
"""
warns: list[str] = []
if not GITHUB_API_REF_RE.search(raw):
return warns
if _has_workflow_level_server_url(doc):
return warns
warns.append(
f"::warning file={filename}::Rule 6 (WARN, heuristic): file "
f"references `https://api.github.com` without a workflow-level "
f"`env.GITHUB_SERVER_URL: https://git.moleculesai.app`. The "
f"act_runner default for `${{{{ github.server_url }}}}` is "
f"github.com, which can break actions that auth-condition on "
f"server_url (e.g. actions/setup-go). If this curl is "
f"intentionally hitting GitHub (e.g. public release pin), ignore. "
f"Memory: feedback_act_runner_github_server_url."
)
return warns
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(
description="Lint Gitea Actions workflow YAML for 1.22.6-hostile shapes."
)
p.add_argument(
"--workflow-dir",
default=".gitea/workflows",
help="Directory of workflow *.yml files (default: .gitea/workflows).",
)
args = p.parse_args(argv)
wf_dir = Path(args.workflow_dir)
if not wf_dir.exists():
# Empty / missing dir = nothing to lint, not a failure.
print(f"::notice::No workflow directory at {wf_dir}; skipping.")
return 0
yml_paths = sorted(
glob.glob(str(wf_dir / "*.yml")) + glob.glob(str(wf_dir / "*.yaml"))
)
if not yml_paths:
print(f"::notice::No workflow files in {wf_dir}; nothing to lint.")
return 0
fatal_errors: list[str] = []
warnings: list[str] = []
docs_by_file: dict[str, Any] = {}
for path in yml_paths:
rel = os.path.relpath(path)
try:
raw = Path(path).read_text()
doc = yaml.safe_load(raw)
except yaml.YAMLError as e:
fatal_errors.append(
f"::error file={rel}::YAML parse error: {e}. Cannot lint "
f"a file the parser rejects."
)
continue
docs_by_file[rel] = doc
# Per-file checks
fatal_errors.extend(check_workflow_dispatch_inputs(rel, doc))
fatal_errors.extend(check_workflow_run_event(rel, doc))
fatal_errors.extend(check_name_with_slash(rel, doc))
fatal_errors.extend(check_cross_repo_uses(rel, doc))
warnings.extend(check_github_server_url_missing(rel, doc, raw))
# Cross-file checks
fatal_errors.extend(check_name_collision_across_files(docs_by_file))
# Emit warnings first (non-blocking)
for w in warnings:
print(w)
if not fatal_errors:
n = len(yml_paths)
print(
f"::notice::lint-workflow-yaml: {n} workflow file(s) checked, "
f"no fatal Gitea-1.22.6-hostile shapes. "
f"({len(warnings)} heuristic warning(s) emitted.)"
)
return 0
# Emit fatal errors
print(
f"::error::lint-workflow-yaml: {len(fatal_errors)} fatal violation(s) "
f"across {len(yml_paths)} workflow file(s). See rule documentation "
f"in .gitea/scripts/lint-workflow-yaml.py docstring."
)
for e in fatal_errors:
print(e)
return 1
if __name__ == "__main__":
sys.exit(main())
-361
View File
@@ -1,361 +0,0 @@
#!/usr/bin/env python3
"""lint_mask_pr_atomicity — Tier 2d structural enforcement per internal#350.
Rule
----
A PR whose diff touches `.gitea/workflows/ci.yml` AND modifies EITHER:
- any `continue-on-error:` value, OR
- the `all-required` sentinel job's `needs:` block
must EITHER:
- Touch BOTH atomically in the same PR (preferred), OR
- Cross-link the paired PR via a literal `Paired: #NNN` reference in
the PR body OR in any commit message between BASE_SHA and HEAD_SHA.
The class this prevents
-----------------------
PR#665 (interim `continue-on-error: true` on `platform-build`) and
PR#668 (sentinel-`needs` demotion of the same job) were designed as a
pair but merged solo — #665 landed at 04:47Z 2026-05-12, #668 was still
open at 05:07Z when the main-red watchdog (#674) fired. Result: ~20
minutes of `main` red and a cascade of false-positives on unrelated PRs.
The lint operates on the YAML AST (PyYAML), not grep, per
`feedback_behavior_based_ast_gates`: a refactor that moves `continue-on-error`
between job keys, or renames the `all-required` job, would still be
detected because we walk the parsed structure.
Why this works on Gitea 1.22.6
------------------------------
We don't use any 1.22.6-missing endpoints (no `/actions/runs/*`, no
`branch_protections/*` — Tier 2f/g need those; Tier 2d does not). All
required inputs come from the workflow `pull_request` event payload
(BASE_SHA, HEAD_SHA, PR_BODY) and from local git via `git show`/`git log`.
The auto-injected `GITHUB_TOKEN` is enough; we don't need
DRIFT_BOT_TOKEN.
Exit codes
----------
0 — ci.yml not in diff, OR diff is no-op for the rule predicates,
OR atomicity satisfied (both touched), OR a valid `Paired: #NNN`
reference is present.
1 — exactly ONE of {coe, sentinel-needs} touched AND no valid
`Paired: #NNN` reference. The split-pair regression class.
2 — env contract violation (BASE_SHA / HEAD_SHA missing) or YAML
parse error on either side.
Env
---
BASE_SHA — PR base (pull_request.base.sha)
HEAD_SHA — PR head (pull_request.head.sha)
PR_BODY — pull_request.body (may be empty)
CI_WORKFLOW_PATH — defaults to `.gitea/workflows/ci.yml`
SENTINEL_JOB_KEY — defaults to `all-required`
Memory cross-links
------------------
- internal#350 (the RFC that specs this lint)
- PR#665 / PR#668 (the empirical split-pair)
- mc#664 (the main-red incident)
- feedback_strict_root_only_after_class_a
- feedback_behavior_based_ast_gates
"""
from __future__ import annotations
import os
import re
import subprocess
import sys
from typing import Any
try:
import yaml
except ImportError:
sys.stderr.write(
"::error::PyYAML is required. Install with: pip install PyYAML\n"
)
sys.exit(2)
# ---------------------------------------------------------------------------
# YAML quirk: bare `on:` at the top level becomes Python `True` because
# `on` is a YAML 1.1 boolean. Not used here but documented for future
# editors who copy from this module.
# ---------------------------------------------------------------------------
# `Paired: #NNN` reference. `#` is mandatory, NNN must be digits. Any
# surrounding markdown/whitespace is fine. The match is case-sensitive
# on `Paired:` because lower-case `paired:` collides with conversational
# prose ("paired: see comment above") and the convention is the exact
# capitalisation.
PAIRED_RE = re.compile(r"\bPaired:\s*#(?P<num>\d+)\b")
# ---------------------------------------------------------------------------
# Env contract
# ---------------------------------------------------------------------------
def _env(key: str, default: str | None = None) -> str:
v = os.environ.get(key, default)
return v if v is not None else ""
def _require_env(key: str) -> str:
v = os.environ.get(key)
if not v:
sys.stderr.write(f"::error::missing required env var: {key}\n")
sys.exit(2)
return v
# ---------------------------------------------------------------------------
# git-show helper. Returns None when the path doesn't exist on that side
# (new file, deleted file, or rename — git returns exit 128 with "fatal:
# path not in tree"). We treat None as "no rule predicate triggered on
# that side".
# ---------------------------------------------------------------------------
def git_show(sha: str, path: str) -> str | None:
r = subprocess.run(
["git", "show", f"{sha}:{path}"],
capture_output=True,
text=True,
)
if r.returncode != 0:
return None
return r.stdout
def git_log_messages(base_sha: str, head_sha: str) -> str:
r = subprocess.run(
["git", "log", "--format=%B", f"{base_sha}..{head_sha}"],
capture_output=True,
text=True,
)
if r.returncode != 0:
return ""
return r.stdout
def git_diff_paths(base_sha: str, head_sha: str) -> list[str]:
r = subprocess.run(
["git", "diff", "--name-only", f"{base_sha}..{head_sha}"],
capture_output=True,
text=True,
)
if r.returncode != 0:
return []
return [p for p in r.stdout.splitlines() if p.strip()]
# ---------------------------------------------------------------------------
# Predicate 1 — any `continue-on-error` value changed between base and head
# ---------------------------------------------------------------------------
def _collect_coe(doc: Any) -> dict[str, Any]:
"""Walk every job in `jobs.*` and collect its continue-on-error value.
Returns a dict {job_key: coe_value}. Missing keys are absent from
the dict (NOT `False` — distinguishes "added the key" from
"unchanged absent"). Job-step `continue-on-error` is NOT considered
— only job-level, because that's the value that masks job status
rollup, which is the class this lint targets.
"""
out: dict[str, Any] = {}
if not isinstance(doc, dict):
return out
jobs = doc.get("jobs")
if not isinstance(jobs, dict):
return out
for k, j in jobs.items():
if not isinstance(j, dict):
continue
if "continue-on-error" in j:
out[k] = j["continue-on-error"]
return out
def coe_changed(base_doc: Any, head_doc: Any) -> tuple[bool, list[str]]:
"""Return (changed?, [reasons]) describing per-job coe diffs."""
base = _collect_coe(base_doc)
head = _collect_coe(head_doc)
reasons: list[str] = []
all_keys = set(base) | set(head)
for k in sorted(all_keys):
b = base.get(k, "<absent>")
h = head.get(k, "<absent>")
if b != h:
reasons.append(f"job '{k}' continue-on-error: {b!r}{h!r}")
return (bool(reasons), reasons)
# ---------------------------------------------------------------------------
# Predicate 2 — sentinel job's `needs:` changed
# ---------------------------------------------------------------------------
def _collect_needs(doc: Any, sentinel_key: str) -> list[str] | None:
"""Return the sentinel job's needs list (sorted) or None if absent."""
if not isinstance(doc, dict):
return None
jobs = doc.get("jobs")
if not isinstance(jobs, dict):
return None
j = jobs.get(sentinel_key)
if not isinstance(j, dict):
return None
needs = j.get("needs")
if needs is None:
return []
if isinstance(needs, str):
return [needs]
if isinstance(needs, list):
# Sort because `needs:` is order-insensitive at the engine
# level; a reorder is not a semantic change and shouldn't
# trip the lint.
return sorted(str(x) for x in needs)
return None
def sentinel_needs_changed(
base_doc: Any, head_doc: Any, sentinel_key: str
) -> tuple[bool, str]:
"""Return (changed?, reason)."""
base = _collect_needs(base_doc, sentinel_key)
head = _collect_needs(head_doc, sentinel_key)
if base == head:
return (False, "")
return (
True,
f"sentinel '{sentinel_key}'.needs: {base!r}{head!r}",
)
# ---------------------------------------------------------------------------
# Predicate 3 — `Paired: #NNN` present in body or any commit message
# ---------------------------------------------------------------------------
def find_paired_refs(pr_body: str, commit_log: str) -> list[str]:
"""Return list of `#NNN` strings found (deduped, sorted)."""
found: set[str] = set()
for src in (pr_body, commit_log):
for m in PAIRED_RE.finditer(src or ""):
found.add(m.group("num"))
return sorted(found)
# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def _parse(content: str | None, label: str) -> Any:
if content is None:
return None
try:
return yaml.safe_load(content)
except yaml.YAMLError as e:
sys.stderr.write(f"::error::YAML parse error on {label}: {e}\n")
sys.exit(2)
def run() -> int:
base_sha = _require_env("BASE_SHA")
head_sha = _require_env("HEAD_SHA")
pr_body = _env("PR_BODY", "")
ci_path = _env("CI_WORKFLOW_PATH", ".gitea/workflows/ci.yml")
sentinel_key = _env("SENTINEL_JOB_KEY", "all-required")
# Step 0 — is ci.yml even in the diff? If not, the lint doesn't apply.
changed_paths = git_diff_paths(base_sha, head_sha)
if ci_path not in changed_paths:
print(
f"::notice::{ci_path} not in PR diff; lint-mask-pr-atomicity "
f"skipped (no atomicity risk)."
)
return 0
base_yml = git_show(base_sha, ci_path)
head_yml = git_show(head_sha, ci_path)
base_doc = _parse(base_yml, f"{ci_path}@{base_sha}")
head_doc = _parse(head_yml, f"{ci_path}@{head_sha}")
# If the file is newly added (no base), no flip is possible — every
# value is "newly introduced", not "changed". Tier 2e covers the
# tracking-issue check for new continue-on-error: true. Exit 0.
if base_doc is None:
print(
f"::notice::{ci_path} newly added in this PR; no flip to "
f"analyse — lint-mask-pr-atomicity skipped."
)
return 0
# If the file is deleted on head, ditto — no atomicity question.
if head_doc is None:
print(
f"::notice::{ci_path} deleted in this PR; "
f"lint-mask-pr-atomicity skipped."
)
return 0
coe_yes, coe_reasons = coe_changed(base_doc, head_doc)
needs_yes, needs_reason = sentinel_needs_changed(
base_doc, head_doc, sentinel_key
)
if not coe_yes and not needs_yes:
print(
f"::notice::{ci_path} touched but neither continue-on-error "
f"nor sentinel '{sentinel_key}'.needs changed — no atomicity "
f"risk. OK."
)
return 0
if coe_yes and needs_yes:
print(
f"::notice::Atomic change detected: both continue-on-error "
f"AND sentinel '{sentinel_key}'.needs touched in same PR. OK."
)
for r in coe_reasons:
print(f" - {r}")
print(f" - {needs_reason}")
return 0
# Exactly one side touched — require Paired: #NNN reference.
commit_log = git_log_messages(base_sha, head_sha)
paired = find_paired_refs(pr_body, commit_log)
one_side = "continue-on-error" if coe_yes else f"sentinel '{sentinel_key}'.needs"
other_side = (
f"sentinel '{sentinel_key}'.needs" if coe_yes else "continue-on-error"
)
if paired:
print(
f"::notice::Split-pair detected ({one_side} changed without "
f"{other_side}), but Paired reference(s) present: "
f"{', '.join('#' + n for n in paired)}. OK."
)
for r in coe_reasons:
print(f" - {r}")
if needs_reason:
print(f" - {needs_reason}")
return 0
# The failure mode this lint exists to prevent.
print(
f"::error file={ci_path}::lint-mask-pr-atomicity (Tier 2d): "
f"PR touches {one_side} in {ci_path} but NOT {other_side}, "
f"and no `Paired: #NNN` reference was found in the PR body or "
f"in commit messages between {base_sha[:8]}..{head_sha[:8]}. "
f"This is the PR#665+#668 split-pair regression class "
f"(see internal#350, mc#664). FIX: either (a) include the "
f"matching {other_side} change in the same PR (preferred), or "
f"(b) add `Paired: #NNN` (literal, capital P, with `#`) to the "
f"PR body or a commit message referencing the paired PR."
)
for r in coe_reasons:
print(f" - {r}")
if needs_reason:
print(f" - {needs_reason}")
return 1
if __name__ == "__main__":
sys.exit(run())
+3 -20
View File
@@ -222,20 +222,9 @@ def is_red(status: dict) -> tuple[bool, list[dict]]:
combined = status.get("state")
statuses = status.get("statuses") or []
red_states = {"failure", "error"}
# Schema asymmetry: top-level combined uses `state`, but per-entry
# items in `statuses[]` use `status` in Gitea 1.22.6. Prefer
# `status`; fall back to `state` defensively. Verified empirically
# 2026-05-12 03:42Z. Pre-rev4 code only read `state` from per-entry
# items → failed[] always empty → render_body always showed the
# "no per-context entries were in a red state" fallback even when
# the combined-state correctly flagged red. See
# `feedback_smoke_test_vendor_truth_not_shape_match`.
def _entry_state(s: dict) -> str:
return s.get("status") or s.get("state") or ""
failed = [
s for s in statuses
if isinstance(s, dict) and _entry_state(s) in red_states
if isinstance(s, dict) and s.get("state") in red_states
]
return (combined in red_states or bool(failed), failed)
@@ -324,9 +313,7 @@ def render_body(sha: str, failed: list[dict], debug: dict) -> str:
else:
for s in failed:
ctx = s.get("context", "(no context)")
# Per-entry key is `status` in Gitea 1.22.6, not `state`
# (see _entry_state in is_red). Fallback for forward-compat.
state = s.get("status") or s.get("state") or "(no state)"
state = s.get("state", "(no state)")
url = s.get("target_url") or ""
desc = (s.get("description") or "").strip()
entry = f"- **{ctx}** — `{state}`"
@@ -559,11 +546,7 @@ def run_once(*, dry_run: bool = False) -> int:
"combined_state": status.get("state"),
"failed_contexts": [s.get("context") for s in failed],
"all_contexts": [
# Per-entry key is `status` in Gitea 1.22.6, not `state`.
# Pre-rev4 debug output reported `state: None` for every
# context, making run logs useless for triage.
{"context": s.get("context"),
"state": s.get("status") or s.get("state")}
{"context": s.get("context"), "state": s.get("state")}
for s in (status.get("statuses") or [])
if isinstance(s, dict)
],
-823
View File
@@ -1,823 +0,0 @@
#!/usr/bin/env python3
# sop-checklist-gate — evaluate whether a PR has peer-acked each
# SOP-checklist item. Posts a commit-status that branch protection
# can require.
#
# RFC#351 Step 2 of 6 (implementation MVP).
#
# Invoked by .gitea/workflows/sop-checklist-gate.yml on:
# - pull_request_target: [opened, edited, synchronize, reopened]
# - issue_comment: [created, edited, deleted]
#
# Flow:
# 1. Load .gitea/sop-checklist-config.yaml (from BASE ref — trusted).
# 2. GET /repos/{R}/pulls/{N} — author, head.sha, tier label
# 3. GET /repos/{R}/issues/{N}/comments — extract /sop-ack and /sop-revoke
# 4. For each checklist item:
# a. Is the section marker present in PR body? (author answered)
# b. Is there ≥1 unrevoked /sop-ack from a non-author whose
# team-membership matches required_teams?
# 5. POST /repos/{R}/statuses/{sha} — context
# `sop-checklist / all-items-acked (pull_request)`,
# state=success | failure | pending, description=`acked: N/M …`.
#
# Trust boundary (mirrors RFC#324 §A4):
# This script is loaded from the BASE branch. The workflow's
# actions/checkout step pins ref=base.sha. PR-HEAD code is never
# executed. We only HTTP-call the Gitea API.
#
# Token scope:
# - read:repository / read:organization to enumerate PR + comments
# + team membership (Gitea 1.22.6 quirk: team-membership endpoint
# returns 403 if token owner is not in the team; see review-check.sh
# for the same gotcha — we surface the same fail-closed message).
# - write:repository for `POST /repos/{R}/statuses/{sha}`. Unlike
# RFC#324's pattern (which uses the JOB's own pass/fail as the
# status), we POST the status explicitly because the gate posts
# a single multi-item status with a richer description than a
# bare success/failure context can carry.
#
# Slug normalization rules (canonical form: kebab-case):
# - Lowercase
# - Whitespace + underscores → single dash
# - Strip non [a-z0-9-] characters
# - Collapse adjacent dashes
# - Strip leading/trailing dashes
# - If the result is a digit string (e.g. "1"), look up via
# config.items[*].numeric_alias to get the kebab-case slug.
#
# Examples:
# "Comprehensive_Testing" → "comprehensive-testing"
# "comprehensive testing" → "comprehensive-testing"
# "1" → "comprehensive-testing"
# "Five-Axis-Review" → "five-axis-review"
#
# Revoke semantics:
# /sop-revoke <slug> [reason] — most-recent comment per (slug, user)
# wins. So if Alice posts /sop-ack X then later /sop-revoke X, her ack
# for X is invalidated. Bob's prior /sop-ack X is unaffected. If Alice
# posts /sop-revoke X then later /sop-ack X again, the ack is restored.
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# ---------------------------------------------------------------------------
# Slug normalization
# ---------------------------------------------------------------------------
_NORMALIZE_REPLACE_RE = re.compile(r"[\s_]+")
_NORMALIZE_STRIP_RE = re.compile(r"[^a-z0-9-]")
_NORMALIZE_DASH_RE = re.compile(r"-+")
def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> str:
"""Normalize a user-supplied slug to canonical kebab-case form.
See module header for the rules.
If the input is a pure digit string AND numeric_aliases is provided,
the alias mapping is consulted. Unknown digits return "" so the caller
can flag the comment as unparseable.
"""
if raw is None:
return ""
s = raw.strip().lower()
s = _NORMALIZE_REPLACE_RE.sub("-", s)
s = _NORMALIZE_STRIP_RE.sub("", s)
s = _NORMALIZE_DASH_RE.sub("-", s)
s = s.strip("-")
if s.isdigit() and numeric_aliases is not None:
return numeric_aliases.get(int(s), "")
return s
# ---------------------------------------------------------------------------
# Comment parsing — /sop-ack and /sop-revoke
# ---------------------------------------------------------------------------
# A directive must be on its own line. Permits leading whitespace.
# Optional trailing note after the slug for /sop-ack and required reason
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> list[tuple[str, str, str]]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
Returns a list of (kind, canonical_slug, note) tuples where:
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
"""
out: list[tuple[str, str, str]] = []
if not comment_body:
return out
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
# If the raw match included trailing words, the regex non-greedy
# captured only the first token; strip again for safety.
# We split on whitespace to keep the FIRST word as the slug, and
# everything after as the note.
parts = raw_slug.split()
if not parts:
continue
first = parts[0]
# If the slug-capture greedily matched multiple words (e.g.
# "comprehensive testing"), preserve normalize behavior: join
# the WHOLE first-word-token only; trailing words get appended to
# the note. The regex limits group(2) to [A-Za-z0-9_\- ] so we
# may have multi-word forms here — normalize handles them.
if len(parts) > 1:
# User wrote "/sop-ack comprehensive testing extra-note"
# → treat "comprehensive testing" as the slug source if it
# normalizes to a known item; otherwise treat "comprehensive"
# as slug and "testing extra-note" as note. We defer the
# disambiguation to the caller via the returned canonical
# slug. For simplicity: try the WHOLE captured string first.
canonical = normalize_slug(raw_slug, numeric_aliases)
else:
canonical = normalize_slug(first, numeric_aliases)
note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
out.append((kind, canonical, note_from_group))
return out
# ---------------------------------------------------------------------------
# PR body section detection
# ---------------------------------------------------------------------------
def section_marker_present(body: str, marker: str) -> bool:
"""Return True if `marker` appears in `body` case-insensitively
on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the
same line OR within the next line — this prevents trivially-empty
checklists like:
## SOP-Checklist
- [ ] **Comprehensive testing performed**:
- [ ] **Local-postgres E2E run**:
from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft
finding via the section-present test alone.
"""
if not body or not marker:
return False
body_lower = body.lower()
marker_lower = marker.lower()
idx = body_lower.find(marker_lower)
if idx < 0:
return False
# Walk to end of line.
line_end = body.find("\n", idx)
if line_end < 0:
line_end = len(body)
line = body[idx + len(marker):line_end]
# Strip the colon + checkbox tail patterns; require at least one
# non-whitespace, non-punctuation char.
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
# Fall through: check the NEXT line (multi-line answers).
next_line_end = body.find("\n", line_end + 1)
if next_line_end < 0:
next_line_end = len(body)
next_line = body[line_end + 1:next_line_end]
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
return bool(stripped_next)
# ---------------------------------------------------------------------------
# Ack-state computation
# ---------------------------------------------------------------------------
def compute_ack_state(
comments: list[dict[str, Any]],
pr_author: str,
items_by_slug: dict[str, dict[str, Any]],
numeric_aliases: dict[int, str],
team_membership_probe: "callable[[str, list[str]], list[str]]",
) -> dict[str, dict[str, Any]]:
"""Compute per-item ack state.
Each comment is processed in chronological order. The most-recent
directive per (commenter, slug) wins.
Returns a dict keyed by canonical slug:
{
"comprehensive-testing": {
"ackers": ["bob"], # non-author, team-verified
"rejected_ackers": { # debugging info
"self_ack": ["alice"],
"unknown_slug": [],
"not_in_team": ["eve"],
}
},
...
}
"""
# Step 1: collapse directives per (commenter, slug) — most recent wins.
# comments are expected to come in chronological order from the
# API (Gitea returns oldest-first by default for issues/{N}/comments).
latest_directive: dict[tuple[str, str], str] = {} # (user, slug) → kind
unparseable_per_user: dict[str, int] = {}
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases):
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
latest_directive[(user, slug)] = kind
# Step 2: build candidate ackers per slug.
# Filter out self-acks and unknown slugs.
ackers_per_slug: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_self: dict[str, list[str]] = {s: [] for s in items_by_slug}
rejected_unknown: dict[str, list[str]] = {s: [] for s in items_by_slug}
pending_team_check: dict[str, list[str]] = {s: [] for s in items_by_slug}
for (user, slug), kind in latest_directive.items():
if kind != "sop-ack":
continue # revokes leave the (user,slug) state as "no ack"
if slug not in items_by_slug:
# Slug normalized to something not in our config — store
# under a synthetic key for diagnostic surfacing. Don't add
# to any item.
continue
if user == pr_author:
rejected_self[slug].append(user)
continue
pending_team_check[slug].append(user)
# Step 3: team membership probe per slug (batched per slug to keep
# API call count down — same user may ack multiple items but the
# required_teams differ per item, so we MUST probe per (user, item)).
rejected_not_in_team: dict[str, list[str]] = {s: [] for s in items_by_slug}
for slug, candidates in pending_team_check.items():
if not candidates:
continue
required = items_by_slug[slug]["required_teams"]
approved = team_membership_probe(slug, candidates) # returns subset
rejected_not_in_team[slug] = [u for u in candidates if u not in approved]
ackers_per_slug[slug] = approved
# Stash required teams for description rendering.
items_by_slug[slug]["_required_resolved"] = required
return {
slug: {
"ackers": ackers_per_slug[slug],
"rejected": {
"self_ack": rejected_self[slug],
"not_in_team": rejected_not_in_team[slug],
},
}
for slug in items_by_slug
}
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
class GiteaClient:
def __init__(self, host: str, token: str):
self.base = f"https://{host}/api/v1"
self.token = token
# Cache team-name → team-id resolutions per org.
self._team_id_cache: dict[tuple[str, str], int | None] = {}
def _req(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
ok_codes: tuple[int, ...] = (200, 201, 204),
) -> tuple[int, Any]:
url = self.base + path
data = None
headers = {
"Authorization": f"token {self.token}",
"Accept": "application/json",
}
if body is not None:
data = json.dumps(body).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=data, headers=headers)
try:
with urllib.request.urlopen(req, timeout=20) as r:
raw = r.read()
code = r.getcode()
except urllib.error.HTTPError as e:
code = e.code
raw = e.read()
try:
parsed = json.loads(raw.decode("utf-8")) if raw else None
except json.JSONDecodeError:
parsed = raw.decode("utf-8", errors="replace") if raw else None
return code, parsed
def get_pr(self, owner: str, repo: str, pr: int) -> dict[str, Any]:
code, data = self._req("GET", f"/repos/{owner}/{repo}/pulls/{pr}")
if code != 200:
raise RuntimeError(f"GET pulls/{pr} → HTTP {code}: {data!r}")
return data
def get_issue_comments(
self, owner: str, repo: str, issue: int
) -> list[dict[str, Any]]:
# Paginate. Gitea default page size 50.
out: list[dict[str, Any]] = []
page = 1
while True:
code, data = self._req(
"GET",
f"/repos/{owner}/{repo}/issues/{issue}/comments?limit=50&page={page}",
)
if code != 200:
raise RuntimeError(
f"GET issues/{issue}/comments page={page} → HTTP {code}: {data!r}"
)
if not data:
break
out.extend(data)
if len(data) < 50:
break
page += 1
return out
def resolve_team_id(self, org: str, team_name: str) -> int | None:
key = (org, team_name)
if key in self._team_id_cache:
return self._team_id_cache[key]
code, data = self._req("GET", f"/orgs/{org}/teams/search?q={urllib.parse.quote(team_name)}")
team_id = None
if code == 200 and isinstance(data, dict):
for t in data.get("data", []):
if t.get("name") == team_name:
team_id = t.get("id")
break
if team_id is None and code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == team_name:
team_id = t.get("id")
break
self._team_id_cache[key] = team_id
return team_id
def is_team_member(self, team_id: int, login: str) -> bool | None:
"""Return True / False / None (unknown — 403 from API)."""
code, _ = self._req(
"GET", f"/teams/{team_id}/members/{urllib.parse.quote(login)}"
)
if code in (200, 204):
return True
if code == 404:
return False
# 403 means the token owner isn't in this team, so the API
# refuses to confirm membership. Fail-closed at the caller.
return None
def post_status(
self,
owner: str,
repo: str,
sha: str,
state: str,
context: str,
description: str,
target_url: str = "",
) -> None:
body = {
"state": state,
"context": context,
"description": description[:140], # Gitea truncates to 255 but be safe
"target_url": target_url or "",
}
code, data = self._req(
"POST",
f"/repos/{owner}/{repo}/statuses/{sha}",
body=body,
ok_codes=(201,),
)
if code not in (200, 201):
raise RuntimeError(
f"POST statuses/{sha} → HTTP {code}: {data!r}"
)
# ---------------------------------------------------------------------------
# Config loader (PyYAML-free — config file is intentionally tiny + flat)
# ---------------------------------------------------------------------------
def load_config(path: str) -> dict[str, Any]:
"""Load .gitea/sop-checklist-config.yaml.
Uses PyYAML if available, otherwise falls back to a built-in
minimal parser sufficient for our flat config shape. Bundling
PyYAML on the runner is one apt install away but we avoid the
dep by keeping the config shape constrained.
"""
try:
import yaml # type: ignore[import-not-found]
with open(path) as f:
return yaml.safe_load(f)
except ImportError:
return _load_config_minimal(path)
def _load_config_minimal(path: str) -> dict[str, Any]:
"""Minimal YAML subset parser for our config shape.
Supports: top-level scalar:value, top-level map-of-map (e.g.
tier_failure_mode), top-level list of maps (items:), and within an
item map: scalars + lists of scalars. Does NOT support nested lists,
YAML anchors, multi-doc, or flow style.
"""
with open(path) as f:
lines = f.readlines()
return _parse_minimal_yaml(lines)
def _parse_minimal_yaml(lines: list[str]) -> dict[str, Any]: # noqa: C901
"""Hand-rolled subset parser. See _load_config_minimal docstring."""
# Strip comments + blank lines but preserve indentation.
cleaned: list[tuple[int, str]] = []
for raw in lines:
# Don't strip a "#" that is inside a quoted value.
body = raw.rstrip("\n")
# Remove trailing comment.
idx = body.find("#")
if idx >= 0 and (idx == 0 or body[idx - 1] in " \t"):
body = body[:idx].rstrip()
if not body.strip():
continue
indent = len(body) - len(body.lstrip(" "))
cleaned.append((indent, body.strip()))
root: dict[str, Any] = {}
i = 0
n = len(cleaned)
def parse_scalar(s: str) -> Any:
s = s.strip()
if s.startswith('"') and s.endswith('"'):
return s[1:-1]
if s.startswith("'") and s.endswith("'"):
return s[1:-1]
if s.lower() in ("true", "yes"):
return True
if s.lower() in ("false", "no"):
return False
try:
return int(s)
except ValueError:
pass
return s
def parse_inline_list(s: str) -> list[Any]:
s = s.strip()
if not (s.startswith("[") and s.endswith("]")):
return [parse_scalar(s)]
inner = s[1:-1]
if not inner.strip():
return []
return [parse_scalar(x.strip()) for x in inner.split(",")]
while i < n:
indent, line = cleaned[i]
if indent != 0:
i += 1
continue
if ":" not in line:
i += 1
continue
key, _, rest = line.partition(":")
key = key.strip()
rest = rest.strip()
if rest == "":
# Block — could be map or list.
i += 1
# Look ahead for first child.
if i < n and cleaned[i][1].startswith("- "):
# List of items.
items: list[Any] = []
while i < n and cleaned[i][0] > indent and cleaned[i][1].startswith("- "):
item_indent = cleaned[i][0]
first_kv = cleaned[i][1][2:].strip() # strip "- "
item: dict[str, Any] = {}
if ":" in first_kv:
k, _, v = first_kv.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
elif v.startswith(">-") or v.startswith(">"):
# Folded scalar continues on subsequent indented lines
collected: list[str] = []
i += 1
while i < n and cleaned[i][0] > item_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
items.append(item)
continue
elif v.startswith("["):
item[k] = parse_inline_list(v)
else:
item[k] = parse_scalar(v)
i += 1
# Subsequent k:v lines at deeper indent belong to this item.
while i < n and cleaned[i][0] > item_indent and not cleaned[i][1].startswith("- "):
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip()
v = v.strip()
if v == "":
item[k] = ""
i += 1
elif v.startswith(">-") or v.startswith(">"):
collected = []
i += 1
while i < n and cleaned[i][0] > sub_indent:
collected.append(cleaned[i][1])
i += 1
item[k] = " ".join(collected)
elif v.startswith("["):
item[k] = parse_inline_list(v)
i += 1
else:
item[k] = parse_scalar(v)
i += 1
else:
i += 1
items.append(item)
root[key] = items
else:
# Sub-map.
submap: dict[str, Any] = {}
while i < n and cleaned[i][0] > indent:
sub_indent, sub_line = cleaned[i]
if ":" in sub_line:
k, _, v = sub_line.partition(":")
k = k.strip().strip('"').strip("'")
v = v.strip()
if v.startswith("[") and v.endswith("]"):
submap[k] = parse_inline_list(v)
else:
submap[k] = parse_scalar(v)
i += 1
root[key] = submap
else:
# Inline scalar or list.
if rest.startswith("[") and rest.endswith("]"):
root[key] = parse_inline_list(rest)
else:
root[key] = parse_scalar(rest)
i += 1
return root
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def render_status(
items: list[dict[str, Any]],
ack_state: dict[str, dict[str, Any]],
body_state: dict[str, bool],
) -> tuple[str, str]:
"""Return (state, description) for the commit-status post.
state is "success" if every item has at least one valid ack
(body section presence is informational only — peer-ack is the
real gate). "pending" is reserved for the soft-fail path
(tier:low) and is set by the caller.
"""
n = len(items)
fully_acked = [
it["slug"] for it in items if ack_state[it["slug"]]["ackers"]
]
missing = [
it["slug"] for it in items if not ack_state[it["slug"]]["ackers"]
]
missing_body = [it["slug"] for it in items if not body_state.get(it["slug"], False)]
desc_parts = [f"acked: {len(fully_acked)}/{n}"]
if missing:
# Show up to 3 missing slugs to stay inside the 140-char budget.
shown = ", ".join(missing[:3])
if len(missing) > 3:
shown += f", +{len(missing) - 3}"
desc_parts.append(f"missing: {shown}")
if missing_body:
desc_parts.append(f"body-unfilled: {len(missing_body)}")
state = "success" if not missing else "failure"
return state, "".join(desc_parts)
def get_tier_mode(pr: dict[str, Any], cfg: dict[str, Any]) -> str:
"""Read tier label, return 'hard' or 'soft' per cfg.tier_failure_mode."""
labels = pr.get("labels") or []
tier_labels = [l.get("name", "") for l in labels if (l.get("name", "") or "").startswith("tier:")]
mode_map = cfg.get("tier_failure_mode") or {}
default_mode = cfg.get("default_mode", "hard")
for tl in tier_labels:
if tl in mode_map:
return mode_map[tl]
return default_mode
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser()
p.add_argument("--owner", required=True)
p.add_argument("--repo", required=True)
p.add_argument("--pr", type=int, required=True)
p.add_argument("--config", default=".gitea/sop-checklist-config.yaml")
p.add_argument("--gitea-host", default="git.moleculesai.app")
p.add_argument(
"--dry-run",
action="store_true",
help="Compute state but do not POST the status.",
)
p.add_argument(
"--status-context",
default="sop-checklist / all-items-acked (pull_request)",
)
p.add_argument(
"--exit-on-state",
action="store_true",
help=(
"If set, exit non-zero when state=failure. Default OFF so the "
"job-level conclusion is independent of ack-state — the only "
"thing BP sees is the POSTed status. Useful for local debugging."
),
)
args = p.parse_args(argv)
token = os.environ.get("GITEA_TOKEN", "")
if not token and not args.dry_run:
print("::error::GITEA_TOKEN env required", file=sys.stderr)
return 2
cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items}
numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
}
client = GiteaClient(args.gitea_host, token) if token else None
if not client:
print("::error::No client (dry-run without token has nothing to do)", file=sys.stderr)
return 2
pr = client.get_pr(args.owner, args.repo, args.pr)
if pr.get("state") != "open":
print(f"::notice::PR #{args.pr} is {pr.get('state')} — gate is a no-op")
return 0
author = (pr.get("user") or {}).get("login", "")
head_sha = (pr.get("head") or {}).get("sha", "")
body = pr.get("body", "") or ""
if not author or not head_sha:
print("::error::PR payload missing user.login or head.sha", file=sys.stderr)
return 1
comments = client.get_issue_comments(args.owner, args.repo, args.pr)
# Build team-membership probe closure that caches results per
# (user, team-id) so a user acking multiple items only triggers
# one membership lookup per team.
team_member_cache: dict[tuple[str, int], bool | None] = {}
def probe(slug: str, users: list[str]) -> list[str]:
item = items_by_slug[slug]
team_names: list[str] = item["required_teams"]
# Resolve names → ids. NOTE: orgs/{org}/teams/search may not be
# available — fall back to the list endpoint.
team_ids: list[int] = []
for tn in team_names:
tid = client.resolve_team_id(args.owner, tn)
if tid is None:
# Try the list endpoint as a fallback.
code, data = client._req( # noqa: SLF001
"GET", f"/orgs/{args.owner}/teams"
)
if code == 200 and isinstance(data, list):
for t in data:
if t.get("name") == tn:
tid = t.get("id")
client._team_id_cache[(args.owner, tn)] = tid # noqa: SLF001
break
if tid is not None:
team_ids.append(tid)
else:
print(
f"::warning::could not resolve team-id for '{tn}' "
f"in org '{args.owner}' — item '{slug}' will fail closed",
file=sys.stderr,
)
approved: list[str] = []
for u in users:
for tid in team_ids:
cache_key = (u, tid)
if cache_key not in team_member_cache:
team_member_cache[cache_key] = client.is_team_member(tid, u)
result = team_member_cache[cache_key]
if result is True:
approved.append(u)
break
if result is None:
print(
f"::warning::team-probe for {u} in team-id {tid} returned 403 "
"(token owner not in that team — fail-closed per RFC#324)",
file=sys.stderr,
)
# Treat as not-in-team for this user/team pair; loop
# may still find membership in another team.
return approved
ack_state = compute_ack_state(comments, author, items_by_slug, numeric_aliases, probe)
body_state = {it["slug"]: section_marker_present(body, it["pr_section_marker"]) for it in items}
state, description = render_status(items, ack_state, body_state)
mode = get_tier_mode(pr, cfg)
if state == "failure" and mode == "soft":
state = "pending"
description = f"[soft-fail tier:low] {description}"
# Diagnostics to job log.
print(f"::notice::PR #{args.pr} author={author} head={head_sha[:7]} mode={mode}")
for it in items:
slug = it["slug"]
ackers = ack_state[slug]["ackers"]
if ackers:
print(f"::notice:: [PASS] {slug} — acked by {','.join(ackers)}")
else:
r = ack_state[slug]["rejected"]
extras: list[str] = []
if r["self_ack"]:
extras.append(f"self-acks-rejected:{','.join(r['self_ack'])}")
if r["not_in_team"]:
extras.append(f"not-in-team:{','.join(r['not_in_team'])}")
extra = " (" + "; ".join(extras) + ")" if extras else ""
print(f"::notice:: [WAIT] {slug} — no valid peer-ack yet{extra}")
print(f"::notice::posting status: state={state} desc={description!r}")
if args.dry_run:
print("::notice::--dry-run: not posting status")
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
target_url = f"https://{args.gitea_host}/{args.owner}/{args.repo}/pulls/{args.pr}"
client.post_status(
args.owner, args.repo, head_sha,
state=state, context=args.status_context,
description=description, target_url=target_url,
)
print(f"::notice::status posted: {args.status_context}{state}")
# By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the
# description less actionable. --exit-on-state restores the old
# behavior for local debugging.
if args.exit_on_state:
return 0 if state in ("success", "pending") else 1
return 0
if __name__ == "__main__":
sys.exit(main())
+1 -12
View File
@@ -452,18 +452,7 @@ def reap(
if not isinstance(s, dict):
continue
context = s.get("context") or ""
# Schema asymmetry: Gitea 1.22.6 returns the TOP-LEVEL combined
# aggregate as `combined.state` but each per-context entry in
# `combined.statuses[]` uses the key `status`, NOT `state`.
# Prefer `status`; fall back to `state` so a future Gitea
# version (or a test fixture written against the wrong key)
# still flows through the compensation path. Verified empirically
# via direct API probe 2026-05-12 03:42Z:
# /repos/.../commits/{sha}/status entries → key is "status".
# Pre-rev4 code read "state" only → returned "" → bypassed the
# `state != "failure"` guard → compensation path unreachable.
# See `feedback_smoke_test_vendor_truth_not_shape_match`.
state = s.get("status") or s.get("state") or ""
state = s.get("state") or ""
# Only `failure` is the bug shape. `error`/`pending`/`success`
# left alone — they have other meanings.
@@ -1,524 +0,0 @@
#!/usr/bin/env python3
# Unit tests for sop-checklist-gate.py
#
# Run: python3 .gitea/scripts/tests/test_sop_checklist_gate.py
# or: pytest .gitea/scripts/tests/test_sop_checklist_gate.py
#
# RFC#351 Step 2 of 6 — implementation MVP. Tests cover:
# - slug normalization (the 4 example variants in the script header)
# - parse_directives (ack, revoke, with/without note, mid-comment, etc.)
# - section_marker_present (empty answer rejected, filled answer ok)
# - compute_ack_state (self-ack rejected, team probe applied, revoke
# invalidates own prior ack, peer's ack survives unrevoked)
# - render_status (state + description format)
# - get_tier_mode (label-driven, default fallback)
# - load_config (default config parses cleanly with both PyYAML and
# the bundled minimal parser)
#
# All tests run WITHOUT touching the Gitea API — the team-probe
# callable is dependency-injected.
from __future__ import annotations
import os
import sys
import tempfile
import unittest
# Resolve sibling script regardless of where pytest is invoked from.
HERE = os.path.dirname(os.path.abspath(__file__))
PARENT = os.path.dirname(HERE) # .gitea/scripts
sys.path.insert(0, PARENT)
import importlib.util # noqa: E402
_spec = importlib.util.spec_from_file_location(
"sop_checklist_gate", os.path.join(PARENT, "sop-checklist-gate.py")
)
sop = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(sop) # type: ignore[union-attr]
# ---------------------------------------------------------------------------
# Test fixtures
# ---------------------------------------------------------------------------
CONFIG_PATH = os.path.join(PARENT, "..", "sop-checklist-config.yaml")
def _items() -> list[dict]:
cfg = sop.load_config(CONFIG_PATH)
return cfg["items"]
def _items_by_slug() -> dict[str, dict]:
return {it["slug"]: it for it in _items()}
def _numeric_aliases() -> dict[int, str]:
return {
int(it["numeric_alias"]): it["slug"]
for it in _items()
if it.get("numeric_alias")
}
def _comment(user: str, body: str) -> dict:
return {"user": {"login": user}, "body": body}
# ---------------------------------------------------------------------------
# normalize_slug
# ---------------------------------------------------------------------------
class TestNormalizeSlug(unittest.TestCase):
def test_kebab_already(self):
self.assertEqual(sop.normalize_slug("comprehensive-testing"), "comprehensive-testing")
def test_underscore_to_dash(self):
self.assertEqual(sop.normalize_slug("comprehensive_testing"), "comprehensive-testing")
def test_space_to_dash(self):
self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing")
def test_uppercase_to_lower(self):
self.assertEqual(sop.normalize_slug("Comprehensive-Testing"), "comprehensive-testing")
def test_mixed_separators(self):
self.assertEqual(sop.normalize_slug("Comprehensive_Testing"), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("FIVE_axis review"), "five-axis-review")
def test_collapse_repeated_dashes(self):
self.assertEqual(sop.normalize_slug("comprehensive--testing"), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("comprehensive testing"), "comprehensive-testing")
def test_strip_trailing_punctuation(self):
self.assertEqual(sop.normalize_slug("comprehensive-testing."), "comprehensive-testing")
self.assertEqual(sop.normalize_slug("comprehensive-testing!"), "comprehensive-testing")
def test_numeric_shorthand_known(self):
self.assertEqual(
sop.normalize_slug("1", _numeric_aliases()),
"comprehensive-testing",
)
self.assertEqual(
sop.normalize_slug("3", _numeric_aliases()),
"staging-smoke",
)
self.assertEqual(
sop.normalize_slug("7", _numeric_aliases()),
"memory-consulted",
)
def test_numeric_shorthand_unknown_returns_empty(self):
# "8" is out of range → empty so caller can flag as unparseable.
self.assertEqual(sop.normalize_slug("8", _numeric_aliases()), "")
def test_numeric_without_alias_table_keeps_digits(self):
# No alias table → return the digits as-is.
self.assertEqual(sop.normalize_slug("1"), "1")
def test_empty_input(self):
self.assertEqual(sop.normalize_slug(""), "")
self.assertEqual(sop.normalize_slug(" "), "")
self.assertEqual(sop.normalize_slug(None), "")
# ---------------------------------------------------------------------------
# parse_directives
# ---------------------------------------------------------------------------
class TestParseDirectives(unittest.TestCase):
def setUp(self):
self.aliases = _numeric_aliases()
def test_simple_ack(self):
d = sop.parse_directives("/sop-ack comprehensive-testing", self.aliases)
self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")])
def test_simple_revoke(self):
d = sop.parse_directives("/sop-revoke staging-smoke", self.aliases)
self.assertEqual(d, [("sop-revoke", "staging-smoke", "")])
def test_ack_with_note(self):
d = sop.parse_directives(
"/sop-ack comprehensive-testing LGTM the test covers all edge cases",
self.aliases,
)
self.assertEqual(len(d), 1)
self.assertEqual(d[0][0], "sop-ack")
self.assertEqual(d[0][1], "comprehensive-testing")
self.assertIn("LGTM", d[0][2])
def test_numeric_shorthand(self):
d = sop.parse_directives("/sop-ack 1", self.aliases)
self.assertEqual(d, [("sop-ack", "comprehensive-testing", "")])
def test_revoke_with_reason(self):
d = sop.parse_directives(
"/sop-revoke comprehensive-testing realized the e2e was mocking the DB",
self.aliases,
)
self.assertEqual(d[0][0], "sop-revoke")
self.assertEqual(d[0][1], "comprehensive-testing")
self.assertIn("mocking", d[0][2])
def test_directive_in_middle_of_comment(self):
body = (
"Reviewed the PR, looks good overall.\n"
"/sop-ack comprehensive-testing\n"
"Will follow up on the doc nit separately."
)
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 1)
self.assertEqual(d[0][1], "comprehensive-testing")
def test_multiple_directives_in_one_comment(self):
body = (
"/sop-ack comprehensive-testing\n"
"/sop-ack local-postgres-e2e\n"
)
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 2)
slugs = {x[1] for x in d}
self.assertEqual(slugs, {"comprehensive-testing", "local-postgres-e2e"})
def test_must_be_at_line_start(self):
# A directive embedded mid-line is not honored (prevents review
# comments like "to /sop-ack you need..." from acting as acks).
body = "If you want to /sop-ack comprehensive-testing reply in this thread"
d = sop.parse_directives(body, self.aliases)
self.assertEqual(d, [])
def test_leading_whitespace_allowed(self):
body = " /sop-ack comprehensive-testing"
d = sop.parse_directives(body, self.aliases)
self.assertEqual(len(d), 1)
def test_empty_body(self):
self.assertEqual(sop.parse_directives("", self.aliases), [])
self.assertEqual(sop.parse_directives(None, self.aliases), [])
def test_normalization_applied(self):
# /sop-ack Comprehensive_Testing → canonical comprehensive-testing
d = sop.parse_directives("/sop-ack Comprehensive_Testing", self.aliases)
self.assertEqual(d[0][1], "comprehensive-testing")
# ---------------------------------------------------------------------------
# section_marker_present
# ---------------------------------------------------------------------------
class TestSectionMarkerPresent(unittest.TestCase):
def test_marker_with_inline_answer(self):
body = "- [ ] **Comprehensive testing performed**: Added 12 new tests covering null/empty/giant inputs."
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_empty_answer(self):
body = "- [ ] **Comprehensive testing performed**:"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_only_whitespace_answer(self):
body = "- [ ] **Comprehensive testing performed**: \n"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_with_next_line_answer(self):
body = (
"- [ ] **Comprehensive testing performed**:\n"
" Yes — see attached log + 12 new unit tests in foo_test.py.\n"
)
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_marker_missing(self):
body = "- [ ] **Local-postgres E2E run**: N/A — pure-frontend\n"
self.assertFalse(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_case_insensitive_marker_match(self):
body = "- [ ] **comprehensive TESTING performed**: yes"
self.assertTrue(sop.section_marker_present(body, "Comprehensive testing performed"))
def test_empty_body(self):
self.assertFalse(sop.section_marker_present("", "X"))
self.assertFalse(sop.section_marker_present(None, "X"))
# ---------------------------------------------------------------------------
# compute_ack_state
# ---------------------------------------------------------------------------
class TestComputeAckState(unittest.TestCase):
def setUp(self):
self.items = _items_by_slug()
self.aliases = _numeric_aliases()
@staticmethod
def _approve_all(slug, users):
return list(users)
@staticmethod
def _approve_none(slug, users):
return []
def _approve_only(self, allowed_users):
return lambda slug, users: [u for u in users if u in allowed_users]
def test_peer_ack_passes(self):
comments = [_comment("bob", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_self_ack_rejected(self):
comments = [_comment("alice", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
self.assertEqual(state["comprehensive-testing"]["rejected"]["self_ack"], ["alice"])
def test_not_in_team_rejected(self):
comments = [_comment("eve", "/sop-ack comprehensive-testing")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_none
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
self.assertEqual(state["comprehensive-testing"]["rejected"]["not_in_team"], ["eve"])
def test_revoke_invalidates_own_prior_ack(self):
# Bob acks then later revokes — Bob no longer counts.
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing realized e2e was mocked"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], [])
def test_revoke_does_not_affect_others_acks(self):
# Bob revokes his own ack; Carol's still counts.
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("carol", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["carol"])
def test_ack_after_revoke_restored(self):
# Bob revokes then re-acks (e.g. after re-reviewing).
comments = [
_comment("bob", "/sop-ack comprehensive-testing"),
_comment("bob", "/sop-revoke comprehensive-testing"),
_comment("bob", "/sop-ack comprehensive-testing"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_numeric_shorthand_ack(self):
# /sop-ack 1 → comprehensive-testing
comments = [_comment("bob", "/sop-ack 1")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
def test_ack_for_unknown_slug_ignored(self):
# Some other slug not in config — silently drop (doesn't crash).
comments = [_comment("bob", "/sop-ack does-not-exist")]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
for slug in self.items:
self.assertEqual(state[slug]["ackers"], [])
def test_multi_item_multi_user(self):
comments = [
_comment("bob", "/sop-ack comprehensive-testing\n/sop-ack staging-smoke"),
_comment("carol", "/sop-ack five-axis-review"),
]
state = sop.compute_ack_state(
comments, "alice", self.items, self.aliases, self._approve_all
)
self.assertEqual(state["comprehensive-testing"]["ackers"], ["bob"])
self.assertEqual(state["staging-smoke"]["ackers"], ["bob"])
self.assertEqual(state["five-axis-review"]["ackers"], ["carol"])
self.assertEqual(state["root-cause"]["ackers"], [])
# ---------------------------------------------------------------------------
# render_status
# ---------------------------------------------------------------------------
class TestRenderStatus(unittest.TestCase):
def setUp(self):
self.items = _items()
self.items_by_slug = _items_by_slug()
def _state_with(self, acked: list[str]) -> dict:
return {
it["slug"]: {
"ackers": ["peer"] if it["slug"] in acked else [],
"rejected": {"self_ack": [], "not_in_team": []},
}
for it in self.items
}
def test_all_acked_returns_success(self):
all_slugs = [it["slug"] for it in self.items]
state, desc = sop.render_status(
self.items, self._state_with(all_slugs), {s: True for s in all_slugs}
)
self.assertEqual(state, "success")
self.assertIn("7/7", desc)
def test_partial_acked_returns_failure(self):
state, desc = sop.render_status(
self.items,
self._state_with(["comprehensive-testing", "staging-smoke"]),
{it["slug"]: True for it in self.items},
)
self.assertEqual(state, "failure")
self.assertIn("2/7", desc)
self.assertIn("missing", desc)
def test_description_truncates_long_missing_list(self):
# Only ack one — 6 missing should be summarized as "+N".
state, desc = sop.render_status(
self.items,
self._state_with(["comprehensive-testing"]),
{it["slug"]: True for it in self.items},
)
# Length budget: under 140 chars.
self.assertLessEqual(len(desc), 140)
self.assertIn("+", desc) # +N elision marker
def test_body_unfilled_surfaced(self):
all_slugs = [it["slug"] for it in self.items]
state, desc = sop.render_status(
self.items,
self._state_with(all_slugs),
{it["slug"]: False for it in self.items},
)
self.assertIn("body-unfilled", desc)
# ---------------------------------------------------------------------------
# get_tier_mode
# ---------------------------------------------------------------------------
class TestGetTierMode(unittest.TestCase):
def setUp(self):
self.cfg = sop.load_config(CONFIG_PATH)
def test_tier_high_is_hard(self):
pr = {"labels": [{"name": "tier:high"}, {"name": "area:ci"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_tier_medium_is_hard(self):
pr = {"labels": [{"name": "tier:medium"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_tier_low_is_soft(self):
pr = {"labels": [{"name": "tier:low"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "soft")
def test_no_tier_label_defaults_to_hard(self):
# Per feedback_fix_root_not_symptom — never silently lower the bar.
pr = {"labels": [{"name": "area:ci"}]}
self.assertEqual(sop.get_tier_mode(pr, self.cfg), "hard")
def test_no_labels_defaults_to_hard(self):
self.assertEqual(sop.get_tier_mode({"labels": []}, self.cfg), "hard")
self.assertEqual(sop.get_tier_mode({}, self.cfg), "hard")
# ---------------------------------------------------------------------------
# load_config
# ---------------------------------------------------------------------------
class TestLoadConfig(unittest.TestCase):
def test_default_config_parses(self):
cfg = sop.load_config(CONFIG_PATH)
self.assertIn("items", cfg)
self.assertEqual(len(cfg["items"]), 7)
slugs = {it["slug"] for it in cfg["items"]}
self.assertEqual(
slugs,
{
"comprehensive-testing",
"local-postgres-e2e",
"staging-smoke",
"root-cause",
"five-axis-review",
"no-backwards-compat",
"memory-consulted",
},
)
def test_default_config_tier_mode_shape(self):
cfg = sop.load_config(CONFIG_PATH)
self.assertEqual(cfg["tier_failure_mode"]["tier:high"], "hard")
self.assertEqual(cfg["tier_failure_mode"]["tier:medium"], "hard")
self.assertEqual(cfg["tier_failure_mode"]["tier:low"], "soft")
self.assertEqual(cfg["default_mode"], "hard")
def test_each_item_has_required_fields(self):
cfg = sop.load_config(CONFIG_PATH)
for it in cfg["items"]:
self.assertIn("slug", it)
self.assertIn("numeric_alias", it)
self.assertIn("pr_section_marker", it)
self.assertIn("required_teams", it)
self.assertIsInstance(it["required_teams"], list)
self.assertGreater(len(it["required_teams"]), 0)
# ---------------------------------------------------------------------------
# Edge case: full integration without team probe (dependency-injected)
# ---------------------------------------------------------------------------
class TestEndToEndAckFlow(unittest.TestCase):
"""All-7-items happy path with synthetic comments. Verifies the
full pipeline minus the Gitea API."""
def test_all_seven_acked_by_proper_teams(self):
items = _items_by_slug()
aliases = _numeric_aliases()
comments = [
_comment("qa-bot", "/sop-ack comprehensive-testing"),
_comment("eng-bot", "/sop-ack local-postgres-e2e"),
_comment("eng-bot", "/sop-ack staging-smoke"),
_comment("mgr-bot", "/sop-ack root-cause"),
_comment("eng-bot", "/sop-ack five-axis-review"),
_comment("mgr-bot", "/sop-ack no-backwards-compat"),
_comment("eng-bot", "/sop-ack memory-consulted"),
]
def probe(slug, users):
# Pretend every user is in every team.
return list(users)
state = sop.compute_ack_state(comments, "alice-author", items, aliases, probe)
body = {it["slug"]: True for it in items.values()}
items_list = list(items.values())
result_state, desc = sop.render_status(items_list, state, body)
self.assertEqual(result_state, "success")
self.assertIn("7/7", desc)
if __name__ == "__main__":
unittest.main(verbosity=2)
-109
View File
@@ -1,109 +0,0 @@
# SOP-Checklist gate — per-item required reviewer teams.
#
# RFC#351 v1 starter set. Each item lists:
# slug — canonical kebab-case form used in /sop-ack <slug>
# pr_section_marker — substring matched in the PR body to detect that
# the author filled in this item (case-insensitive)
# required_teams — list of Gitea team names; an ack from ANY one of
# these teams (logical OR) satisfies the item.
# Membership is probed at gate-time via
# GET /api/v1/teams/{id}/members/{login}.
# Team-id resolution happens at script start via
# GET /api/v1/orgs/{org}/teams (cheap, one call).
# numeric_alias — 1..7; lets reviewers type `/sop-ack 3` as a
# shortcut for `/sop-ack staging-smoke`.
#
# WHY THESE TEAM MAPPINGS:
# The RFC table referenced persona-role names like `core-qa`,
# `core-be`, `core-devops` — these are individual Gitea user logins,
# not teams. The Gitea team-membership API is /teams/{id}/members/{u},
# so we need actual teams. Orchestrator preflight 2026-05-12 verified
# only these teams exist on molecule-ai: ceo(5), engineers(2),
# managers(6), qa(20), security(21), Owners(1), and bot teams. We
# map the RFC roles to the closest existing team and surface the
# mapping explicitly so it's reviewable.
#
# HOW TO EDIT:
# - Tightening: replace `engineers` with a smaller team after creating
# it (e.g. a new `senior-engineers` team if needed).
# - Loosening: add another team to required_teams (OR semantics).
# - Add an item: append to items list and document the slug below.
#
# AUTHOR SELF-ACK IS FORBIDDEN regardless of which team contains them
# — the gate script enforces commenter != PR author before checking
# team membership.
version: 1
# Tier-aware failure mode (RFC#351 open question 2):
# For tier:high — hard-fail (status `failure`, blocks merge via BP).
# For tier:medium — hard-fail (same as high; medium is non-trivial).
# For tier:low — soft-fail (status `pending` with `acked: N/M` in the
# description). BP can choose to require the context
# or not for low-tier PRs.
# If no tier label is present, default to medium (hard-fail) — every PR
# should have a tier label per sop-tier-check, and absence indicates
# a missing-tier defect we should surface, not silently lower the bar.
tier_failure_mode:
"tier:high": hard
"tier:medium": hard
"tier:low": soft
default_mode: hard # used when no tier:* label is present
items:
- slug: comprehensive-testing
numeric_alias: 1
pr_section_marker: "Comprehensive testing performed"
required_teams: [qa, engineers]
description: >-
What was tested, how, edge cases covered. Ack from any qa-team
member (or engineers fallback while qa is small).
- slug: local-postgres-e2e
numeric_alias: 2
pr_section_marker: "Local-postgres E2E run"
required_teams: [engineers]
description: >-
Link to local CI artifact, or "N/A: pure-frontend change". Ack
from any engineer who can verify the local DB test actually ran.
- slug: staging-smoke
numeric_alias: 3
pr_section_marker: "Staging-smoke verified or pending"
required_teams: [engineers]
description: >-
Link to canary run, or "scheduled post-merge". Ack from any
engineer (core-devops/infra-sre are members of engineers team).
- slug: root-cause
numeric_alias: 4
pr_section_marker: "Root-cause not symptom"
required_teams: [managers, ceo]
description: >-
One-sentence root-cause statement. Ack from managers tier
(team-leads) or ceo. Senior judgment required to attest
root-cause-versus-symptom.
- slug: five-axis-review
numeric_alias: 5
pr_section_marker: "Five-Axis review walked"
required_teams: [engineers]
description: >-
Correctness / readability / architecture / security / performance.
Ack from any non-author engineer.
- slug: no-backwards-compat
numeric_alias: 6
pr_section_marker: "No backwards-compat shim / dead code added"
required_teams: [managers, ceo]
description: >-
Yes/no + justification if no. Senior ack required because
backward-compat shims are how dead-code accretes.
- slug: memory-consulted
numeric_alias: 7
pr_section_marker: "Memory/saved-feedback consulted"
required_teams: [engineers]
description: >-
List of feedback memories applicable to this change. Ack from
any engineer who has the same memory access.
-1
View File
@@ -85,5 +85,4 @@ jobs:
REQUIRED_CHECKS: |
Secret scan / Scan diff for credential-shaped strings (pull_request)
sop-tier-check / tier-check (pull_request)
CI / all-required (pull_request)
run: bash .gitea/scripts/audit-force-merge.sh
+8 -35
View File
@@ -70,12 +70,10 @@ jobs:
changes:
name: Detect changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): all required jobs >=98% green on main.
# Flip confirmed 2026-05-12 via combined-status check of latest main
# commit (all CI jobs green). `all-required` sentinel hard-fails
# when this job fails; no Phase 3 suppression needed.
# revert: add `continue-on-error: true` back if regressions appear.
continue-on-error: false
# Phase 3 (RFC #219 §1): surface broken workflows without blocking
# the PR. Follow-up PR flips this off after the surfaced defects
# (if any) are triaged.
continue-on-error: true
outputs:
platform: ${{ steps.check.outputs.platform }}
canvas: ${{ steps.check.outputs.canvas }}
@@ -126,29 +124,7 @@ jobs:
name: Platform (Go)
needs: changes
runs-on: ubuntu-latest
# mc#664 (interim): re-mask platform-build pending fix-forward. Phase 4
# (#656) flipped this to continue-on-error: false based on a Phase-3-masked
# "green on main 2026-05-12" — the prior continue-on-error: true had
# been hiding failing tests in workspace-server/internal/handlers/.
# Two distinct failure classes surfaced on 0e5152c3:
# (1) 4x delegation_test.go (lines 1110/1176/1228/1271): helpers
# expectExecuteDelegationBase/Success/Failed are missing sqlmock
# expectations for queries production has issued since ~2026-04-21
# (last_outbound_at UPDATE, lookupDeliveryMode/Runtime SELECTs,
# a2a_receive INSERT activity_logs, recordLedgerStatus writes).
# Halt cond #3 applies (regression > 7 days → broader sweep).
# (2) 1x mcp_test.go:433 (TestMCPHandler_CommitMemory_GlobalScope_Blocked):
# commit 7d1a189f (2026-05-10) hardened mcp.go to scrub err.Error()
# from JSON-RPC responses (OFFSEC-001), but the test asserts the
# error message contains "GLOBAL". Production-vs-test contract
# collision — needs design call, not mock update.
# Time-boxed Option A (90 min) did not fit the cross-cutting scope.
# This is a sequenced revert→fix→reflip per
# feedback_strict_root_only_after_class_a emergency clause — NOT
# a permanent re-mask. Re-flip blocked on mc#664 fix-forward landing.
# Other 4 #656 flips (changes, canvas-build, shellcheck, python-lint)
# retain continue-on-error: false; only platform-build regresses.
continue-on-error: true # mc#664 fix-forward in flight; re-flip when tests pass
continue-on-error: true
defaults:
run:
working-directory: workspace-server
@@ -295,8 +271,7 @@ jobs:
name: Canvas (Next.js)
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
continue-on-error: true
defaults:
run:
working-directory: canvas
@@ -342,8 +317,7 @@ jobs:
name: Shellcheck (E2E scripts)
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
continue-on-error: true
steps:
- if: needs.changes.outputs.scripts != 'true'
run: echo "No tests/e2e/ or infra/scripts/ changes — skipping real shellcheck; this job always runs to satisfy the required-check name on branch protection."
@@ -418,8 +392,7 @@ jobs:
name: Python Lint & Test
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
continue-on-error: true
env:
WORKSPACE_ID: test
defaults:
-132
View File
@@ -1,132 +0,0 @@
name: lint-mask-pr-atomicity
# Tier 2d hard-gate lint (per internal#350) — blocks PRs that touch
# `.gitea/workflows/ci.yml` and modify ONLY ONE of {continue-on-error,
# all-required.sentinel.needs} without a `Paired: #NNN` reference in
# the PR body or in a commit message.
#
# Why this exists
# ---------------
# PR#665 (interim `continue-on-error: true` on `platform-build`) and
# PR#668 (sentinel-`needs` demotion of the same job) were designed as a
# pair but merged solo — #665 landed at 04:47Z 2026-05-12, #668 was
# still open at 05:07Z when the main-red watchdog (#674) fired. Result:
# ~20 minutes of `main` red and a cascade of false-positives on
# unrelated PRs. This lint structurally prevents that class.
#
# How the gate works
# ------------------
# 1. The workflow runs on every PR whose diff touches ci.yml (paths
# filter). It is NOT a required check on `main` because the rule is
# diff-based — running it on PRs that don't touch ci.yml would
# produce a `pending` status forever (per
# `feedback_path_filtered_workflow_cant_be_required`).
# 2. The script reads `BASE_SHA:ci.yml` and `HEAD_SHA:ci.yml`, parses
# both via PyYAML AST (per `feedback_behavior_based_ast_gates` — no
# grep, no regex on the raw text — so a YAML-shape refactor still
# detects).
# 3. Walks `jobs.*.continue-on-error` on each side; flags any value
# diff. Reads `jobs.all-required.needs` on each side; flags any
# set diff (order-insensitive — `needs:` is engine-unordered).
# 4. If both predicates fired → atomic, OK. If neither → no risk, OK.
# If exactly one fired → require `Paired: #NNN` in PR body OR in
# any commit message between base..head; else fail.
#
# Phase contract (RFC internal#219 §1 ladder)
# -------------------------------------------
# This workflow lands at `continue-on-error: true` (Phase 3 — surface
# regressions without blocking PRs while the rule beds in).
# Follow-up PR flips to `false` once we have ≥3 days of clean runs on
# `main` and no false-positives. Tracking issue: internal#350.
#
# Cross-links
# -----------
# - internal#350 (the RFC that specs this lint)
# - PR#665 / PR#668 (the empirical split-pair)
# - mc#664 (the main-red incident the split caused)
# - feedback_strict_root_only_after_class_a
# - feedback_behavior_based_ast_gates
#
# Auth: only needs the auto-injected GITHUB_TOKEN (read-only, repo
# scope). No DRIFT_BOT_TOKEN needed — Tier 2d does NOT call
# branch_protections (Tier 2g/f do).
on:
pull_request:
types: [opened, synchronize, reopened, edited]
# `edited` is included because the rule depends on PR_BODY: a user
# may add `Paired: #NNN` after first push to satisfy the lint. The
# rerun on `edited` lets the PR turn green without an empty
# commit. Gitea 1.22.6 fires `edited` on body changes — verified
# via gitea-source/models/issues/pull_list.go::triggerNewPRWebhook.
paths:
- '.gitea/workflows/ci.yml'
- '.gitea/scripts/lint_mask_pr_atomicity.py'
- '.gitea/workflows/lint-mask-pr-atomicity.yml'
- 'tests/test_lint_mask_pr_atomicity.py'
env:
# Belt-and-suspenders against the runner-default trap
# (feedback_act_runner_github_server_url). Runners are configured
# with this env via /opt/molecule/runners/config.yaml, but pinning
# at the workflow level protects against a runner regenerated
# without the config file.
GITHUB_SERVER_URL: https://git.moleculesai.app
permissions:
contents: read
pull-requests: read
# Per-PR concurrency — re-pushes cancel previous runs to keep the
# queue short. The lint is cheap (one git show + log + a YAML parse).
concurrency:
group: lint-mask-pr-atomicity-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
scan:
name: lint-mask-pr-atomicity
runs-on: ubuntu-latest
timeout-minutes: 5
# Phase 3 (RFC #219 §1): surface broken shapes without blocking
# PRs. Follow-up PR flips this to `false` once recent runs on main
# are confirmed clean (eat-our-own-dogfood discipline mirrors
# PR#673's same-shape comment). Tracking: internal#350.
continue-on-error: true
steps:
- name: Check out PR head with full history (need base SHA blobs)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
# `git show <base-sha>:<path>` needs the base SHA's blobs.
# Shallow=1 would miss it. Same rationale as PR#673 and
# check-migration-collisions.yml.
fetch-depth: 0
- name: Set up Python (PyYAML for AST parsing)
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: '3.12'
- name: Install PyYAML
# Same pin as ci-required-drift.yml + the rest of the Tier 2
# lint family — keep runner-cache hits uniform.
run: python -m pip install --quiet 'PyYAML==6.0.2'
- name: Ensure base ref is reachable locally
# fetch-depth=0 usually pulls the base too, but explicit-fetch
# is cheap insurance against runner-version drift (matches the
# comment in check-migration-collisions.yml and PR#673).
run: |
git fetch origin "${{ github.event.pull_request.base.ref }}" || true
- name: Run lint-mask-pr-atomicity
env:
BASE_SHA: ${{ github.event.pull_request.base.sha }}
HEAD_SHA: ${{ github.event.pull_request.head.sha }}
# PR body — the script greps for `Paired: #NNN`.
PR_BODY: ${{ github.event.pull_request.body }}
CI_WORKFLOW_PATH: .gitea/workflows/ci.yml
SENTINEL_JOB_KEY: all-required
run: python3 .gitea/scripts/lint_mask_pr_atomicity.py
- name: Run lint-mask-pr-atomicity unit tests
# Run the test suite in-CI so the lint's own behaviour is
# verified on every change. Matches lint-workflow-yaml.yml.
run: |
python -m pip install --quiet pytest
python3 -m pytest tests/test_lint_mask_pr_atomicity.py -v
@@ -1,96 +0,0 @@
# lint-required-no-paths — structural enforcement of
# `feedback_path_filtered_workflow_cant_be_required`.
#
# Fails the PR if ANY workflow whose status-check context appears in
# `branch_protections/main.status_check_contexts` carries a
# `paths:` or `paths-ignore:` filter in its `on:` block.
#
# Why this exists:
# A required-check workflow with a paths filter silently degrades the
# merge gate. If a PR's diff doesn't touch the filter, the workflow
# never fires; Gitea (1.22.6) reports the required context as
# `pending` (NOT `skipped == success`), so the PR cannot merge. For a
# docs-only PR against `paths: ['**.go']`, the PR is wedged forever.
#
# Previously prevented only by reviewer vigilance + the saved memory
# `feedback_path_filtered_workflow_cant_be_required`. This workflow
# makes it a hard CI gate.
#
# Forward-compat scope:
# Today (2026-05-11) molecule-core/main protects 3 contexts:
# - "Secret scan / Scan diff for credential-shaped strings (pull_request)"
# - "sop-tier-check / tier-check (pull_request)"
# - "CI / all-required (pull_request)"
# Per RFC#324 Step 2 the required-list expands to ~5 contexts
# (qa-review, security-review added). Each new required context's
# workflow must remain unconditional. This lint pins that contract.
#
# Meta-required-check:
# This workflow ITSELF deliberately has NO `paths:` filter on its `on:`
# block — otherwise a paths-non-matching PR could bypass the check.
# Self-evident from this file: only `pull_request` types + no paths.
#
# Auth:
# `GET /repos/.../branch_protections/{branch}` requires repo-admin
# role in Gitea 1.22.6. The workflow-default `GITHUB_TOKEN` is
# non-admin (read-only), so we re-use `DRIFT_BOT_TOKEN` (same persona
# that powers `ci-required-drift.yml` — verified working there).
# If `DRIFT_BOT_TOKEN` becomes unavailable, the script exits 0 with a
# loud `::error::` rather than red-X every PR — token-scope issues
# should be fixed at the token, not surfaced as a gate failure on
# every unrelated PR.
#
# Behavior-based gate per `feedback_behavior_based_ast_gates`:
# YAML AST walk (PyYAML), NOT grep. Workflow renames, formatting
# changes (block-scalar vs flow-style), or moving `paths:` between
# `pull_request:` and `pull_request_target:` all still detect.
#
# IMPORTANT — Gitea 1.22.6 parser quirk per
# `feedback_gitea_workflow_dispatch_inputs_unsupported`: do NOT add an
# `inputs:` block to `workflow_dispatch:` — Gitea 1.22.6 rejects the
# entire workflow as "unknown on type" and it registers for ZERO events.
name: lint-required-no-paths
on:
pull_request:
types: [opened, synchronize, reopened]
workflow_dispatch:
# Read protection + read local YAML. No writes.
permissions:
contents: read
# Only one in-flight run per PR — re-pushes cancel the previous run to
# keep the queue short. Required-list reads are cheap (one GET); the
# cancellation is just hygiene.
concurrency:
group: lint-required-no-paths-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
lint:
name: lint-required-no-paths
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Check out repo (we read the workflow YAML files locally)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Set up Python (PyYAML for AST parsing)
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: '3.12'
- name: Install PyYAML
run: python -m pip install --quiet 'PyYAML==6.0.2'
- name: Run lint-required-no-paths
env:
# DRIFT_BOT_TOKEN is owned by mc-drift-bot, a least-privilege
# Gitea persona with repo-admin role for branch_protections
# read. Same secret used by ci-required-drift.yml — see that
# workflow's header for provisioning trail (internal#329).
GITEA_TOKEN: ${{ secrets.DRIFT_BOT_TOKEN }}
GITEA_HOST: git.moleculesai.app
REPO: ${{ github.repository }}
BRANCH: main
WORKFLOWS_DIR: .gitea/workflows
run: python3 .gitea/scripts/lint-required-no-paths.py
-75
View File
@@ -1,75 +0,0 @@
name: Lint workflow YAML (Gitea-1.22.6-hostile shapes)
# Tier-2 hard-gate lint (RFC internal#219 §1, charter §SOP-N rule (m)).
# Catches six Gitea-1.22.6-hostile workflow-YAML shapes BEFORE they reach
# `main`. Each rule maps to a documented incident in saved memory:
#
# 1. workflow_dispatch.inputs — feedback_gitea_workflow_dispatch_inputs_unsupported
# (2026-05-11 PyPI freeze 24h)
# 2. on: workflow_run — task #81 (Gitea 1.22.6 lacks the event)
# 3. name: containing "/" — breaks status-context tokenization
# 4. cross-file name collision — status-reaper rev1 fail-loud class
# 5. cross-repo uses: org/r/p@r — feedback_gitea_cross_repo_uses_blocked
# (DEFAULT_ACTIONS_URL=github → 404)
# 6. (WARN) api.github.com refs — feedback_act_runner_github_server_url
# without workflow-level GITHUB_SERVER_URL
#
# Empirical history this hardens against:
# - status-reaper rev1 caught rule-4 (name-collision) class
# - sop-tier-refire DOA'd on rule-2 (workflow_run partial)
# - #319 bootstrap-paradox (chained-defect class, related)
# - internal#329 dispatcher race (adjacent)
# - 2026-05-11 publish-runtime: rule-1, 24h PyPI freeze
#
# Triggers:
# - pull_request: pre-merge gate — block hostile shapes before they land
# - push: post-merge regression detection — catch direct-to-main edits
#
# Per RFC internal#219 §1 contract: continue-on-error: true during the
# surface-broken-shapes phase. Follow-up PR flips off after surfaced
# defects are triaged. The push-trigger ensures we catch regressions
# even if the pull_request gate is bypassed by branch-protection drift.
on:
pull_request:
paths:
- '.gitea/workflows/**'
- '.gitea/scripts/lint-workflow-yaml.py'
- 'tests/test_lint_workflow_yaml.py'
push:
branches: [main, staging]
paths:
- '.gitea/workflows/**'
- '.gitea/scripts/lint-workflow-yaml.py'
- 'tests/test_lint_workflow_yaml.py'
# Belt-and-suspenders against runner default
# (feedback_act_runner_github_server_url).
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
lint:
name: Lint workflow YAML for Gitea-1.22.6-hostile shapes
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken shapes without blocking PRs.
# Follow-up PR flips this off after the 4 existing-on-main rule-2
# (workflow_run) violations are migrated to a supported trigger.
continue-on-error: true
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.11'
- name: Install PyYAML
run: pip install --quiet 'PyYAML>=6.0'
- name: Lint .gitea/workflows/*.yml
run: python3 .gitea/scripts/lint-workflow-yaml.py
- name: Run lint-workflow-yaml unit tests
run: |
pip install --quiet pytest
python3 -m pytest tests/test_lint_workflow_yaml.py -v
+7 -6
View File
@@ -54,12 +54,13 @@ env:
jobs:
build-and-push:
name: Build & push canvas image
# infra/docker-label-registration (molecule-ai/operator-config PR #30): `docker` label
# is now registered on all act_runners that mount /var/run/docker.sock. This change
# routes publish jobs exclusively to Docker-capable runners (no more coin-flip failures).
# Prerequisite: operator host must be rolled to pick up new runner config. See
# molecule-ai/molecule-core issue #711.
runs-on: [ubuntu-latest, docker]
# REVERTED (infra/revert-docker-runner-label): `runs-on: ubuntu-latest` restored.
# The `docker` label is not registered on any act_runner. `runs-on: [ubuntu-latest, docker]`
# causes jobs to queue indefinitely with zero eligible runners — strictly worse than the
# pre-#599 coin-flip (50% success rate). Once the `docker` label is registered on
# ≥2 runners, re-apply the fix from #599 (infra/docker-runner-label).
# See issue #576 + infra-lead pulse ~00:30Z.
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
continue-on-error: true
steps:
@@ -52,12 +52,13 @@ env:
jobs:
build-and-push:
# infra/docker-label-registration (molecule-ai/operator-config PR #30): `docker` label
# is now registered on all act_runners that mount /var/run/docker.sock. This change
# routes publish jobs exclusively to Docker-capable runners (no more coin-flip failures).
# Prerequisite: operator host must be rolled to pick up new runner config. See
# molecule-ai/molecule-core issue #711.
runs-on: [ubuntu-latest, docker]
# REVERTED (infra/revert-docker-runner-label): `runs-on: ubuntu-latest` restored.
# The `docker` label is not registered on any act_runner. `runs-on: [ubuntu-latest, docker]`
# causes jobs to queue indefinitely with zero eligible runners — strictly worse than the
# pre-#599 coin-flip (50% success rate). Once the `docker` label is registered on
# ≥2 runners, re-apply the fix from #599 (infra/docker-runner-label).
# See issue #576 + infra-lead pulse ~00:30Z.
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+10 -10
View File
@@ -50,10 +50,10 @@ name: redeploy-tenants-on-main
# target_tag=<sha>, re-pulling the older image on every tenant.
on:
push:
workflow_run:
workflows: ['publish-workspace-server-image']
types: [completed]
branches: [main]
paths:
- '.gitea/workflows/publish-workspace-server-image.yml'
permissions:
contents: read
# No write scopes needed — the workflow hits an external CP endpoint,
@@ -79,11 +79,11 @@ env:
jobs:
redeploy:
# Skip the auto-trigger if publish-workspace-server-image didn't
# actually succeed. The push trigger fires when the workflow file
# is updated (post-merge of publish-workspace-server-image). This is
# the best-available proxy for "publish succeeded" without workflow_run.
# If the push was from a revert or a partial publish, continue-on-error
# on the individual job means the redeploy failure won't block merges.
# actually succeed. workflow_run fires on any completion state; we
# don't want to redeploy against a half-built image.
# NOTE (Gitea port): workflow_dispatch trigger dropped; only the
# workflow_run path remains.
if: ${{ github.event.workflow_run.conclusion == 'success' }}
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
continue-on-error: true
@@ -111,7 +111,7 @@ jobs:
# dispatch with no input falls through to github.sha.
env:
INPUT_TAG: ${{ inputs.target_tag }}
HEAD_SHA: ${{ github.sha }}
HEAD_SHA: ${{ github.event.workflow_run.head_sha || github.sha }}
run: |
set -euo pipefail
if [ -n "${INPUT_TAG:-}" ]; then
@@ -251,7 +251,7 @@ jobs:
# GHCR's manifest. For workflow_run (default :latest) the
# workflow_run.head_sha is the SHA that just published.
env:
EXPECTED_SHA: ${{ github.sha }}
EXPECTED_SHA: ${{ github.event.workflow_run.head_sha || github.sha }}
TARGET_TAG: ${{ steps.tag.outputs.target_tag }}
# Tenant subdomain template — slugs from the response are
# appended. Production CP issues `<slug>.moleculesai.app`;
@@ -50,10 +50,10 @@ name: redeploy-tenants-on-staging
# of a known-good build.
on:
push:
workflow_run:
workflows: ['publish-workspace-server-image']
types: [completed]
branches: [main]
paths:
- '.gitea/workflows/publish-workspace-server-image.yml'
permissions:
contents: read
# No write scopes needed — the workflow hits an external CP endpoint,
@@ -72,12 +72,12 @@ env:
jobs:
redeploy:
# The push trigger fires when publish-workspace-server-image.yml is updated
# (post-merge of the publish workflow). This is the best-available proxy
# for "publish succeeded" without workflow_run. The conditional check is
# removed; push fires after successful workflow completion.
# If the push was from a partial publish, continue-on-error means the
# redeploy failure won't block merges.
# Skip the auto-trigger if publish-workspace-server-image didn't
# actually succeed. workflow_run fires on any completion state; we
# don't want to redeploy against a half-built image.
# NOTE (Gitea port): workflow_dispatch trigger dropped; only the
# workflow_run path remains.
if: ${{ github.event.workflow_run.conclusion == 'success' }}
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
continue-on-error: true
@@ -237,7 +237,7 @@ jobs:
# ssm_status-success-but-stale-image hazard and benefits from the
# same gate. Diff: TENANT_DOMAIN includes the `staging.` infix.
env:
EXPECTED_SHA: ${{ github.sha }}
EXPECTED_SHA: ${{ github.event.workflow_run.head_sha || github.sha }}
TARGET_TAG: ${{ inputs.target_tag || 'staging-latest' }}
TENANT_DOMAIN: 'staging.moleculesai.app'
run: |
-121
View File
@@ -1,121 +0,0 @@
# sop-checklist-gate — peer-ack merge gate for SOP-checklist items.
#
# RFC#351 Step 2 of 6 (implementation MVP).
#
# === DESIGN ===
#
# Goal: each PR must answer 7 SOP-checklist questions in its body,
# and each item must have at least one /sop-ack <slug> comment from
# a non-author peer in the required team. BP requires the
# `sop-checklist / all-items-acked (pull_request)` status to merge.
#
# Triggers:
# - `pull_request_target`: opened, edited, synchronize, reopened
# → fires when PR opens, body is edited (refire — RFC#351 §4),
# or new code is pushed (head.sha changes → stale status would
# be auto-discarded by BP via dismiss_stale_reviews, but the
# status itself is per-SHA so we re-post on the new head).
# - `issue_comment`: created, edited, deleted
# → fires on any new comment so /sop-ack / /sop-revoke take
# effect immediately (Gitea 1.22.6 doesn't refire on
# pull_request_review per feedback_pull_request_review_no_refire,
# so issue_comment is the canonical refire channel).
#
# Trust boundary (mirrors RFC#324 §A4 + sop-tier-check security note):
# `pull_request_target` (not `pull_request`) — workflow def is loaded
# from BASE branch, so a PR cannot rewrite this workflow to exfiltrate
# the token. The `actions/checkout` step pins `ref: base.sha` so the
# script ALSO comes from BASE. PR-HEAD code is never executed in the
# runner.
#
# Token scope:
# - read:repository, read:organization for PR + comments + team probes
# - write:repository for POST /statuses/{sha}
# - The token owner MUST be a member of every team referenced by the
# config's required_teams (else /teams/{id}/members/{login} returns
# 403 — see review-check.sh same-gotcha doc). For the MVP we use
# the dev-lead token (a member of engineers, managers, qa, security)
# via a repo secret `SOP_CHECKLIST_GATE_TOKEN`. Provisioning of that
# secret is a follow-up authorization step (separate from this PR).
#
# Failure mode: tier-aware (RFC#351 open question 2):
# - tier:high → state=failure (hard-fail; BP blocks merge)
# - tier:medium → state=failure (hard-fail; same)
# - tier:low → state=pending (soft-fail; BP can choose to require
# this context or skip for low-tier PRs)
# - missing/no-tier → state=failure (default-mode: hard — never lower
# the bar per feedback_fix_root_not_symptom)
#
# Slash-command contract (RFC#351 v1 + §A1.1-style notes from RFC#324):
#
# /sop-ack <slug-or-numeric-alias> [optional note]
# — register a peer-ack for one checklist item.
# — slug accepts kebab-case, snake_case, or natural-spaces
# (all normalize to canonical kebab-case).
# — numeric 1..7 maps via config.items[*].numeric_alias.
# — most-recent (user, slug) directive wins.
#
# /sop-revoke <slug-or-numeric-alias> [reason]
# — invalidate the commenter's own prior /sop-ack for this slug.
# — does NOT affect other peers' acks on the same slug.
# — most-recent (user, slug) directive wins, so a later /sop-ack
# re-restores the ack.
#
# The eval is read-only + idempotent (read PR + comments + team
# membership, compute, post status). Re-running on any event is safe —
# the new status overwrites the previous one for the same context.
name: sop-checklist-gate
on:
pull_request_target:
types: [opened, edited, synchronize, reopened]
issue_comment:
types: [created, edited, deleted]
permissions:
contents: read
pull-requests: read
# NOTE: `statuses: write` is the GitHub-Actions name for POST /statuses.
# Gitea 1.22.6 may not gate on this permission key (it just checks the
# token), but listing it explicitly documents intent for the next
# platform-version upgrade.
statuses: write
jobs:
gate:
# Run on pull_request_target events always. On issue_comment events,
# only when the comment is on a PR (issue_comment fires for issues
# too) and the body contains one of the slash-commands.
if: |
github.event_name == 'pull_request_target' ||
(github.event_name == 'issue_comment' &&
github.event.issue.pull_request != null &&
(contains(github.event.comment.body, '/sop-ack') ||
contains(github.event.comment.body, '/sop-revoke')))
runs-on: ubuntu-latest
steps:
- name: Check out BASE ref (trust boundary — never PR-head)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
# For pull_request_target, the default branch is the trust
# anchor. For issue_comment the PR base may differ from the
# default branch (PR targeting `staging`), so we use the
# default-branch ref explicitly — same approach as
# qa-review.yml so the script source is always trusted.
ref: ${{ github.event.repository.default_branch }}
- name: Run sop-checklist-gate
env:
GITEA_TOKEN: ${{ secrets.SOP_CHECKLIST_GATE_TOKEN || secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number || github.event.issue.number }}
OWNER: ${{ github.repository_owner }}
REPO_NAME: ${{ github.event.repository.name }}
run: |
set -euo pipefail
python3 .gitea/scripts/sop-checklist-gate.py \
--owner "$OWNER" \
--repo "$REPO_NAME" \
--pr "$PR_NUMBER" \
--config .gitea/sop-checklist-config.yaml \
--gitea-host git.moleculesai.app
+7 -8
View File
@@ -59,10 +59,9 @@ name: Staging verify
# are populated.
on:
push:
branches: [main]
paths:
- '.gitea/workflows/publish-workspace-server-image.yml'
workflow_run:
workflows: ["publish-workspace-server-image"]
types: [completed]
permissions:
contents: read
packages: write
@@ -79,10 +78,10 @@ env:
jobs:
staging-smoke:
# The push trigger fires when publish-workspace-server-image.yml is updated
# (post-merge of the publish workflow). This is the best-available proxy
# for "publish succeeded" without workflow_run. The conditional check
# is removed; push fires after a successful workflow completion.
# Skip when the upstream workflow failed — no image to test against.
# workflow_dispatch trigger dropped in this Gitea port; only the
# workflow_run path remains.
if: ${{ github.event.workflow_run.conclusion == 'success' }}
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
continue-on-error: true
@@ -63,7 +63,6 @@ export function DropTargetBadge() {
<>
{ghostVisible && (
<div
data-testid="ghost-slot"
className="pointer-events-none absolute z-40 rounded-lg border-2 border-dashed border-emerald-400/70 bg-emerald-500/10"
style={{
left: slotTL.x,
@@ -74,7 +73,6 @@ export function DropTargetBadge() {
/>
)}
<div
data-testid="drop-badge"
className="pointer-events-none absolute z-50 -translate-x-1/2 -translate-y-full rounded-md bg-emerald-500 px-2 py-0.5 text-[11px] font-medium text-emerald-50 shadow-lg shadow-emerald-950/40"
style={{ left: badge.x, top: badge.y - 6 }}
>
@@ -1,253 +0,0 @@
// @vitest-environment jsdom
/**
* Tests for DropTargetBadge — floating drag affordance rendered over the
* ReactFlow canvas while a workspace node is being dragged onto a parent.
*
* Covers:
* - Renders nothing when dragOverNodeId is null
* - Renders nothing when target node not found in store
* - Renders nothing when getInternalNode returns null
* - Renders ghost slot + badge when valid target is found
* - Ghost hidden when slot falls outside parent bounds
* - Badge text includes the target workspace name
* - Badge positioned via screen-space coordinates from flowToScreenPosition
*/
import React from "react";
import { render, screen, cleanup } from "@testing-library/react";
import { afterEach, describe, expect, it, vi } from "vitest";
import { DropTargetBadge } from "../DropTargetBadge";
// ─── Mutable store state — hoisted so vi.mock factory closures capture the ref ─
let _storeState: {
dragOverNodeId: string | null;
nodes: Array<{
id: string;
data: Record<string, unknown>;
parentId: string | null;
measured?: { width: number; height: number };
}>;
} = {
dragOverNodeId: null,
nodes: [],
};
const _subscribers = new Set<() => void>();
function _notifySubscribers() {
for (const fn of _subscribers) fn();
}
const _mockUseCanvasStore = vi.hoisted(() => {
const impl = (selector: (s: typeof _storeState) => unknown) => selector(_storeState);
return impl;
});
// Module-level mutable impl — setFlowMock() swaps it out per test.
let _flowImpl: (arg: { x: number; y: number }) => { x: number; y: number } =
({ x, y }) => ({ x: x * 2, y: y * 2 });
let _flowToScreenPosition = vi.hoisted(() =>
vi.fn((arg: { x: number; y: number }) => _flowImpl(arg)),
);
let _getInternalNode = vi.hoisted(() =>
vi.fn<(id: string) => {
internals: { positionAbsolute: { x: number; y: number } };
measured?: { width: number; height: number };
} | null>(() => null),
);
const _mockUseReactFlow = vi.hoisted(() =>
vi.fn(() => ({
getInternalNode: _getInternalNode,
flowToScreenPosition: _flowToScreenPosition,
})),
);
// ─── Module mocks ─────────────────────────────────────────────────────────────
vi.mock("@/store/canvas", () => ({
useCanvasStore: _mockUseCanvasStore,
}));
vi.mock("@xyflow/react", () => ({
useReactFlow: _mockUseReactFlow,
}));
// ─── Helpers ──────────────────────────────────────────────────────────────────
function setStore(state: Partial<typeof _storeState>) {
_storeState = { ..._storeState, ...state };
_notifySubscribers();
}
// Helper to set per-test flowToScreenPosition mock — replaces _flowImpl.
function setFlowMock(impl: (arg: { x: number; y: number }) => { x: number; y: number }) {
_flowImpl = impl;
}
// ─── Tests ────────────────────────────────────────────────────────────────────
describe("DropTargetBadge — renders nothing when not dragging", () => {
afterEach(() => {
cleanup();
_storeState = { dragOverNodeId: null, nodes: [] };
_getInternalNode.mockReset().mockReturnValue(null);
_flowImpl = ({ x, y }) => ({ x: x * 2, y: y * 2 });
});
it("returns null when dragOverNodeId is null", () => {
setStore({ dragOverNodeId: null });
render(<DropTargetBadge />);
expect(document.body.textContent).toBe("");
});
it("returns null when target node not found in store nodes array", () => {
setStore({ dragOverNodeId: "ws-target", nodes: [] });
render(<DropTargetBadge />);
expect(document.body.textContent).toBe("");
});
});
describe("DropTargetBadge — renders nothing when getInternalNode is null", () => {
afterEach(() => {
cleanup();
_storeState = { dragOverNodeId: null, nodes: [] };
_getInternalNode.mockReset().mockReturnValue(null);
_flowImpl = ({ x, y }) => ({ x: x * 2, y: y * 2 });
});
it("returns null when getInternalNode returns null (node not in RF viewport)", () => {
_getInternalNode.mockReturnValue(null);
setStore({
dragOverNodeId: "ws-target",
nodes: [{ id: "ws-target", data: { name: "Target WS" }, parentId: null }],
});
render(<DropTargetBadge />);
expect(document.body.textContent).toBe("");
});
});
describe("DropTargetBadge — renders ghost slot + badge for valid drag target", () => {
afterEach(() => {
cleanup();
_storeState = { dragOverNodeId: null, nodes: [] };
_getInternalNode.mockReset().mockReturnValue(null);
_flowImpl = ({ x, y }) => ({ x: x * 2, y: y * 2 });
});
it("renders the drop badge with target name", () => {
_getInternalNode.mockReturnValue({
internals: { positionAbsolute: { x: 100, y: 200 } },
measured: { width: 220, height: 120 },
});
_flowToScreenPosition
.mockReturnValueOnce({ x: 500, y: 400 }) // slotTL
.mockReturnValueOnce({ x: 900, y: 600 }) // slotBR
.mockReturnValueOnce({ x: 700, y: 200 }); // badge
setStore({
dragOverNodeId: "ws-target",
nodes: [
{ id: "ws-target", data: { name: "SEO Workspace" }, parentId: null, measured: { width: 220, height: 120 } },
],
});
render(<DropTargetBadge />);
expect(screen.getByText(/Drop into: SEO Workspace/)).toBeTruthy();
});
it("renders the ghost slot div via data-testid", () => {
// measured.height must be large enough that parentBR.y > slotTL.y=330 so
// ghostVisible = (slotTL.y < parentBR.y) is true.
// parentBR.y = abs.y + measured.height = 200 + h > 330 → h > 130
_getInternalNode.mockReturnValue({
internals: { positionAbsolute: { x: 100, y: 200 } },
measured: { width: 220, height: 500 },
});
// Component calls flowToScreenPosition 5 times (confirmed via debug):
// 1) badge {x:210, y:200} -> {x:420, y:400} (badge center)
// 2) slotTL {x:116, y:330} -> {x:232, y:660} (slot origin)
// 3) slotBR {x:356, y:460} -> {x:712, y:920} (ghost uses this)
// 4) parentTL {x:100, y:200} -> {x:200, y:400} (parent origin)
// 5) parentBR {x:320, y:320} -> {x:640, y:640} (parent corner)
setFlowMock(({ x, y }: { x: number; y: number }) => {
if (x === 210 && y === 200) return { x: 420, y: 400 };
if (x === 116 && y === 330) return { x: 232, y: 660 };
if (x === 356 && y === 460) return { x: 712, y: 920 };
if (x === 100 && y === 200) return { x: 200, y: 400 };
// 5th call: parentBR = abs + {w:220, h:500} = {320, 700}
if (x === 320 && y === 700) return { x: 640, y: 1400 };
return { x: x * 2, y: y * 2 };
});
setStore({
dragOverNodeId: "ws-target",
nodes: [
{ id: "ws-target", data: { name: "Target" }, parentId: null, measured: { width: 220, height: 500 } },
],
});
render(<DropTargetBadge />);
expect(screen.getByTestId("ghost-slot")).toBeTruthy();
// Ghost uses slotBR from 3rd call: slotBR - slotTL = (712-232, 920-660)
expect(screen.getByTestId("ghost-slot").style.left).toBe("232px");
expect(screen.getByTestId("ghost-slot").style.top).toBe("660px");
expect(screen.getByTestId("ghost-slot").style.width).toBe("480px");
expect(screen.getByTestId("ghost-slot").style.height).toBe("260px");
});
it("ghost is hidden when slot falls entirely outside parent bounds", () => {
_getInternalNode.mockReturnValue({
internals: { positionAbsolute: { x: 100, y: 200 } },
measured: { width: 220, height: 120 },
});
// Set slotBR (3rd call) to be inside parent to hide ghost.
// slotBR.x ≤ parentTL.x makes slotBR.x - slotTL.x < 0 → ghostVisible = false.
setFlowMock(({ x, y }: { x: number; y: number }) => {
if (x === 210 && y === 200) return { x: 420, y: 400 }; // badge (1st call)
if (x === 116 && y === 330) return { x: 232, y: 660 }; // slotTL (2nd call)
if (x === 356 && y === 460) return { x: 150, y: 460 }; // slotBR (3rd): slotBR.x=150 < parentTL.x=200 → hidden
if (x === 100 && y === 200) return { x: 200, y: 400 }; // parentTL (4th call)
if (x === 320 && y === 320) return { x: 640, y: 640 }; // parentBR (5th call)
return { x: x * 2, y: y * 2 };
});
setStore({
dragOverNodeId: "ws-target",
nodes: [
{ id: "ws-target", data: { name: "Tiny" }, parentId: null, measured: { width: 220, height: 120 } },
],
});
render(<DropTargetBadge />);
// Badge should still render, ghost should not
expect(screen.getByText(/Drop into: Tiny/)).toBeTruthy();
expect(screen.queryByTestId("ghost-slot")).toBeNull();
});
it("badge is absolutely positioned with left and top from flowToScreenPosition", () => {
_getInternalNode.mockReturnValue({
internals: { positionAbsolute: { x: 100, y: 200 } },
measured: { width: 220, height: 120 },
});
setFlowMock(({ x, y }: { x: number; y: number }) => {
if (x === 210 && y === 200) return { x: 420, y: 400 };
if (x === 116 && y === 330) return { x: 232, y: 660 };
if (x === 356 && y === 460) return { x: 712, y: 920 };
if (x === 100 && y === 200) return { x: 200, y: 400 };
if (x === 320 && y === 320) return { x: 640, y: 640 };
return { x: x * 2, y: y * 2 };
});
setStore({
dragOverNodeId: "ws-target",
nodes: [
{ id: "ws-target", data: { name: "Target" }, parentId: null, measured: { width: 220, height: 120 } },
],
});
render(<DropTargetBadge />);
expect(screen.getByTestId("drop-badge")).toBeTruthy();
// Badge uses 1st call: {x:210,y:200} -> {x:420,y:400}, badge.y = 400-6 = 394
expect(screen.getByTestId("drop-badge").style.left).toBe("420px");
expect(screen.getByTestId("drop-badge").style.top).toBe("394px");
expect(screen.getByText(/Drop into: Target/)).toBeTruthy();
});
});
+2 -7
View File
@@ -54,14 +54,9 @@ export function MobileChat({
// user sees their prior thread on entry. The store is updated by the
// socket → ChatTab flows the desktop runs; on mobile we read from the
// same buffer to keep state coherent across viewports.
// NOTE: do NOT use `?? []` in the selector — Zustand uses Object.is
// for selector equality. A fallback `?? []` creates a new [] reference on
// every store update when agentMessages[agentId] is undefined, causing an
// infinite re-render loop (React error #185 / Maximum update depth
// exceeded). The undefined case is handled by the initializer below.
const storedMessages = useCanvasStore((s) => s.agentMessages[agentId]);
const storedMessages = useCanvasStore((s) => s.agentMessages[agentId] ?? []);
const [messages, setMessages] = useState<ChatMessage[]>(() =>
(storedMessages ?? []).map((m) => ({
storedMessages.map((m) => ({
id: m.id,
role: "agent",
text: m.content,
@@ -1,535 +0,0 @@
// @vitest-environment jsdom
/**
* Tests for ActivityTab — activity ledger with live updates, filtering,
* expand/collapse, and A2A error hint rendering.
*
* Covers:
* - Loading state
* - Error state (network failure)
* - Empty state (no activities)
* - Activity list rendering (single + multiple)
* - Filter bar: 7 filters, active filter highlighted
* - Each filter updates the rendered list
* - Auto-refresh toggle (Live / Paused)
* - Refresh button calls API
* - Full Trace button opens ConversationTraceModal
* - Duration display in activity rows
* - Expand/collapse row details
* - A2A rows show source → target name flow
* - Error rows styled differently
* - Error detail shown when expanded
* - getSkills exported function (standalone unit)
*/
import React from "react";
import { render, screen, fireEvent, cleanup, act, waitFor } from "@testing-library/react";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ActivityTab } from "../ActivityTab";
import type { ActivityEntry } from "@/types/activity";
const mockApiGet = vi.fn();
const mockUseSocketEvent = vi.fn();
const mockUseWorkspaceName = vi.fn<(id: string | null) => string>((_id: string | null) => "Test Workspace");
const mockConversationTraceModal = vi.fn(() => null);
const mockConversationTraceModalRender = vi.fn(
({ open }: { open: boolean }) => (open ? <div data-testid="trace-modal">Trace</div> : null),
);
vi.mock("@/hooks/useSocketEvent", () => ({
useSocketEvent: (...args: unknown[]) => mockUseSocketEvent(...args),
}));
vi.mock("@/hooks/useWorkspaceName", () => ({
useWorkspaceName: () => mockUseWorkspaceName,
}));
vi.mock("@/components/ConversationTraceModal", () => ({
ConversationTraceModal: (props: { open: boolean; onClose: () => void; workspaceId: string }) =>
props.open ? <div data-testid="trace-modal">Trace</div> : null,
}));
vi.mock("@/lib/api", () => ({
api: { get: (...args: unknown[]) => mockApiGet(...args) },
}));
// ─── Fixtures ───────────────────────────────────────────────────────────────
function activity(overrides: Partial<ActivityEntry> = {}): ActivityEntry {
return {
id: "act-1",
workspace_id: "ws-1",
activity_type: "agent_log",
source_id: null,
target_id: null,
method: null,
summary: null,
request_body: null,
response_body: null,
duration_ms: null,
status: "ok",
error_detail: null,
created_at: new Date(Date.now() - 60_000).toISOString(),
...overrides,
};
}
// ─── Helpers ────────────────────────────────────────────────────────────────
async function flush() {
await act(async () => { await Promise.resolve(); });
}
// ─── Tests ────────────────────────────────────────────────────────────────
describe("ActivityTab — loading / error / empty", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("shows loading state initially", () => {
mockApiGet.mockImplementation(() => new Promise(() => {}));
render(<ActivityTab workspaceId="ws-1" />);
expect(screen.getByText("Loading activity...")).toBeTruthy();
});
it("shows error banner when API fails", async () => {
mockApiGet.mockRejectedValue(new Error("network failure"));
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText(/network failure/i)).toBeTruthy();
});
it("shows empty state when no activities", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("No activity recorded yet")).toBeTruthy();
});
});
describe("ActivityTab — list rendering", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders a single activity row", async () => {
mockApiGet.mockResolvedValue([activity({ id: "a1", activity_type: "agent_log" })]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("LOG")).toBeTruthy();
});
it("renders multiple activity rows", async () => {
mockApiGet.mockResolvedValue([
activity({ id: "a1", activity_type: "agent_log" }),
activity({ id: "a2", activity_type: "task_update" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("LOG")).toBeTruthy();
expect(screen.getByText("TASK")).toBeTruthy();
});
it("shows duration when duration_ms is present", async () => {
mockApiGet.mockResolvedValue([
activity({ id: "a1", duration_ms: 1234, activity_type: "agent_log" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("1234ms")).toBeTruthy();
});
it("shows summary text when present", async () => {
mockApiGet.mockResolvedValue([
activity({ id: "a1", summary: "Delegated task to SEO Agent", activity_type: "a2a_send" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText(/Delegated task to SEO Agent/)).toBeTruthy();
});
});
describe("ActivityTab — filter bar", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders all 7 filter buttons", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByRole("button", { name: /all/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /a2a in/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /a2a out/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /tasks/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /skill promo/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /logs/i })).toBeTruthy();
expect(screen.getByRole("button", { name: /errors/i })).toBeTruthy();
});
it("active filter has aria-pressed=true", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const allBtn = screen.getByRole("button", { name: /all/i });
expect(allBtn.getAttribute("aria-pressed")).toBe("true");
});
it("clicking a filter updates aria-pressed and re-fetches", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const errorsBtn = screen.getByRole("button", { name: /errors/i });
await act(async () => { errorsBtn.click(); });
await flush();
expect(errorsBtn.getAttribute("aria-pressed")).toBe("true");
// API was called with ?type=error
expect(mockApiGet).toHaveBeenLastCalledWith("/workspaces/ws-1/activity?type=error");
});
it("clicking All removes the type query param", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
// First click a specific filter
const errorsBtn = screen.getByRole("button", { name: /errors/i });
await act(async () => { errorsBtn.click(); });
await flush();
// Then click All
const allBtn = screen.getByRole("button", { name: /all/i });
await act(async () => { allBtn.click(); });
await flush();
expect(mockApiGet).toHaveBeenLastCalledWith("/workspaces/ws-1/activity");
});
});
describe("ActivityTab — auto-refresh toggle", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders Live by default", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("⟳ Live")).toBeTruthy();
});
it("clicking Live toggles to Paused", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const liveBtn = screen.getByText("⟳ Live");
await act(async () => { liveBtn.click(); });
await flush();
expect(screen.getByText("⟳ Paused")).toBeTruthy();
});
it("clicking Paused toggles back to Live", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const liveBtn = screen.getByText("⟳ Live");
await act(async () => { liveBtn.click(); });
await flush();
const pausedBtn = screen.getByText("⟳ Paused");
await act(async () => { pausedBtn.click(); });
await flush();
expect(screen.getByText("⟳ Live")).toBeTruthy();
});
});
describe("ActivityTab — refresh button", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("Refresh calls the API", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const refreshBtn = screen.getByRole("button", { name: /refresh/i });
await act(async () => { refreshBtn.click(); });
await flush();
// loadActivities called again (second call)
expect(mockApiGet.mock.calls.length).toBeGreaterThanOrEqual(2);
});
});
describe("ActivityTab — Full Trace button", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("Full Trace button opens the trace modal", async () => {
mockApiGet.mockResolvedValue([]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const traceBtn = screen.getByRole("button", { name: /full trace/i });
await act(async () => { traceBtn.click(); });
await flush();
expect(screen.getByTestId("trace-modal")).toBeTruthy();
});
});
describe("ActivityTab — row expand / collapse", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("row is collapsed by default (shows ▶)", async () => {
mockApiGet.mockResolvedValue([activity({ id: "a1", activity_type: "agent_log" })]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("▶")).toBeTruthy();
});
it("clicking a row expands it (shows ▼)", async () => {
mockApiGet.mockResolvedValue([activity({ id: "a1", activity_type: "agent_log" })]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const rowBtn = screen.getByText("LOG").closest("button") as HTMLButtonElement;
await act(async () => { rowBtn.click(); });
await flush();
expect(screen.getByText("▼")).toBeTruthy();
});
it("clicking expanded row collapses it", async () => {
mockApiGet.mockResolvedValue([activity({ id: "a1", activity_type: "agent_log" })]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const rowBtn = screen.getByText("LOG").closest("button") as HTMLButtonElement;
await act(async () => { rowBtn.click(); }); // expand
await flush();
await act(async () => { rowBtn.click(); }); // collapse
await flush();
expect(screen.getByText("▶")).toBeTruthy();
});
});
describe("ActivityTab — A2A rows with source/target", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
mockUseWorkspaceName.mockImplementation((id: string | null) => {
if (id === "ws-agent-1") return "Alice Agent";
if (id === "ws-agent-2") return "Bob Agent";
return "Unknown";
});
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("shows source → target for a2a_receive rows", async () => {
mockApiGet.mockResolvedValue([
activity({
id: "a1",
activity_type: "a2a_receive",
source_id: "ws-agent-1",
target_id: "ws-agent-2",
method: "message/send",
}),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("Alice Agent")).toBeTruthy();
expect(screen.getByText("→")).toBeTruthy();
expect(screen.getByText("Bob Agent")).toBeTruthy();
});
it("shows A2A OUT badge for a2a_send rows", async () => {
mockApiGet.mockResolvedValue([
activity({
id: "a1",
activity_type: "a2a_send",
source_id: "ws-agent-1",
target_id: "ws-agent-2",
}),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("A2A OUT")).toBeTruthy();
});
});
describe("ActivityTab — error rows", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("error status row renders with ERROR badge", async () => {
mockApiGet.mockResolvedValue([
activity({ id: "a1", activity_type: "error", status: "error" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("ERROR")).toBeTruthy();
});
it("error detail is shown when row is expanded", async () => {
mockApiGet.mockResolvedValue([
activity({
id: "a1",
activity_type: "error",
status: "error",
error_detail: "Connection refused",
duration_ms: null,
}),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const rowBtn = screen.getByText("ERROR").closest("button") as HTMLButtonElement;
await act(async () => { rowBtn.click(); });
await flush();
// Text appears twice: collapsed-row preview + expanded detail section
expect(screen.getAllByText("Connection refused")).toHaveLength(2);
});
});
describe("ActivityTab — type badge rendering", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders correct badge text for each type", async () => {
const types: ActivityEntry["activity_type"][] = [
"a2a_receive", "a2a_send", "task_update", "skill_promotion", "agent_log", "error",
];
const entries = types.map((t, i) =>
activity({ id: `a${i}`, activity_type: t }),
);
mockApiGet.mockResolvedValue(entries);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText("A2A IN")).toBeTruthy();
expect(screen.getByText("A2A OUT")).toBeTruthy();
expect(screen.getByText("TASK")).toBeTruthy();
expect(screen.getByText("PROMO")).toBeTruthy();
expect(screen.getByText("LOG")).toBeTruthy();
expect(screen.getByText("ERROR")).toBeTruthy();
});
});
describe("ActivityTab — count display", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockUseSocketEvent.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("shows count with 'activities' label when filter=all", async () => {
mockApiGet.mockResolvedValue([
activity({ id: "a1" }),
activity({ id: "a2" }),
]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
expect(screen.getByText(/2 activities/)).toBeTruthy();
});
it("shows count with filter label when non-all filter selected", async () => {
mockApiGet.mockResolvedValue([activity({ id: "a1", activity_type: "error" })]);
render(<ActivityTab workspaceId="ws-1" />);
await flush();
const errorsBtn = screen.getByRole("button", { name: /errors/i });
await act(async () => { errorsBtn.click(); });
await flush();
expect(screen.getByText(/1 error entries/)).toBeTruthy();
});
});
describe("getSkills — unit", () => {
it("returns empty array for null card", async () => {
const { getSkills } = await import("../DetailsTab");
expect(getSkills(null)).toEqual([]);
});
it("returns empty array when skills is not an array", async () => {
const { getSkills } = await import("../DetailsTab");
expect(getSkills({ name: "test" } as Record<string, unknown>)).toEqual([]);
});
it("extracts skill ids and descriptions", async () => {
const { getSkills } = await import("../DetailsTab");
const card = {
skills: [
{ id: "web-search", description: "Search the web" },
{ name: "code-interpreter" },
{ id: "analytics" },
],
};
const result = getSkills(card as Record<string, unknown>);
expect(result).toEqual([
{ id: "web-search", description: "Search the web" },
{ id: "code-interpreter" },
{ id: "analytics" },
]);
});
it("filters out skills with no id or name", async () => {
const { getSkills } = await import("../DetailsTab");
const card = { skills: [{ description: "no id" }, { id: "valid" }] };
expect(getSkills(card as Record<string, unknown>)).toEqual([{ id: "valid" }]);
});
});
@@ -1,459 +0,0 @@
// @vitest-environment jsdom
/**
* Tests for DetailsTab — workspace detail panel with editable fields,
* delete/restart workflows, peers list, error display, and section
* composition.
*
* Covers:
* - View mode: all rows rendered (name, role, tier, status, URL, etc.)
* - Edit mode: name/role/tier fields become editable
* - Save workflow: calls PATCH and updates store
* - Cancel: reverts fields to original data
* - Delete: two-step confirm (confirm button shows alertdialog)
* - Delete confirm: calls DELETE and removes node from store
* - Restart button: calls POST /restart for failed/degraded/offline
* - Error section: shown for failed/degraded with lastSampleError
* - Skills section: rendered when agentCard has skills
* - Peers section: loads and displays peer list
* - Peers section: empty state when offline
* - ConsoleModal: opens/closes via button click
*/
import React from "react";
import { render, screen, fireEvent, cleanup, act, waitFor } from "@testing-library/react";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { DetailsTab } from "../DetailsTab";
import type { WorkspaceNodeData } from "@/store/canvas";
const mockApi = vi.hoisted(() => ({
get: vi.fn(),
patch: vi.fn(),
del: vi.fn(),
post: vi.fn(),
}));
const mockUpdateNodeData = vi.hoisted(() => vi.fn());
const mockRemoveSubtree = vi.hoisted(() => vi.fn());
const mockSelectNode = vi.hoisted(() => vi.fn());
const mockUseCanvasStore = vi.hoisted(() => {
const fn = (selector: (s: {
updateNodeData: typeof mockUpdateNodeData;
removeSubtree: typeof mockRemoveSubtree;
selectNode: typeof mockSelectNode;
}) => unknown) =>
selector({
updateNodeData: mockUpdateNodeData,
removeSubtree: mockRemoveSubtree,
selectNode: mockSelectNode,
});
return fn;
});
vi.mock("@/store/canvas", () => ({
useCanvasStore: mockUseCanvasStore,
}));
vi.mock("@/lib/api", () => ({
api: mockApi,
}));
vi.mock("@/components/BudgetSection", () => ({
BudgetSection: () => <div data-testid="budget-section">BudgetSection</div>,
}));
vi.mock("@/components/WorkspaceUsage", () => ({
WorkspaceUsage: () => <div data-testid="workspace-usage">WorkspaceUsage</div>,
}));
vi.mock("@/components/ConsoleModal", () => ({
ConsoleModal: ({ open, onClose }: { open: boolean; onClose: () => void; workspaceId: string; workspaceName: string }) =>
open ? (
<div role="dialog" data-testid="console-modal">
<button onClick={onClose}>Close Console</button>
</div>
) : null,
}));
// ─── Fixtures ───────────────────────────────────────────────────────────────
const baseData: WorkspaceNodeData = {
name: "Test Workspace",
status: "online",
tier: 2,
url: "https://test.molecules.ai",
parentId: null,
activeTasks: 0,
agentCard: null,
} as WorkspaceNodeData;
function data(overrides: Partial<WorkspaceNodeData> = {}): WorkspaceNodeData {
return { ...baseData, ...overrides } as WorkspaceNodeData;
}
// ─── Helpers ───────────────────────────────────────────────────────────────
async function flush() {
await act(async () => { await Promise.resolve(); });
}
// ─── Tests ────────────────────────────────────────────────────────────────
describe("DetailsTab — view mode", () => {
beforeEach(() => {
mockApi.get.mockReset();
mockUpdateNodeData.mockReset();
mockRemoveSubtree.mockReset();
mockSelectNode.mockReset();
mockApi.get.mockResolvedValue([]);
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders name, role, tier, status, URL, parent rows", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ role: "SEO Specialist", url: "https://example.com" })} />);
expect(screen.getByText("Test Workspace")).toBeTruthy();
expect(screen.getByText("SEO Specialist")).toBeTruthy();
expect(screen.getByText("T2")).toBeTruthy();
expect(screen.getByText("online")).toBeTruthy();
expect(screen.getByText("https://example.com")).toBeTruthy();
expect(screen.getByText("root")).toBeTruthy();
});
it("renders Edit button", () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
expect(screen.getByRole("button", { name: /edit/i })).toBeTruthy();
});
it("renders BudgetSection and WorkspaceUsage", () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
expect(screen.getByTestId("budget-section")).toBeTruthy();
expect(screen.getByTestId("workspace-usage")).toBeTruthy();
});
it("renders Restart button for failed status", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ status: "failed" })} />);
expect(screen.getByRole("button", { name: /retry/i })).toBeTruthy();
});
it("renders Restart button for offline status", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ status: "offline" })} />);
expect(screen.getByRole("button", { name: /restart/i })).toBeTruthy();
});
it("renders Restart button for degraded status", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ status: "degraded" })} />);
expect(screen.getByRole("button", { name: /restart/i })).toBeTruthy();
});
it("does not render Restart for online status", () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
expect(screen.queryByRole("button", { name: /restart|retry/i })).toBeNull();
});
it("renders error section for failed status with lastSampleError", () => {
render(
<DetailsTab
workspaceId="ws-1"
data={data({ status: "failed", lastSampleError: "ModuleNotFoundError: No module named 'requests'" })}
/>,
);
expect(screen.getByTestId("details-error-log")).toBeTruthy();
expect(screen.getByText(/ModuleNotFoundError/)).toBeTruthy();
});
it("renders error rate for degraded status", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ status: "degraded", lastErrorRate: 0.15 })} />);
expect(screen.getByText(/15%/)).toBeTruthy();
});
it("renders Delete Workspace button in Danger Zone", () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
expect(screen.getByRole("button", { name: /delete workspace/i })).toBeTruthy();
});
});
describe("DetailsTab — edit mode", () => {
beforeEach(() => {
mockApi.patch.mockReset();
mockUpdateNodeData.mockReset();
mockApi.get.mockResolvedValue([]);
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("clicking Edit shows form fields", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ role: "Agent" })} />);
fireEvent.click(screen.getByRole("button", { name: /edit/i }));
expect(screen.getByLabelText(/name/i)).toBeTruthy();
expect(screen.getByLabelText(/role/i)).toBeTruthy();
expect(screen.getByLabelText(/tier/i)).toBeTruthy();
});
it("Edit form pre-fills current values", () => {
render(<DetailsTab workspaceId="ws-1" data={data({ name: "My WS", role: "Coder" })} />);
fireEvent.click(screen.getByRole("button", { name: /edit/i }));
expect((screen.getByLabelText(/name/i) as HTMLInputElement).value).toBe("My WS");
expect((screen.getByLabelText(/role/i) as HTMLInputElement).value).toBe("Coder");
});
it("Save calls PATCH and exits edit mode", async () => {
mockApi.patch.mockResolvedValue({});
render(<DetailsTab workspaceId="ws-1" data={data({ name: "WS" })} />);
fireEvent.click(screen.getByRole("button", { name: /edit/i }));
await flush();
const nameInput = screen.getByLabelText(/name/i) as HTMLInputElement;
fireEvent.change(nameInput, { target: { value: "Renamed WS" } });
await flush();
// Use scoped search: BudgetSection also has a Save button
const saveBtn = Array.from(document.querySelectorAll("button")).find(
(b) => b.textContent === "Save" && !b.getAttribute("data-testid"),
) as HTMLButtonElement;
fireEvent.click(saveBtn);
await flush();
expect(mockApi.patch).toHaveBeenCalledWith(
"/workspaces/ws-1",
expect.objectContaining({ name: "Renamed WS" }),
);
expect(mockUpdateNodeData).toHaveBeenCalledWith("ws-1", expect.objectContaining({ name: "Renamed WS" }));
// Edit fields should no longer be visible
expect(screen.queryByLabelText(/name/i)).toBeNull();
});
it("Cancel reverts to view mode without saving", async () => {
mockApi.patch.mockResolvedValue({});
render(<DetailsTab workspaceId="ws-1" data={data({ name: "Original" })} />);
fireEvent.click(screen.getByRole("button", { name: /edit/i }));
await flush();
const nameInput = screen.getByLabelText(/name/i) as HTMLInputElement;
fireEvent.change(nameInput, { target: { value: "Changed" } });
await flush();
const cancelBtn = Array.from(document.querySelectorAll("button")).find(
(b) => b.textContent === "Cancel" && !b.getAttribute("data-testid"),
) as HTMLButtonElement;
fireEvent.click(cancelBtn);
await flush();
expect(mockApi.patch).not.toHaveBeenCalled();
expect(screen.getByText("Original")).toBeTruthy();
expect(screen.queryByLabelText(/name/i)).toBeNull();
});
it("Save shows error banner on failure", async () => {
mockApi.patch.mockRejectedValue(new Error("Server error"));
render(<DetailsTab workspaceId="ws-1" data={data()} />);
fireEvent.click(screen.getByRole("button", { name: /edit/i }));
await flush();
const saveBtn = Array.from(document.querySelectorAll("button")).find(
(b) => b.textContent === "Save" && !b.getAttribute("data-testid"),
) as HTMLButtonElement;
fireEvent.click(saveBtn);
await flush();
expect(screen.getByText(/server error/i)).toBeTruthy();
});
});
describe("DetailsTab — delete workflow", () => {
beforeEach(() => {
mockApi.del.mockReset();
mockRemoveSubtree.mockReset();
mockSelectNode.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("clicking Delete shows confirm dialog", async () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
fireEvent.click(screen.getByRole("button", { name: /delete workspace/i }));
await flush();
expect(screen.getByRole("alertdialog")).toBeTruthy();
expect(screen.getByText(/confirm deletion/i)).toBeTruthy();
});
it("confirming delete calls DELETE and removes node from store", async () => {
mockApi.del.mockResolvedValue(undefined);
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
fireEvent.click(screen.getByRole("button", { name: /delete workspace/i }));
await flush();
// Radix ConfirmDialog uses dispatchEvent with bubbling click
const confirmBtn = Array.from(document.querySelectorAll("button")).find(
(b) => b.textContent === "Confirm Delete",
) as HTMLButtonElement;
fireEvent(confirmBtn, new MouseEvent("click", { bubbles: true }));
await flush();
expect(mockApi.del).toHaveBeenCalledWith("/workspaces/ws-1?confirm=true");
expect(mockRemoveSubtree).toHaveBeenCalledWith("ws-1");
expect(mockSelectNode).toHaveBeenCalledWith(null);
});
it("cancelling delete returns to view mode", async () => {
mockApi.del.mockResolvedValue(undefined);
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
fireEvent.click(screen.getByRole("button", { name: /delete workspace/i }));
await flush();
const cancelBtn = Array.from(document.querySelectorAll("button")).find(
(b) => b.textContent === "Cancel",
) as HTMLButtonElement;
fireEvent(cancelBtn, new MouseEvent("click", { bubbles: true }));
await flush();
expect(screen.queryByRole("alertdialog")).toBeNull();
expect(screen.getByRole("button", { name: /delete workspace/i })).toBeTruthy();
});
});
describe("DetailsTab — restart workflow", () => {
beforeEach(() => {
mockApi.post.mockReset();
mockUpdateNodeData.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("Restart button calls POST /restart and sets status to provisioning", async () => {
mockApi.post.mockResolvedValue(undefined);
render(<DetailsTab workspaceId="ws-1" data={data({ status: "failed" })} />);
await flush();
fireEvent.click(screen.getByRole("button", { name: /retry/i }));
await flush();
expect(mockApi.post).toHaveBeenCalledWith("/workspaces/ws-1/restart", {});
expect(mockUpdateNodeData).toHaveBeenCalledWith("ws-1", { status: "provisioning" });
});
it("Restart shows error on failure", async () => {
mockApi.post.mockRejectedValue(new Error("Restart failed"));
render(<DetailsTab workspaceId="ws-1" data={data({ status: "offline" })} />);
await flush();
fireEvent.click(screen.getByRole("button", { name: /restart/i }));
await flush();
expect(screen.getByText(/restart failed/i)).toBeTruthy();
});
});
describe("DetailsTab — peers section", () => {
beforeEach(() => {
mockApi.get.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("loads peers from API", async () => {
mockApi.get.mockResolvedValue([
{ id: "p1", name: "Alice Agent", role: "seo", status: "online", tier: 2 },
{ id: "p2", name: "Bob Agent", role: null, status: "offline", tier: 3 },
]);
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
expect(screen.getByText("Alice Agent")).toBeTruthy();
expect(screen.getByText("Bob Agent")).toBeTruthy();
});
it("shows 'No reachable peers' when list is empty", async () => {
mockApi.get.mockResolvedValue([]);
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
expect(screen.getByText("No reachable peers")).toBeTruthy();
});
it("shows offline message when workspace is not online", async () => {
mockApi.get.mockResolvedValue([]);
render(<DetailsTab workspaceId="ws-1" data={data({ status: "provisioning" })} />);
await flush();
expect(screen.getByText(/only discoverable while the workspace is online/i)).toBeTruthy();
});
it("clicking peer name selects that node", async () => {
mockApi.get.mockResolvedValue([{ id: "p1", name: "Alice Agent", role: null, status: "online", tier: 2 }]);
render(<DetailsTab workspaceId="ws-1" data={data()} />);
await flush();
fireEvent.click(screen.getByText("Alice Agent"));
await flush();
expect(mockSelectNode).toHaveBeenCalledWith("p1");
});
});
describe("DetailsTab — skills section", () => {
beforeEach(() => {
mockApi.get.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("renders skills from agentCard", () => {
render(
<DetailsTab
workspaceId="ws-1"
data={data({ agentCard: { name: "Test Agent", skills: [
{ id: "web-search", description: "Search the web" },
{ id: "code-interpreter" },
]} as unknown as WorkspaceNodeData["agentCard"] })}
/>,
);
expect(screen.getByText("web-search")).toBeTruthy();
expect(screen.getByText("Search the web")).toBeTruthy();
expect(screen.getByText("code-interpreter")).toBeTruthy();
});
it("does not render Skills section when agentCard is null", () => {
render(<DetailsTab workspaceId="ws-1" data={data()} />);
expect(screen.queryByText("Skills")).toBeNull();
});
});
describe("DetailsTab — ConsoleModal", () => {
beforeEach(() => {
mockApi.get.mockReset();
});
afterEach(() => {
cleanup();
vi.useRealTimers();
});
it("View console output button opens ConsoleModal", async () => {
render(
<DetailsTab
workspaceId="ws-1"
data={data({ status: "failed", lastSampleError: "Traceback..." })}
/>,
);
await flush();
fireEvent.click(screen.getByRole("button", { name: /view console output/i }));
await flush();
expect(screen.getByTestId("console-modal")).toBeTruthy();
});
it("Close button closes ConsoleModal", async () => {
render(
<DetailsTab
workspaceId="ws-1"
data={data({ status: "failed", lastSampleError: "Traceback..." })}
/>,
);
await flush();
fireEvent.click(screen.getByRole("button", { name: /view console output/i }));
await flush();
expect(screen.getByTestId("console-modal")).toBeTruthy();
fireEvent.click(screen.getByRole("button", { name: /close console/i }));
await flush();
expect(screen.queryByTestId("console-modal")).toBeNull();
});
});
File diff suppressed because it is too large Load Diff
-357
View File
@@ -1,357 +0,0 @@
"""Tests for `.gitea/scripts/lint_mask_pr_atomicity.py` — Tier 2d lint.
Structural enforcement of internal#350 Tier 2d: a PR that touches
`.gitea/workflows/ci.yml` and modifies `continue-on-error` OR the
`all-required` sentinel's `needs:` block must EITHER:
- Touch both atomically in the same PR (preferred), OR
- Cross-link to the paired PR via `Paired: #NNN` in body OR a commit
message.
The class this lint exists to prevent: PR#665 (interim
continue-on-error: true on platform-build) + PR#668 (sentinel-exempt)
were designed-as-a-pair but merged solo — #665 landed at 04:47Z, #668
still open at 05:07Z when the watchdog fired. ~20 min of main red.
Test classes (per `feedback_branch_count_before_approving`, every
prod branch enumerated):
- test_diff_touches_neither_passes — diff is in ci.yml
but neither continue-on-error nor all-required.needs is touched.
PR is exempt. Exit 0.
- test_diff_touches_both_atomically_passes — both touched in
the same PR. Atomic. Exit 0.
- test_diff_touches_coe_only_no_pair_fails — continue-on-error
flipped without sentinel-needs change AND no `Paired: #NNN`
reference anywhere. Exit 1.
- test_diff_touches_needs_only_no_pair_fails — sentinel `needs:`
changed without `continue-on-error` change AND no pair reference.
Exit 1.
- test_diff_touches_coe_only_pair_in_body — coe changed, no
needs change, body has `Paired: #668`. Exit 0.
- test_diff_touches_needs_only_pair_in_commit — needs changed, no
coe change, commit message includes `Paired: #665`. Exit 0.
- test_paired_reference_must_be_numeric — `Paired: #abc` or
`Paired: NNNN` (missing `#`) doesn't satisfy the rule. Exit 1.
- test_ci_yml_unchanged_skips — no ci.yml in the
diff at all (defensive — workflow paths-filter already prevents,
but the lint should not crash). Exit 0.
The lint receives base SHA + head SHA via env (set by the workflow
from the pull_request payload) and uses `git show` to read both
sides without a separate clone. Tests stub `subprocess.run` to drive
the diff content; the actual git is never invoked.
Run:
python3 -m pytest tests/test_lint_mask_pr_atomicity.py -v
Dependencies: stdlib + PyYAML (the script reads ci.yml via PyYAML AST
per `feedback_behavior_based_ast_gates`). No network. No live git.
"""
from __future__ import annotations
import importlib.util
import os
import subprocess
import sys
import textwrap
from pathlib import Path
from unittest import mock
import pytest
SCRIPT_PATH = (
Path(__file__).resolve().parent.parent
/ ".gitea"
/ "scripts"
/ "lint_mask_pr_atomicity.py"
)
# Minimal ci.yml fixture — only the bits the lint actually parses
# (a job with continue-on-error + the all-required aggregator).
CI_YML_BASE = """
name: CI
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
platform-build:
runs-on: ubuntu-latest
continue-on-error: false
steps:
- run: echo build
canvas-build:
runs-on: ubuntu-latest
continue-on-error: false
steps:
- run: echo build
all-required:
runs-on: ubuntu-latest
needs:
- platform-build
- canvas-build
if: always()
steps:
- run: echo agg
"""
# Same as base but with continue-on-error flipped on platform-build.
CI_YML_COE_FLIPPED = CI_YML_BASE.replace(
" platform-build:\n runs-on: ubuntu-latest\n continue-on-error: false",
" platform-build:\n runs-on: ubuntu-latest\n continue-on-error: true",
)
# Same as base but with canvas-build dropped from all-required.needs.
CI_YML_NEEDS_CHANGED = CI_YML_BASE.replace(
" needs:\n - platform-build\n - canvas-build",
" needs:\n - platform-build",
)
# Both changed at once.
CI_YML_BOTH = CI_YML_COE_FLIPPED.replace(
" needs:\n - platform-build\n - canvas-build",
" needs:\n - platform-build",
)
def _import_lint(monkeypatch):
"""Import the lint module under a fresh name per test."""
spec = importlib.util.spec_from_file_location(
f"lint_mask_pr_atomicity_{os.getpid()}_{id(monkeypatch)}",
SCRIPT_PATH,
)
m = importlib.util.module_from_spec(spec)
spec.loader.exec_module(m)
return m
def _stub_git(base_yml: str | None, head_yml: str | None, commits: list[str]):
"""Build a fake `subprocess.run` that emulates git show + log.
base_yml / head_yml: contents the lint sees at base/head SHA.
Pass `None` to simulate "path didn't exist on that side" (git
show returns exit code 128 — file-not-in-tree).
commits: list of commit messages on the PR (head's ancestry up to
the base merge-base). The lint runs
`git log --format=%B base..head` to find Paired: refs.
"""
def fake_run(cmd, *args, **kwargs):
if not isinstance(cmd, list):
raise AssertionError(f"unexpected non-list cmd: {cmd!r}")
# `git show <sha>:<path>`
if cmd[:2] == ["git", "show"] and len(cmd) >= 3 and ":" in cmd[2]:
sha, path = cmd[2].split(":", 1)
if "base" in sha or "BASE" in sha:
content = base_yml
else:
content = head_yml
if content is None:
return subprocess.CompletedProcess(
cmd, returncode=128, stdout="", stderr="fatal: path not in tree"
)
return subprocess.CompletedProcess(
cmd, returncode=0, stdout=content, stderr=""
)
# `git log --format=%B base..head -- .`
if cmd[:2] == ["git", "log"]:
body = "\n\n--commit-boundary--\n\n".join(commits)
return subprocess.CompletedProcess(
cmd, returncode=0, stdout=body, stderr=""
)
# `git diff --name-only base..head`
if cmd[:2] == ["git", "diff"]:
# If either side had ci.yml, it's in the diff; else not.
paths = []
if (base_yml or "") != (head_yml or ""):
paths.append(".gitea/workflows/ci.yml")
return subprocess.CompletedProcess(
cmd, returncode=0, stdout="\n".join(paths) + "\n", stderr=""
)
raise AssertionError(f"unexpected git invocation: {cmd!r}")
return fake_run
@pytest.fixture()
def env(monkeypatch):
monkeypatch.setenv("BASE_SHA", "base-sha-1")
monkeypatch.setenv("HEAD_SHA", "head-sha-1")
monkeypatch.setenv("PR_BODY", "")
monkeypatch.setenv("CI_WORKFLOW_PATH", ".gitea/workflows/ci.yml")
monkeypatch.setenv("SENTINEL_JOB_KEY", "all-required")
return monkeypatch
# ---------------------------------------------------------------------------
# Diff in ci.yml but neither rule predicate triggered → pass
# ---------------------------------------------------------------------------
def test_diff_touches_neither_passes(env, monkeypatch, capsys):
# Add a comment-only change (no coe flip, no needs change).
base = CI_YML_BASE
head = "# a harmless comment\n" + CI_YML_BASE
monkeypatch.setattr(
subprocess, "run", _stub_git(base, head, ["chore: comment"])
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
out = capsys.readouterr().out
assert "no atomicity risk" in out.lower() or "ok" in out.lower()
# ---------------------------------------------------------------------------
# Diff touches BOTH coe and sentinel.needs in the same PR → atomic, pass
# ---------------------------------------------------------------------------
def test_diff_touches_both_atomically_passes(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess,
"run",
_stub_git(CI_YML_BASE, CI_YML_BOTH, ["fix(ci): atomic flip"]),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
out = capsys.readouterr().out
assert "atomic" in out.lower()
# ---------------------------------------------------------------------------
# Diff touches ONLY continue-on-error, no pair reference → fail
# ---------------------------------------------------------------------------
def test_diff_touches_coe_only_no_pair_fails(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess,
"run",
_stub_git(
CI_YML_BASE,
CI_YML_COE_FLIPPED,
["fix(ci): flip coe on platform-build"],
),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 1
out = capsys.readouterr().out
assert "paired" in out.lower() or "atomicity" in out.lower()
# Actionable failure: must name what is missing.
assert "continue-on-error" in out.lower()
# ---------------------------------------------------------------------------
# Diff touches ONLY sentinel.needs, no pair reference → fail
# ---------------------------------------------------------------------------
def test_diff_touches_needs_only_no_pair_fails(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess,
"run",
_stub_git(
CI_YML_BASE,
CI_YML_NEEDS_CHANGED,
["fix(ci): drop canvas-build from sentinel"],
),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 1
out = capsys.readouterr().out
assert "paired" in out.lower() or "atomicity" in out.lower()
assert "needs" in out.lower() or "sentinel" in out.lower()
# ---------------------------------------------------------------------------
# COE-only flip with `Paired: #668` in PR body → pass
# ---------------------------------------------------------------------------
def test_diff_touches_coe_only_pair_in_body(env, monkeypatch, capsys):
monkeypatch.setenv("PR_BODY", "Interim coe flip. Paired: #668")
monkeypatch.setattr(
subprocess,
"run",
_stub_git(
CI_YML_BASE,
CI_YML_COE_FLIPPED,
["fix(ci): flip coe on platform-build"],
),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
out = capsys.readouterr().out
assert "paired" in out.lower()
assert "668" in out
# ---------------------------------------------------------------------------
# Needs-only flip with `Paired: #665` in a commit message → pass
# ---------------------------------------------------------------------------
def test_diff_touches_needs_only_pair_in_commit(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess,
"run",
_stub_git(
CI_YML_BASE,
CI_YML_NEEDS_CHANGED,
[
"fix(ci): drop canvas-build from sentinel\n\nPaired: #665",
],
),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
out = capsys.readouterr().out
assert "paired" in out.lower()
assert "665" in out
# ---------------------------------------------------------------------------
# `Paired: #abc` is not a valid issue/PR ref — fail
# ---------------------------------------------------------------------------
def test_paired_reference_must_be_numeric(env, monkeypatch, capsys):
monkeypatch.setenv("PR_BODY", "Paired: #abc")
monkeypatch.setattr(
subprocess,
"run",
_stub_git(
CI_YML_BASE,
CI_YML_COE_FLIPPED,
["fix(ci): flip coe"],
),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 1
# ---------------------------------------------------------------------------
# Defensive: ci.yml not in diff at all → skip cleanly
# ---------------------------------------------------------------------------
def test_ci_yml_unchanged_skips(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess, "run", _stub_git(CI_YML_BASE, CI_YML_BASE, ["chore: noop"])
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
out = capsys.readouterr().out
assert "ci.yml" in out.lower() or "not in" in out.lower() or "skip" in out.lower()
# ---------------------------------------------------------------------------
# Cross-cutting: file ADDED on head side (no base) — coe inferred as
# "newly added with coe=true". Should NOT trigger the lint (it's a new
# file, not a flip — Tier 2e covers tracking-issue for new coe=true).
# ---------------------------------------------------------------------------
def test_ci_yml_newly_added_passes(env, monkeypatch, capsys):
monkeypatch.setattr(
subprocess,
"run",
_stub_git(None, CI_YML_COE_FLIPPED, ["feat(ci): add ci.yml"]),
)
m = _import_lint(monkeypatch)
rc = m.run()
assert rc == 0
-554
View File
@@ -1,554 +0,0 @@
"""Tests for `.gitea/scripts/lint-required-no-paths.py`.
Structural enforcement of `feedback_path_filtered_workflow_cant_be_required`:
no workflow whose status-check context is in `branch_protections/main`
`status_check_contexts` may use `paths:` or `paths-ignore:` filters in its
`on:` block. A path-filtered workflow silently does not fire on a PR whose
diff doesn't touch the filter — Gitea treats that as `pending` forever,
not `skipped`-as-`success`, so the gate degrades to an indefinite block.
Worse, a docs-only PR could never satisfy a required check whose filter
excludes docs paths, and the protected branch becomes unreachable.
Five test classes:
- test_no_required_workflows_succeeds — empty status_check_contexts → exit 0
- test_required_workflow_no_paths_passes — required workflow with no
paths filter → exit 0
- test_required_workflow_with_paths_filter_fails — required workflow
with `paths: ['**.go']` → exit 1, error names workflow
- test_required_workflow_with_paths_ignore_fails — same shape for
`paths-ignore`
- test_unknown_required_context_warns_not_fails — context whose
workflow file is missing → warn, do NOT fail (graceful — could be a
cross-repo context name or a workflow renamed mid-PR; the lint is for
paths-filter detection, not orphaned-context detection — that's
ci-required-drift's job)
Also covers the workflow-name → file-path mapping (parses the
`<workflow_name> / <job_name> (<event>)` context format) and the
multi-event `on:` block edge cases (paths under `on.push` vs `on.pull_request`
vs top-level `on.paths`).
Run:
python3 -m pytest tests/test_lint_required_no_paths.py -v
Dependencies: stdlib + PyYAML (already required by the script itself).
No network. No live Gitea calls — `api()` is stubbed.
"""
from __future__ import annotations
import importlib.util
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
# --------------------------------------------------------------------------
# Module import fixture — mirror of tests/test_ci_required_drift.py shape
# --------------------------------------------------------------------------
SCRIPT_PATH = (
Path(__file__).resolve().parent.parent
/ ".gitea"
/ "scripts"
/ "lint-required-no-paths.py"
)
@pytest.fixture()
def lint_module(tmp_path, monkeypatch):
"""Import the script as a module with a clean env per test.
Tests need a per-test workflows directory under tmp_path; the module
reads `WORKFLOWS_DIR` from env. Fresh import per test means tests
cannot leak global state into each other.
"""
env = {
"GITEA_TOKEN": "test-token",
"GITEA_HOST": "git.example.test",
"REPO": "owner/repo",
"BRANCH": "main",
"WORKFLOWS_DIR": str(tmp_path / ".gitea" / "workflows"),
}
(tmp_path / ".gitea" / "workflows").mkdir(parents=True)
monkeypatch.setattr(os, "environ", {**os.environ, **env})
spec = importlib.util.spec_from_file_location(
f"lint_required_no_paths_{id(tmp_path)}", SCRIPT_PATH
)
m = importlib.util.module_from_spec(spec)
spec.loader.exec_module(m)
# Force-set the globals from env (they were captured at import time;
# we mutate them so the per-test tmp_path is what the script reads).
m.GITEA_TOKEN = env["GITEA_TOKEN"]
m.GITEA_HOST = env["GITEA_HOST"]
m.REPO = env["REPO"]
m.BRANCH = env["BRANCH"]
m.WORKFLOWS_DIR = env["WORKFLOWS_DIR"]
m.OWNER, m.NAME = "owner", "repo"
m.API = f"https://{env['GITEA_HOST']}/api/v1"
return m
def _write_workflow(workflows_dir: str, filename: str, content: str) -> Path:
p = Path(workflows_dir) / filename
p.write_text(content, encoding="utf-8")
return p
def _make_stub_api(responses: dict):
"""Build a fake `api()` callable.
`responses` maps (method, path) tuples to either:
- (status_int, body) → returned as-is
- Exception instance → raised
Calls are recorded in `.calls` for later assertion.
"""
class StubApi:
def __init__(self):
self.calls: list[tuple] = []
def __call__(self, method, path, *, body=None, query=None, expect_json=True):
self.calls.append((method, path, body, query))
key = (method, path)
if key not in responses:
raise AssertionError(
f"unexpected api call: {method} {path} (no stub registered)"
)
r = responses[key]
if isinstance(r, Exception):
raise r
return r
return StubApi()
# --------------------------------------------------------------------------
# context → (workflow_name, job_name, event) parser
# --------------------------------------------------------------------------
def test_parse_context_standard_shape(lint_module):
"""`<workflow_name> / <job_name> (<event>)` round-trips cleanly."""
parsed = lint_module.parse_context(
"Secret scan / Scan diff for credential-shaped strings (pull_request)"
)
assert parsed == (
"Secret scan",
"Scan diff for credential-shaped strings",
"pull_request",
)
def test_parse_context_with_slash_in_job_name(lint_module):
"""Job names CAN contain ' / ' literally in Gitea; the parser must
split on the LAST ' / ' before the trailing ' (event)' suffix."""
parsed = lint_module.parse_context(
"ci / setup / install-deps (pull_request)"
)
# Workflow = first segment; job = everything between first ' / ' and
# the trailing ' (event)'. Pragmatic split: the workflow name is
# `name:` from the YAML, so multi-slash workflow names are unlikely;
# treat the first ' / ' as the divider.
assert parsed[0] == "ci"
assert parsed[1] == "setup / install-deps"
assert parsed[2] == "pull_request"
def test_parse_context_unparseable_returns_none(lint_module):
"""Malformed context string → None so the caller can warn-and-skip."""
assert lint_module.parse_context("garbage no event marker") is None
assert lint_module.parse_context("") is None
# --------------------------------------------------------------------------
# workflow-name → file resolution
# --------------------------------------------------------------------------
def test_resolve_workflow_file_matches_name_attr(lint_module):
"""Resolution scans workflows/*.yml for a `name:` matching the
context's workflow_name. Filename is NOT the source of truth — the
`name:` attribute is, because Gitea's context format uses
`name:` (not the filename).
"""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"some-file.yml",
"name: Secret scan\non:\n pull_request:\n types: [opened]\njobs:\n scan:\n runs-on: ubuntu-latest\n",
)
p = lint_module.resolve_workflow_file("Secret scan")
assert p is not None
assert p.name == "some-file.yml"
def test_resolve_workflow_file_returns_none_when_missing(lint_module):
"""No matching `name:` found → None."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"other.yml",
"name: Other\non:\n pull_request: {}\njobs:\n x:\n runs-on: ubuntu-latest\n",
)
assert lint_module.resolve_workflow_file("Secret scan") is None
# --------------------------------------------------------------------------
# paths-filter detection
# --------------------------------------------------------------------------
def test_workflow_has_no_paths_filter_clean(lint_module):
"""No paths/paths-ignore → returns empty list (no findings)."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"clean.yml",
"name: Clean\n"
"on:\n"
" pull_request:\n"
" types: [opened, synchronize]\n"
"jobs:\n"
" x:\n"
" runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "clean.yml"
)
assert findings == []
def test_workflow_with_pull_request_paths_filter_detected(lint_module):
"""`on.pull_request.paths` → ONE finding naming pull_request + paths."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"bad.yml",
"name: Bad\n"
"on:\n"
" pull_request:\n"
" paths: ['**.go', 'workspace/**']\n"
"jobs:\n"
" x:\n"
" runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "bad.yml"
)
assert len(findings) == 1
f = findings[0]
assert "pull_request" in f
assert "paths" in f
assert "**.go" in f or "workspace/**" in f # filter content surfaced
def test_workflow_with_paths_ignore_filter_detected(lint_module):
"""`on.pull_request.paths-ignore` → finding naming paths-ignore.
paths-ignore is the SAME class of defect: a docs-only PR (that
matches the ignore pattern) silently won't fire the workflow, and the
required context stays pending.
"""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"bad.yml",
"name: Bad\n"
"on:\n"
" pull_request:\n"
" paths-ignore: ['docs/**']\n"
"jobs:\n"
" x:\n"
" runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "bad.yml"
)
assert len(findings) == 1
assert "paths-ignore" in findings[0]
def test_workflow_with_push_paths_filter_detected(lint_module):
"""`on.push.paths` → also a finding. A required check on a PR is
typically `(pull_request)`-event, but a workflow may ALSO have a
push trigger; a paths filter on the push side affects the same
workflow file, and a future PR might add `paths:` to the wrong
event-branch and trip the gate. Surface all paths-filter sites.
"""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"bad.yml",
"name: Bad\n"
"on:\n"
" pull_request:\n"
" types: [opened]\n"
" push:\n"
" branches: [main]\n"
" paths: ['**.py']\n"
"jobs:\n"
" x:\n"
" runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "bad.yml"
)
assert len(findings) == 1
assert "push" in findings[0]
assert "paths" in findings[0]
def test_workflow_with_both_paths_and_paths_ignore_two_findings(lint_module):
"""Both filters under one event → two findings (one per offending
key). Test ensures the detector doesn't short-circuit after the
first."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"bad.yml",
"name: Bad\n"
"on:\n"
" pull_request:\n"
" paths: ['**.go']\n"
" paths-ignore: ['docs/**']\n"
"jobs:\n"
" x:\n"
" runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "bad.yml"
)
assert len(findings) == 2
def test_workflow_with_on_shorthand_string_passes(lint_module):
"""`on: pull_request` (string shorthand, no sub-keys) cannot have a
paths filter — detector treats it as clean."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"clean.yml",
"name: Clean\non: pull_request\njobs:\n x:\n runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "clean.yml"
)
assert findings == []
def test_workflow_with_on_list_shorthand_passes(lint_module):
"""`on: [pull_request, push]` (list shorthand) cannot carry filters
either — clean."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"clean.yml",
"name: Clean\non: [pull_request, push]\njobs:\n x:\n runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "clean.yml"
)
assert findings == []
def test_workflow_on_event_with_null_value_passes(lint_module):
"""`pull_request:` with no body (None / null) is event-shorthand —
no filter possible."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"clean.yml",
"name: Clean\non:\n pull_request:\n push:\n branches: [main]\njobs:\n x:\n runs-on: ubuntu-latest\n",
)
findings = lint_module.detect_paths_filters(
Path(lint_module.WORKFLOWS_DIR) / "clean.yml"
)
assert findings == []
# --------------------------------------------------------------------------
# End-to-end lint (main) — required-checks fan-out
# --------------------------------------------------------------------------
def test_no_required_workflows_succeeds(lint_module, monkeypatch, capsys):
"""Empty status_check_contexts → exit 0, no findings reported."""
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": []},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 0
out = capsys.readouterr().out
assert "no required contexts" in out.lower() or "0 required" in out.lower()
def test_required_workflow_no_paths_passes(lint_module, monkeypatch, capsys):
"""A required workflow with no paths filter → exit 0."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"secret-scan.yml",
"name: Secret scan\non:\n pull_request:\n types: [opened]\njobs:\n scan:\n runs-on: ubuntu-latest\n",
)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"Secret scan / scan (pull_request)",
]
},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 0
def test_required_workflow_with_paths_filter_fails(
lint_module, monkeypatch, capsys
):
"""A required workflow that has `paths:` filter → exit 1 + error
names the offending workflow + the filter."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"secret-scan.yml",
"name: Secret scan\n"
"on:\n"
" pull_request:\n"
" paths: ['**.go']\n"
"jobs:\n"
" scan:\n"
" runs-on: ubuntu-latest\n",
)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["Secret scan / scan (pull_request)"]},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 1
out = capsys.readouterr().out
assert "secret-scan.yml" in out
assert "Secret scan" in out
assert "paths" in out
assert "::error::" in out
def test_required_workflow_with_paths_ignore_fails(
lint_module, monkeypatch, capsys
):
"""Same defect class for `paths-ignore` — exit 1, named."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"sop-tier-check.yml",
"name: sop-tier-check\n"
"on:\n"
" pull_request_target:\n"
" paths-ignore: ['docs/**']\n"
"jobs:\n"
" tier-check:\n"
" runs-on: ubuntu-latest\n",
)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"sop-tier-check / tier-check (pull_request_target)"
]
},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 1
out = capsys.readouterr().out
assert "sop-tier-check.yml" in out
assert "paths-ignore" in out
def test_unknown_required_context_warns_not_fails(
lint_module, monkeypatch, capsys
):
"""Required context with no matching workflow file → warn, don't
fail. This is gracefully bounded — the lint's mandate is paths-filter
detection, not orphaned-context detection (`ci-required-drift` is the
canonical detector for that).
"""
# No workflows written → all required contexts will be unresolved.
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"Mystery / job (pull_request)",
]
},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 0 # warn-not-fail
out = capsys.readouterr().out
assert "::warning::" in out
assert "Mystery" in out
def test_multi_required_one_bad_one_good_fails(
lint_module, monkeypatch, capsys
):
"""Two required contexts; one workflow is bad. Lint still fails
(one defect is enough) and the error names ONLY the bad workflow."""
_write_workflow(
lint_module.WORKFLOWS_DIR,
"good.yml",
"name: Good\non:\n pull_request:\n types: [opened]\njobs:\n x:\n runs-on: ubuntu-latest\n",
)
_write_workflow(
lint_module.WORKFLOWS_DIR,
"bad.yml",
"name: Bad\n"
"on:\n"
" pull_request:\n"
" paths: ['src/**']\n"
"jobs:\n x:\n runs-on: ubuntu-latest\n",
)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"Good / x (pull_request)",
"Bad / x (pull_request)",
]
},
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 1
out = capsys.readouterr().out
assert "bad.yml" in out
# `good.yml` should NOT show up in the error block — only the bad one.
# (It may appear as a "checked" notice; assert it's not flagged as bad.)
assert "::error::" in out
error_lines = [ln for ln in out.split("\n") if ln.startswith("::error::") or "paths" in ln.lower() and "good" in ln.lower()]
# The good workflow must not appear under an ::error:: line referencing paths.
for ln in error_lines:
if ln.startswith("::error::"):
# The error line itself shouldn't name good.yml as offending.
assert "good.yml" not in ln
def test_protection_403_treated_as_skip(lint_module, monkeypatch, capsys):
"""If the token can't read branch_protections (HTTP 403), exit 0
with a clear ::error::-but-non-fatal note. Same scope-fallback shape
as ci-required-drift.py per the precedent.
Rationale: if the lint workflow itself can't read protection, the PR
can't make THIS state worse (a paths-filter PR was already addable
without the lint). Better to surface a token-scope problem loudly
than to red-X every PR until the token is fixed.
"""
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
lint_module.ApiError(
"GET /repos/owner/repo/branch_protections/main → HTTP 403: forbidden"
)
),
})
monkeypatch.setattr(lint_module, "api", stub)
rc = lint_module.run()
assert rc == 0
err = capsys.readouterr().err
assert "::error::" in err
assert "403" in err
-413
View File
@@ -1,413 +0,0 @@
"""Tests for `.gitea/scripts/lint-workflow-yaml.py` — Gitea-1.22.6-hostile shape lint.
Hard-gate (Tier-2) lint that catches workflow YAML shapes Gitea 1.22.6
silently rejects, so they never reach `main`. The six anti-patterns are
documented in saved memory; this test suite is the structural enforcement.
Per-rule positive (anti-pattern present -> exit 1) + negative (clean -> exit 0)
cases, plus a multi-file collision case and an aggregation case.
Run:
python3 -m pytest tests/test_lint_workflow_yaml.py -v
Dependencies: stdlib + PyYAML. No network.
Cross-links:
- feedback_gitea_workflow_dispatch_inputs_unsupported (rule 1)
- internal task #81 (rule 2 — workflow_run unsupported)
- feedback_workflow_name_with_slash_breaks_parsing (rule 3, if filed)
- feedback_gitea_cross_repo_uses_blocked (rule 5)
- feedback_act_runner_github_server_url (rule 6)
- feedback_smoke_test_vendor_truth_not_shape_match (test-shape rule)
"""
from __future__ import annotations
import subprocess
import sys
import textwrap
from pathlib import Path
import pytest # noqa: F401 (declares the dep)
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPT = REPO_ROOT / ".gitea" / "scripts" / "lint-workflow-yaml.py"
def _run_lint(workflow_dir: Path) -> subprocess.CompletedProcess:
"""Invoke the lint as a subprocess against an isolated workflow dir."""
return subprocess.run(
[sys.executable, str(SCRIPT), "--workflow-dir", str(workflow_dir)],
capture_output=True,
text=True,
)
def _write(workflow_dir: Path, name: str, content: str) -> Path:
"""Write a workflow YAML fixture and return its path."""
workflow_dir.mkdir(parents=True, exist_ok=True)
p = workflow_dir / name
p.write_text(textwrap.dedent(content).lstrip())
return p
# ---------------------------------------------------------------------------
# Rule 1 — workflow_dispatch.inputs (Gitea 1.22.6 parser rejects)
# ---------------------------------------------------------------------------
WD_INPUTS_BAD = """
name: bad-wd-inputs
on:
workflow_dispatch:
inputs:
version:
description: "version"
required: true
type: string
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
WD_INPUTS_OK = """
name: ok-wd-no-inputs
on:
workflow_dispatch:
push:
branches: [main]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
def test_rule1_workflow_dispatch_inputs_detects_violation(tmp_path):
_write(tmp_path, "bad.yml", WD_INPUTS_BAD)
r = _run_lint(tmp_path)
assert r.returncode == 1
assert "workflow_dispatch.inputs" in r.stdout
assert "bad.yml" in r.stdout
def test_rule1_workflow_dispatch_inputs_passes_when_absent(tmp_path):
_write(tmp_path, "ok.yml", WD_INPUTS_OK)
r = _run_lint(tmp_path)
assert r.returncode == 0, f"stdout={r.stdout}\nstderr={r.stderr}"
# ---------------------------------------------------------------------------
# Rule 2 — workflow_run event (not supported on Gitea 1.22.6)
# ---------------------------------------------------------------------------
WF_RUN_BAD = """
name: bad-workflow-run
on:
workflow_run:
workflows: ["upstream"]
types: [completed]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
WF_RUN_OK = """
name: ok-no-workflow-run
on:
push:
branches: [main]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
def test_rule2_workflow_run_event_detects_violation(tmp_path):
_write(tmp_path, "bad.yml", WF_RUN_BAD)
r = _run_lint(tmp_path)
assert r.returncode == 1
assert "workflow_run" in r.stdout
assert "bad.yml" in r.stdout
def test_rule2_workflow_run_event_passes_when_absent(tmp_path):
_write(tmp_path, "ok.yml", WF_RUN_OK)
r = _run_lint(tmp_path)
assert r.returncode == 0, f"stdout={r.stdout}\nstderr={r.stderr}"
# ---------------------------------------------------------------------------
# Rule 3 — name: contains "/" (breaks "<workflow> / <job> (<event>)" parsing)
# ---------------------------------------------------------------------------
NAME_SLASH_BAD = """
name: ci / build
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
NAME_SLASH_OK = """
name: ci-build
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
def test_rule3_name_with_slash_detects_violation(tmp_path):
_write(tmp_path, "bad.yml", NAME_SLASH_BAD)
r = _run_lint(tmp_path)
assert r.returncode == 1
assert "name" in r.stdout.lower()
assert "/" in r.stdout
assert "bad.yml" in r.stdout
def test_rule3_name_with_slash_passes_when_absent(tmp_path):
_write(tmp_path, "ok.yml", NAME_SLASH_OK)
r = _run_lint(tmp_path)
assert r.returncode == 0, f"stdout={r.stdout}\nstderr={r.stderr}"
# ---------------------------------------------------------------------------
# Rule 4 — name collision across files (cross-file)
# ---------------------------------------------------------------------------
COLLISION_A = """
name: shared-name
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo a
"""
COLLISION_B = """
name: shared-name
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo b
"""
DISTINCT_A = """
name: name-a
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo a
"""
DISTINCT_B = """
name: name-b
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo b
"""
def test_rule4_name_collision_across_two_files_detects_violation(tmp_path):
_write(tmp_path, "a.yml", COLLISION_A)
_write(tmp_path, "b.yml", COLLISION_B)
r = _run_lint(tmp_path)
assert r.returncode == 1
assert ("collision" in r.stdout.lower()) or ("duplicate" in r.stdout.lower())
assert "shared-name" in r.stdout
def test_rule4_name_collision_passes_when_names_distinct(tmp_path):
_write(tmp_path, "a.yml", DISTINCT_A)
_write(tmp_path, "b.yml", DISTINCT_B)
r = _run_lint(tmp_path)
assert r.returncode == 0, f"stdout={r.stdout}\nstderr={r.stderr}"
# ---------------------------------------------------------------------------
# Rule 5 — cross-repo `uses: org/repo/...@ref` (blocked on 1.22.6)
# ---------------------------------------------------------------------------
CROSS_REPO_BAD = """
name: bad-cross-repo
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- uses: molecule-ai/molecule-ci/.gitea/actions/audit-force-merge@main
"""
# actions/checkout — bare `org/repo@ref` form — allowed. Rule 5 targets
# `org/repo/SUBPATH@ref` cross-repo composite/reusable references because
# only those resolve through `[actions].DEFAULT_ACTIONS_URL`+org-suspended-host.
CROSS_REPO_OK = """
name: ok-no-cross-repo
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd
- run: echo hi
"""
def test_rule5_cross_repo_uses_detects_violation(tmp_path):
_write(tmp_path, "bad.yml", CROSS_REPO_BAD)
r = _run_lint(tmp_path)
assert r.returncode == 1
assert ("cross-repo" in r.stdout.lower()) or ("uses" in r.stdout.lower())
assert "bad.yml" in r.stdout
def test_rule5_cross_repo_uses_passes_when_only_actions_org(tmp_path):
_write(tmp_path, "ok.yml", CROSS_REPO_OK)
r = _run_lint(tmp_path)
assert r.returncode == 0, f"stdout={r.stdout}\nstderr={r.stderr}"
# ---------------------------------------------------------------------------
# Rule 6 — GITHUB_SERVER_URL heuristic (warn-not-fail per halt-condition 3)
# ---------------------------------------------------------------------------
GH_API_REF_NO_SERVER = """
name: warn-server-url
on: [push]
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: curl https://api.github.com/repos/foo/bar
"""
GH_API_REF_WITH_SERVER = """
name: ok-server-url-set
on: [push]
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: curl https://api.github.com/repos/foo/bar
"""
def test_rule6_github_server_url_missing_is_warning_not_fatal(tmp_path):
"""Heuristic rule — emits warning but does NOT exit 1.
Per halt-condition 3: heuristic may false-positive (current main has 3:
OCI label + jq-release URL refs). Downgrade to warn-not-fail.
"""
_write(tmp_path, "warn.yml", GH_API_REF_NO_SERVER)
r = _run_lint(tmp_path)
assert r.returncode == 0
combined = (r.stdout + r.stderr).lower()
assert ("github_server_url" in combined) or ("::warning" in combined)
def test_rule6_github_server_url_present_no_warning(tmp_path):
_write(tmp_path, "ok.yml", GH_API_REF_WITH_SERVER)
r = _run_lint(tmp_path)
assert r.returncode == 0
# No warning emitted (server URL is set)
assert "::warning" not in r.stdout
# ---------------------------------------------------------------------------
# Aggregation — single file with multiple anti-patterns
# ---------------------------------------------------------------------------
MULTI_VIOLATIONS = """
name: ci / multi
on:
workflow_dispatch:
inputs:
v:
type: string
workflow_run:
workflows: [up]
types: [completed]
jobs:
x:
runs-on: ubuntu-latest
steps:
- uses: molecule-ai/molecule-ci/.gitea/actions/x@main
"""
def test_all_violations_aggregated_single_file(tmp_path):
_write(tmp_path, "multi.yml", MULTI_VIOLATIONS)
r = _run_lint(tmp_path)
assert r.returncode == 1
out = r.stdout
# All four FATAL rules should be reported (1, 2, 3, 5)
assert "workflow_dispatch.inputs" in out
assert "workflow_run" in out
assert "/" in out # rule 3 surfaces the slash
assert ("cross-repo" in out.lower()) or ("uses" in out.lower())
# ---------------------------------------------------------------------------
# Empty-dir / no-workflows edge case
# ---------------------------------------------------------------------------
def test_no_workflows_exits_zero(tmp_path):
r = _run_lint(tmp_path)
assert r.returncode == 0
# ---------------------------------------------------------------------------
# Vendor-truth: rule 1 catches the exact 2026-05-11 publish-runtime.yml shape
# ---------------------------------------------------------------------------
# The exact YAML shape from feedback_gitea_workflow_dispatch_inputs_unsupported
# that caused publish-runtime-v1.0.0 to silently freeze PyPI at 0.1.129 for ~24h.
PUBLISH_RUNTIME_VENDOR_TRUTH = """
name: publish-runtime
on:
push:
tags: ['runtime-v*']
workflow_dispatch:
inputs:
version:
description: "Version to publish (e.g. 0.1.6). Required for manual dispatch."
required: true
type: string
jobs:
x:
runs-on: ubuntu-latest
steps:
- run: echo hi
"""
def test_rule1_catches_2026_05_11_publish_runtime_regression(tmp_path):
"""Vendor-truth fixture: the exact YAML shape that froze PyPI for 24h."""
_write(tmp_path, "publish-runtime.yml", PUBLISH_RUNTIME_VENDOR_TRUTH)
r = _run_lint(tmp_path)
assert r.returncode == 1, (
"Lint must catch the 2026-05-11 publish-runtime regression "
f"(memory: feedback_gitea_workflow_dispatch_inputs_unsupported)."
f"\nstdout={r.stdout}"
)
-72
View File
@@ -189,78 +189,6 @@ def test_is_red_no_statuses(wd_module):
assert failed == []
# --------------------------------------------------------------------------
# Per-entry vendor-truth key (rev4) — see status-reaper rev4 sibling
#
# Gitea 1.22.6 returns per-entry items in combined.statuses[] with key
# `status`, not `state`. Pre-rev4 code only read `state` → failed[]
# was always empty → render_body always emitted the fallback "no
# per-context entries were in a red state". These tests use the
# canonical Gitea shape to lock the fix in.
# --------------------------------------------------------------------------
def test_is_red_vendor_truth_status_key_under_pending(wd_module):
"""Real Gitea 1.22.6 shape: per-entry uses `status`. A single failed
context counts as red even when combined is `pending`. Pre-rev4
this returned `(False, [])` because `s.get("state")` was None."""
red, failed = wd_module.is_red({
"state": "pending",
"statuses": [
{"context": "ci/lint", "status": "success"},
{"context": "ci/test", "status": "failure"},
{"context": "ci/build", "status": "pending"},
],
})
assert red is True
assert [s["context"] for s in failed] == ["ci/test"]
def test_is_red_status_takes_precedence_over_state(wd_module):
"""If both keys present (defensive), `status` (vendor truth) wins."""
red, failed = wd_module.is_red({
"state": "pending",
"statuses": [
# `status=failure` is truth even though `state=success` is
# stale. Locking in the precedence prevents a hypothetical
# future Gitea release that emits both from re-introducing
# the bug under a different shape.
{"context": "ci/test", "status": "failure", "state": "success"},
],
})
assert red is True
assert len(failed) == 1
def test_is_red_state_only_fallback_still_works(wd_module):
"""Backward-compat: a legacy fixture or future Gitea variant that
only emits `state` still trips the red detection via the fallback
chain. Keeps pre-rev4 fixtures green during the rev4 rollout."""
red, failed = wd_module.is_red({
"state": "pending",
"statuses": [
{"context": "ci/test", "state": "failure"}, # legacy shape
],
})
assert red is True
assert len(failed) == 1
def test_render_body_uses_status_key_for_per_entry_state(wd_module):
"""render_body must surface the per-entry `status` value in the
issue body. Pre-rev4 it read `state` (always None on real Gitea) →
every issue body said `(no state)`, defeating the diagnostic."""
failed = [
{"context": "ci/test", "status": "failure",
"target_url": "https://example.test/run/1",
"description": "broke"},
]
body = wd_module.render_body("deadbeefcafe1234", failed, {})
assert "`failure`" in body, (
"render_body did not surface per-entry status — likely still "
"reading `state` key only (rev1-3 bug)."
)
assert "(no state)" not in body
# --------------------------------------------------------------------------
# Happy path — main is green, no issue created
# --------------------------------------------------------------------------
-150
View File
@@ -544,156 +544,6 @@ def test_reap_unparseable_push_context_preserved(sr_module, monkeypatch):
assert counters["preserved_unparseable"] == 1
# --------------------------------------------------------------------------
# Per-context status-key vendor-truth (rev4)
#
# Gitea 1.22.6 returns commit-status entries with key `status` per entry,
# NOT `state`. The TOP-LEVEL combined aggregate uses `state`. This schema
# asymmetry caused rev1-3 to take the compensation path 0 times despite
# triggering on real failures: `s.get("state")` returned None → state
# evaluated to "" → `"" != "failure"` guard preserved every entry.
#
# These tests explicitly use the vendor-truth shape (`status` per entry),
# proving the rev4 fix routes the failure entry through compensation.
# Fixtures in rev1-3 tests above use `state` (the pre-fix bug shape) —
# we keep them for backward-compat coverage via the fallback in
# `s.get("status") or s.get("state")`, but the canonical Gitea shape
# uses `status`. Logged under
# `feedback_smoke_test_vendor_truth_not_shape_match`.
# --------------------------------------------------------------------------
def test_reap_per_context_uses_status_key_not_state(sr_module, monkeypatch):
"""Empirical Gitea 1.22.6 shape: per-entry uses `status`, top-level
uses `state`. The rev4 fix MUST detect failure via `status`."""
calls = []
def fake_api(method, path, *, body=None, query=None, expect_json=True):
calls.append((method, path, body))
return (201, {})
monkeypatch.setattr(sr_module, "api", fake_api)
workflow_map = {"staging-smoke": False} # no push trigger → Class-O
# Real Gitea-shaped response: top-level `state`, per-entry `status`.
# No `state` key on the per-entry item.
combined = {
"state": "failure",
"statuses": [
{
"context": "staging-smoke / smoke (push)",
"status": "failure", # ← vendor-truth key
"target_url": "https://example.test/run/1",
"description": "smoke job failed",
}
],
}
counters = sr_module.reap(workflow_map, combined, SHA, dry_run=False)
# The bug-class assertion: pre-rev4 this would have been 0, with
# preserved_non_failure=1. Rev4 reads `status` → routes to compensate.
assert counters["compensated"] == 1, (
"Compensation path unreachable: status-reaper still reads `state` "
"instead of `status` on per-entry combined.statuses[] items "
"(rev1-3 bug)."
)
assert counters["preserved_non_failure"] == 0
assert len(calls) == 1
assert calls[0][0] == "POST"
assert calls[0][1] == f"/repos/owner/repo/statuses/{SHA}"
def test_reap_per_context_status_key_takes_precedence_over_state(
sr_module, monkeypatch
):
"""Defensive: if both `status` and `state` are present (e.g. a
hypothetical Gitea version emits both), `status` (the canonical
Gitea 1.22.6 key) wins. Guards against a future regression where
a fixture or future Gitea release emits stale `state="success"`
while `status="failure"` is the truth."""
calls = []
def fake_api(method, path, *, body=None, query=None, expect_json=True):
calls.append((method, path, body))
return (201, {})
monkeypatch.setattr(sr_module, "api", fake_api)
workflow_map = {"staging-smoke": False}
combined = {
"state": "failure",
"statuses": [
{
"context": "staging-smoke / smoke (push)",
# Both keys present — vendor-truth `status` MUST win.
"status": "failure",
"state": "success",
"target_url": "https://example.test/run/2",
"description": "smoke job failed",
}
],
}
counters = sr_module.reap(workflow_map, combined, SHA, dry_run=False)
assert counters["compensated"] == 1
assert counters["preserved_non_failure"] == 0
assert len(calls) == 1
def test_reap_per_context_state_only_fallback(sr_module, monkeypatch):
"""Backward-compat: a test fixture or older Gitea variant that emits
only `state` (no `status`) must still flow through compensation.
Belt-and-suspenders against future fixture drift. Keeps rev1-3
`state`-using fixtures green."""
calls = []
def fake_api(method, path, *, body=None, query=None, expect_json=True):
calls.append((method, path, body))
return (201, {})
monkeypatch.setattr(sr_module, "api", fake_api)
workflow_map = {"staging-smoke": False}
combined = {
"state": "failure",
"statuses": [
{
"context": "staging-smoke / smoke (push)",
"state": "failure", # legacy fixture shape only
"target_url": "https://example.test/run/3",
}
],
}
counters = sr_module.reap(workflow_map, combined, SHA, dry_run=False)
assert counters["compensated"] == 1
assert len(calls) == 1
def test_reap_per_context_missing_both_keys_preserves(sr_module, monkeypatch):
"""A per-entry item lacking BOTH `status` and `state` must be
preserved (counted under preserved_non_failure). This is the only
correctly-behaving leg of the pre-rev4 bug — exercising it ensures
the fallback chain doesn't accidentally over-compensate on
malformed entries."""
monkeypatch.setattr(
sr_module, "api",
lambda *a, **kw: (_ for _ in ()).throw(
AssertionError("api should not be called")
),
)
workflow_map = {"staging-smoke": False}
combined = {
"state": "failure",
"statuses": [
{
"context": "staging-smoke / smoke (push)",
# No status, no state — neither key present.
"target_url": "https://example.test/run/4",
}
],
}
counters = sr_module.reap(workflow_map, combined, SHA, dry_run=False)
assert counters["compensated"] == 0
assert counters["preserved_non_failure"] == 1
# --------------------------------------------------------------------------
# ApiError propagation
# --------------------------------------------------------------------------