Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| dc858ad164 | |||
| 2ffd44c694 |
@@ -148,15 +148,38 @@ def latest_statuses_by_context(statuses: list[dict]) -> dict[str, dict]:
|
||||
return latest
|
||||
|
||||
|
||||
def _is_tier_low_pending_ok(
|
||||
latest_statuses: dict[str, dict],
|
||||
context: str,
|
||||
pr_labels: set[str],
|
||||
) -> bool:
|
||||
"""Return True if tier:low PR can tolerate sop-checklist pending state.
|
||||
|
||||
Per sop-checklist-config.yaml tier_failure_mode, tier:low uses soft-fail:
|
||||
sop-checklist posts state=pending when acks are satisfied (missing
|
||||
manager/ceo acks are informational only). The queue should accept
|
||||
pending instead of waiting for success.
|
||||
"""
|
||||
if "tier:low" not in pr_labels:
|
||||
return False
|
||||
if "sop-checklist" not in context:
|
||||
return False
|
||||
status = latest_statuses.get(context) or {}
|
||||
return status_state(status) == "pending"
|
||||
|
||||
|
||||
def required_contexts_green(
|
||||
latest_statuses: dict[str, dict],
|
||||
contexts: list[str],
|
||||
pr_labels: set[str] | None = None,
|
||||
) -> tuple[bool, list[str]]:
|
||||
missing_or_bad: list[str] = []
|
||||
for context in contexts:
|
||||
status = latest_statuses.get(context)
|
||||
state = status_state(status or {})
|
||||
if state != "success":
|
||||
if pr_labels and _is_tier_low_pending_ok(latest_statuses, context, pr_labels):
|
||||
continue # tier:low soft-fail: accept pending sop-checklist
|
||||
missing_or_bad.append(f"{context}={state or 'missing'}")
|
||||
return not missing_or_bad, missing_or_bad
|
||||
|
||||
@@ -209,6 +232,7 @@ def evaluate_merge_readiness(
|
||||
pr_status: dict,
|
||||
required_contexts: list[str],
|
||||
pr_has_current_base: bool,
|
||||
pr_labels: set[str] | None = None,
|
||||
) -> MergeDecision:
|
||||
# Check push-required contexts explicitly instead of combined state.
|
||||
# Combined state can be "failure" due to non-blocking jobs
|
||||
@@ -228,7 +252,7 @@ def evaluate_merge_readiness(
|
||||
# The required_contexts list is the authoritative gate — it includes only
|
||||
# the checks that actually block merges.
|
||||
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
|
||||
ok, missing_or_bad = required_contexts_green(latest, required_contexts)
|
||||
ok, missing_or_bad = required_contexts_green(latest, required_contexts, pr_labels)
|
||||
if not ok:
|
||||
return MergeDecision(False, "wait", "required contexts not green: " + ", ".join(missing_or_bad))
|
||||
return MergeDecision(True, "merge", "ready")
|
||||
@@ -253,27 +277,32 @@ def get_combined_status(sha: str) -> dict:
|
||||
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
|
||||
if not isinstance(combined, dict):
|
||||
raise ApiError(f"status for {sha} response not object")
|
||||
# Fetch full statuses list; 200 covers >99% of real-world runs.
|
||||
# The list is ordered ascending by id (oldest first) — callers must
|
||||
# iterate in reverse to get the newest entry per context.
|
||||
# Best-effort: large repos (main with 550+ statuses) may time out.
|
||||
# On timeout, fall back to the statuses[] already in the combined
|
||||
# response (usually 30 entries — enough for most PRs, enough for
|
||||
# main's early push-required contexts).
|
||||
combined_statuses: list[dict] = combined.get("statuses") or []
|
||||
try:
|
||||
_, all_statuses = api(
|
||||
_, all_statuses_raw = api(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
|
||||
query={"limit": "50"},
|
||||
)
|
||||
if isinstance(all_statuses, list):
|
||||
combined["statuses"] = all_statuses
|
||||
if isinstance(all_statuses_raw, list):
|
||||
all_statuses: list[dict] = list(all_statuses_raw)
|
||||
else:
|
||||
all_statuses = []
|
||||
except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
|
||||
# URLError covers network-level failures (DNS, refused, timeout).
|
||||
# TimeoutError and OSError cover socket-level timeouts.
|
||||
sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
|
||||
# Fall back to the statuses[] already in the combined response.
|
||||
pass
|
||||
all_statuses = []
|
||||
# Build latest per context: process combined (ascending→reverse=newest
|
||||
# first), then fill gaps from all_statuses (already newest-first).
|
||||
latest: dict[str, dict] = {}
|
||||
for status in reversed(sorted(combined_statuses, key=lambda s: s.get("id") or 0)):
|
||||
ctx = status.get("context")
|
||||
if isinstance(ctx, str) and ctx not in latest:
|
||||
latest[ctx] = status
|
||||
for status in all_statuses:
|
||||
ctx = status.get("context")
|
||||
if isinstance(ctx, str) and ctx not in latest:
|
||||
latest[ctx] = status
|
||||
combined["statuses"] = list(latest.values())
|
||||
return combined
|
||||
|
||||
|
||||
@@ -380,11 +409,13 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
commits = get_pull_commits(pr_number)
|
||||
current_base = pr_has_current_base(pr, commits, main_sha)
|
||||
pr_status = get_combined_status(head_sha)
|
||||
pr_labels = label_names(pr)
|
||||
decision = evaluate_merge_readiness(
|
||||
main_status=main_status,
|
||||
pr_status=pr_status,
|
||||
required_contexts=contexts,
|
||||
pr_has_current_base=current_base,
|
||||
pr_labels=pr_labels,
|
||||
)
|
||||
|
||||
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
|
||||
|
||||
@@ -192,22 +192,11 @@ def test_pop_missing_id_returns_none(state: inbox.InboxState):
|
||||
|
||||
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
wait_timeout = 5.0
|
||||
start = time.monotonic()
|
||||
msg = state.wait(timeout_secs=wait_timeout)
|
||||
msg = state.wait(timeout_secs=5.0)
|
||||
elapsed = time.monotonic() - start
|
||||
assert msg is not None and msg.activity_id == "a"
|
||||
# Structural assertion (NOT an absolute deadline): with a non-empty
|
||||
# queue wait() returns its head without ever entering the timed
|
||||
# block, so it consumes a tiny fraction of timeout_secs. Comparing
|
||||
# against the timeout (rather than a magic "< 0.5s") keeps this
|
||||
# robust on a CPU-starved CI host: only an implementation that
|
||||
# actually blocks for the timeout (≈1.0×) trips it. 0.2× = wide
|
||||
# jitter margin for a scheduler under contention.
|
||||
assert elapsed < wait_timeout * 0.2, (
|
||||
f"wait should not block when queue non-empty: took {elapsed:.2f}s "
|
||||
f"({elapsed / wait_timeout:.2f}× the {wait_timeout:.0f}s timeout)"
|
||||
)
|
||||
assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
|
||||
|
||||
|
||||
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
|
||||
|
||||
@@ -829,26 +829,16 @@ def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock:
|
||||
|
||||
|
||||
def test_batch_fetcher_runs_submitted_rows_concurrently():
|
||||
# Three rows whose .get() blocks for ~PER_ROW each. With >=3 workers
|
||||
# the batch should overlap the sleeps (~1× PER_ROW wall time), not
|
||||
# run them back-to-back (~3× PER_ROW).
|
||||
#
|
||||
# NOTE: we assert a *structural* property — observed wall time is
|
||||
# meaningfully below the serial sum — NOT an absolute deadline. An
|
||||
# absolute "< 250ms" ceiling silently encodes "the runner host is
|
||||
# idle"; under CI contention (load >100) the same concurrent run
|
||||
# measures >250ms and false-reds an unrelated PR (motivating
|
||||
# incident: #190 / PR#1348, a 1.6ms overshoot on a load-107 host).
|
||||
# Host load scales serial and concurrent timing together, so the
|
||||
# ratio between them stays a reliable concurrency discriminator.
|
||||
# Three rows whose .get() blocks for ~120ms each. With 4 workers the
|
||||
# batch should complete in ~120ms (parallel), not ~360ms (serial).
|
||||
# The 250ms ceiling accommodates CI scheduler jitter while still
|
||||
# discriminating concurrent (~120ms) from serial (~360ms).
|
||||
import time
|
||||
|
||||
PER_ROW = 0.12
|
||||
N_ROWS = 3
|
||||
serial_total = PER_ROW * N_ROWS
|
||||
barrier_start = [0.0]
|
||||
|
||||
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
|
||||
time.sleep(PER_ROW)
|
||||
time.sleep(0.12)
|
||||
for fid in ("a", "b", "c"):
|
||||
if f"/pending-uploads/{fid}/content" in url:
|
||||
return _make_resp(200, content=b"X", content_type="text/plain")
|
||||
@@ -865,22 +855,16 @@ def test_batch_fetcher_runs_submitted_rows_concurrently():
|
||||
client=client,
|
||||
max_workers=4,
|
||||
)
|
||||
start = time.monotonic()
|
||||
barrier_start[0] = time.time()
|
||||
for fid in ("a", "b", "c"):
|
||||
bf.submit(_row_with_id(f"act-{fid}", fid))
|
||||
bf.wait_all()
|
||||
elapsed = time.monotonic() - start
|
||||
elapsed = time.time() - barrier_start[0]
|
||||
bf.close()
|
||||
|
||||
# Concurrent execution overlaps the three sleeps, so wall time is far
|
||||
# below the serial sum. 0.6× leaves generous headroom for scheduler
|
||||
# jitter while still failing loudly if the rows ran serially (which
|
||||
# would land at ~1.0× serial_total regardless of host load).
|
||||
assert elapsed < serial_total * 0.6, (
|
||||
f"3 rows × {PER_ROW}s with 4 workers ran in {elapsed:.3f}s; expected "
|
||||
f"well under the {serial_total:.2f}s serial sum (got "
|
||||
f"{elapsed / serial_total:.2f}× — suggests serial execution, "
|
||||
"Phase 5b regression)"
|
||||
assert elapsed < 0.25, (
|
||||
f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
|
||||
"(suggests serial execution — Phase 5b regression)"
|
||||
)
|
||||
assert client.get.call_count == 3
|
||||
assert client.post.call_count == 3
|
||||
@@ -1066,14 +1050,13 @@ def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
|
||||
import threading
|
||||
import time
|
||||
|
||||
BLOCK_SECS = 5.0 # the leaked worker's max self-unblock window
|
||||
blocker = threading.Event() # never set — workers stay running
|
||||
|
||||
def _hang_get(url, headers=None):
|
||||
# Wait at most BLOCK_SECS so a buggy implementation eventually
|
||||
# unblocks the test instead of timing out the whole pytest run,
|
||||
# but nothing legitimate should reach this fallback.
|
||||
blocker.wait(timeout=BLOCK_SECS)
|
||||
# Wait at most ~5s so a buggy implementation eventually unblocks
|
||||
# the test instead of timing out the whole pytest run, but
|
||||
# nothing legitimate should reach this fallback.
|
||||
blocker.wait(timeout=5.0)
|
||||
return _make_resp(200, content=b"x", content_type="text/plain")
|
||||
|
||||
client = MagicMock()
|
||||
@@ -1090,23 +1073,17 @@ def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
|
||||
bf.submit(_row_with_id("act-a", "a"))
|
||||
# Tiny timeout — wait_all must report the future as not_done.
|
||||
bf.wait_all(timeout=0.05)
|
||||
start = time.monotonic()
|
||||
t0 = time.time()
|
||||
bf.close()
|
||||
elapsed = time.monotonic() - start
|
||||
elapsed = time.time() - t0
|
||||
# Unblock the lingering worker so it doesn't pollute later tests.
|
||||
blocker.set()
|
||||
|
||||
# Structural assertion (NOT an absolute deadline): without the
|
||||
# cancel-on-timeout fix, close() drains the leaked worker and blocks
|
||||
# the FULL BLOCK_SECS; with the fix shutdown(wait=False) returns
|
||||
# without draining, so close() finishes a small fraction of that.
|
||||
# We compare against BLOCK_SECS rather than a magic "< 1.0s" so a
|
||||
# CPU-starved CI host (which inflates both the drain and the
|
||||
# non-drain path together) cannot false-fail this — only genuinely
|
||||
# draining (≈1.0× BLOCK_SECS) trips it. 0.3× = wide jitter margin.
|
||||
assert elapsed < BLOCK_SECS * 0.3, (
|
||||
f"close() took {elapsed:.2f}s after wait_all timeout "
|
||||
f"({elapsed / BLOCK_SECS:.2f}× the {BLOCK_SECS:.0f}s worker block) — "
|
||||
# Without the cancel-on-timeout fix, close() would block until
|
||||
# blocker.set() — i.e., the full ~5s. With the fix it returns
|
||||
# immediately because shutdown(wait=False) doesn't drain.
|
||||
assert elapsed < 1.0, (
|
||||
f"close() blocked for {elapsed:.2f}s after wait_all timeout — "
|
||||
"cancel-on-timeout regression: close() is draining instead of bailing"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user