Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 65831c839e | |||
| d342149646 | |||
| 54907ee852 | |||
| 99453c6a71 |
@@ -23,6 +23,7 @@ import dataclasses
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
@@ -326,6 +327,43 @@ def update_pull(pr_number: int, *, dry_run: bool) -> None:
|
||||
)
|
||||
|
||||
|
||||
def wait_for_ci(
|
||||
head_sha: str,
|
||||
contexts: list[str],
|
||||
*,
|
||||
max_wait_seconds: int = 300,
|
||||
poll_interval: int = 15,
|
||||
) -> bool:
|
||||
"""Poll CI statuses for head_sha until all required contexts are terminal.
|
||||
|
||||
Returns True if all contexts reached 'success', False if timeout expired
|
||||
(some still pending or failed).
|
||||
|
||||
Background: after a queue-triggered PR update, CI re-runs on the new head.
|
||||
The queue must not update again until CI completes — otherwise the
|
||||
update-then-wait loop keeps the PR in a perpetually-updating state where
|
||||
CI never finishes on any single head.
|
||||
"""
|
||||
deadline = time.time() + max_wait_seconds
|
||||
while time.time() < deadline:
|
||||
time.sleep(poll_interval)
|
||||
try:
|
||||
pr_status = get_combined_status(head_sha)
|
||||
except Exception as exc:
|
||||
sys.stderr.write(f"::warning::wait_for_ci: status fetch failed: {exc}\n")
|
||||
continue
|
||||
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
|
||||
ok, bad = required_contexts_green(latest, contexts)
|
||||
if ok:
|
||||
sys.stderr.write(f"::notice::wait_for_ci: all contexts green after {int(time.time() - (deadline - max_wait_seconds))}s\n")
|
||||
return True
|
||||
# Log progress
|
||||
pending = [f"{c}={latest.get(c, {}).get('status', 'missing')}" for c in contexts if latest.get(c, {}).get('status') != 'success']
|
||||
sys.stderr.write(f"::notice::wait_for_ci: still waiting ({int(deadline - time.time())}s left): {', '.join(pending[:3])}\n")
|
||||
sys.stderr.write(f"::warning::wait_for_ci: timeout after {max_wait_seconds}s; proceeding with merge check\n")
|
||||
return False
|
||||
|
||||
|
||||
def merge_pull(pr_number: int, *, dry_run: bool) -> None:
|
||||
payload = {
|
||||
"Do": "merge",
|
||||
@@ -338,7 +376,24 @@ def merge_pull(pr_number: int, *, dry_run: bool) -> None:
|
||||
print(f"::notice::merging PR #{pr_number}")
|
||||
if dry_run:
|
||||
return
|
||||
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
|
||||
# Gitea's merge endpoint returns HTTP 200 with an empty body on success.
|
||||
# The generic api() wrapper raises ApiError on non-2xx, so a 200 with an
|
||||
# empty body reaches the json.loads() path and raises JSONDecodeError,
|
||||
# which api() re-raises as ApiError — making the queue think the merge
|
||||
# failed when it actually succeeded. Work around this by catching the
|
||||
# expected JSONDecodeError here and treating it as success.
|
||||
try:
|
||||
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
|
||||
except ApiError as exc:
|
||||
# Surface non-merge errors (5xx server errors, 403 forbidden, etc.)
|
||||
if "merge" in str(exc).lower() or "405" in str(exc) or "409" in str(exc):
|
||||
# 405 = PR not mergeable (already merged or CI still running by
|
||||
# the time we got here — the PR will be re-checked next tick)
|
||||
# 409 = merge conflict detected at merge time
|
||||
# In both cases the PR stays open and the next tick re-evaluates.
|
||||
sys.stderr.write(f"::warning::merge call returned: {exc}\n")
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def process_once(*, dry_run: bool = False) -> int:
|
||||
@@ -390,6 +445,32 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
|
||||
if decision.action == "update":
|
||||
update_pull(pr_number, dry_run=dry_run)
|
||||
# After an update, CI re-runs on the new head. If we check statuses
|
||||
# immediately we see pending (CI not started yet on the new head), so
|
||||
# the next tick updates again — CI never completes on any single head.
|
||||
# Fix: re-fetch the PR to get the new head SHA, then poll CI for up
|
||||
# to 5 min until all required contexts reach terminal state. If CI
|
||||
# finishes in time, proceed to merge on the same tick.
|
||||
if not dry_run:
|
||||
updated_pr = get_pull(pr_number)
|
||||
new_head = updated_pr.get("head", {}).get("sha", "")
|
||||
if new_head and new_head != head_sha:
|
||||
sys.stderr.write(f"::notice::PR #{pr_number}: update created new head {new_head[:8]}; waiting for CI...\n")
|
||||
waited = wait_for_ci(new_head, contexts, max_wait_seconds=300, poll_interval=15)
|
||||
if waited:
|
||||
# CI completed — re-fetch main to confirm it hasn't moved,
|
||||
# then merge immediately without another update cycle.
|
||||
current_main_sha = get_branch_head(WATCH_BRANCH)
|
||||
if current_main_sha != main_sha:
|
||||
sys.stderr.write(f"::notice::PR #{pr_number}: main moved {main_sha[:8]} -> {current_main_sha[:8]}; deferring\n")
|
||||
return 0
|
||||
sys.stderr.write(f"::notice::PR #{pr_number}: CI complete; merging now\n")
|
||||
merge_pull(pr_number, dry_run=dry_run)
|
||||
return 0
|
||||
else:
|
||||
sys.stderr.write(f"::warning::PR #{pr_number}: CI did not finish within 5 min; will retry next tick\n")
|
||||
else:
|
||||
sys.stderr.write(f"::notice::PR #{pr_number}: update did not change head SHA; will retry\n")
|
||||
post_comment(
|
||||
pr_number,
|
||||
(
|
||||
@@ -400,6 +481,13 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
)
|
||||
return 0
|
||||
if decision.ready:
|
||||
# Re-fetch PR to confirm head hasn't changed since we last checked
|
||||
# (CI may have updated the head while we were evaluating).
|
||||
current_pr = get_pull(pr_number)
|
||||
current_head = current_pr.get("head", {}).get("sha", "")
|
||||
if current_head != head_sha:
|
||||
print(f"::notice::PR #{pr_number} head changed {head_sha[:8]} -> {current_head[:8]}; re-evaluating")
|
||||
return 0
|
||||
latest_main_sha = get_branch_head(WATCH_BRANCH)
|
||||
if latest_main_sha != main_sha:
|
||||
print(
|
||||
|
||||
@@ -32,6 +32,12 @@ on:
|
||||
# iterating all open PRs when PR_NUMBER is empty.
|
||||
workflow_dispatch:
|
||||
|
||||
# Cancel stale runs so the 8-runner pool stays available for PR jobs.
|
||||
# Per-SHA group ensures push and cron runs at different SHAs don't cancel each other.
|
||||
concurrency:
|
||||
group: gate-check-v3-${{ github.event.pull_request.head.sha || github.sha }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
# read: contents — for checkout (base ref, not PR head for security)
|
||||
# read: pull-requests — for reading PR info via API
|
||||
|
||||
@@ -162,7 +162,6 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
python -m twine upload \
|
||||
--verbose \
|
||||
--repository pypi \
|
||||
--username __token__ \
|
||||
--password "$PYPI_TOKEN" \
|
||||
|
||||
@@ -44,6 +44,12 @@ on:
|
||||
- ".github/scripts/lint_secret_pattern_drift.py"
|
||||
- ".githooks/pre-commit"
|
||||
|
||||
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
|
||||
# Per-SHA group ensures push and scheduled runs at different SHAs don't cancel each other.
|
||||
concurrency:
|
||||
group: secret-pattern-drift-${{ github.event.pull_request.head.sha || github.sha }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
GITHUB_SERVER_URL: https://git.moleculesai.app
|
||||
|
||||
|
||||
@@ -22,6 +22,11 @@ on:
|
||||
- cron: '17 4 * * 1' # Mondays at 04:17 UTC
|
||||
workflow_dispatch:
|
||||
|
||||
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
|
||||
concurrency:
|
||||
group: weekly-platform-go-${{ github.event.pull_request.head.sha || github.sha }}
|
||||
cancel-in-progress: true
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
statuses: write
|
||||
|
||||
@@ -192,22 +192,11 @@ def test_pop_missing_id_returns_none(state: inbox.InboxState):
|
||||
|
||||
def test_wait_returns_existing_head_immediately(state: inbox.InboxState):
|
||||
state.record(_msg("a"))
|
||||
wait_timeout = 5.0
|
||||
start = time.monotonic()
|
||||
msg = state.wait(timeout_secs=wait_timeout)
|
||||
msg = state.wait(timeout_secs=5.0)
|
||||
elapsed = time.monotonic() - start
|
||||
assert msg is not None and msg.activity_id == "a"
|
||||
# Structural assertion (NOT an absolute deadline): with a non-empty
|
||||
# queue wait() returns its head without ever entering the timed
|
||||
# block, so it consumes a tiny fraction of timeout_secs. Comparing
|
||||
# against the timeout (rather than a magic "< 0.5s") keeps this
|
||||
# robust on a CPU-starved CI host: only an implementation that
|
||||
# actually blocks for the timeout (≈1.0×) trips it. 0.2× = wide
|
||||
# jitter margin for a scheduler under contention.
|
||||
assert elapsed < wait_timeout * 0.2, (
|
||||
f"wait should not block when queue non-empty: took {elapsed:.2f}s "
|
||||
f"({elapsed / wait_timeout:.2f}× the {wait_timeout:.0f}s timeout)"
|
||||
)
|
||||
assert elapsed < 0.5, f"wait should not block when queue non-empty (took {elapsed:.2f}s)"
|
||||
|
||||
|
||||
def test_wait_blocks_until_message_arrives(state: inbox.InboxState):
|
||||
|
||||
@@ -829,26 +829,16 @@ def _stub_client_for_batch(get_responses: dict[str, MagicMock]) -> MagicMock:
|
||||
|
||||
|
||||
def test_batch_fetcher_runs_submitted_rows_concurrently():
|
||||
# Three rows whose .get() blocks for ~PER_ROW each. With >=3 workers
|
||||
# the batch should overlap the sleeps (~1× PER_ROW wall time), not
|
||||
# run them back-to-back (~3× PER_ROW).
|
||||
#
|
||||
# NOTE: we assert a *structural* property — observed wall time is
|
||||
# meaningfully below the serial sum — NOT an absolute deadline. An
|
||||
# absolute "< 250ms" ceiling silently encodes "the runner host is
|
||||
# idle"; under CI contention (load >100) the same concurrent run
|
||||
# measures >250ms and false-reds an unrelated PR (motivating
|
||||
# incident: #190 / PR#1348, a 1.6ms overshoot on a load-107 host).
|
||||
# Host load scales serial and concurrent timing together, so the
|
||||
# ratio between them stays a reliable concurrency discriminator.
|
||||
# Three rows whose .get() blocks for ~120ms each. With 4 workers the
|
||||
# batch should complete in ~120ms (parallel), not ~360ms (serial).
|
||||
# The 250ms ceiling accommodates CI scheduler jitter while still
|
||||
# discriminating concurrent (~120ms) from serial (~360ms).
|
||||
import time
|
||||
|
||||
PER_ROW = 0.12
|
||||
N_ROWS = 3
|
||||
serial_total = PER_ROW * N_ROWS
|
||||
barrier_start = [0.0]
|
||||
|
||||
def _slow_get(url: str, headers: dict[str, str] | None = None) -> MagicMock:
|
||||
time.sleep(PER_ROW)
|
||||
time.sleep(0.12)
|
||||
for fid in ("a", "b", "c"):
|
||||
if f"/pending-uploads/{fid}/content" in url:
|
||||
return _make_resp(200, content=b"X", content_type="text/plain")
|
||||
@@ -865,22 +855,16 @@ def test_batch_fetcher_runs_submitted_rows_concurrently():
|
||||
client=client,
|
||||
max_workers=4,
|
||||
)
|
||||
start = time.monotonic()
|
||||
barrier_start[0] = time.time()
|
||||
for fid in ("a", "b", "c"):
|
||||
bf.submit(_row_with_id(f"act-{fid}", fid))
|
||||
bf.wait_all()
|
||||
elapsed = time.monotonic() - start
|
||||
elapsed = time.time() - barrier_start[0]
|
||||
bf.close()
|
||||
|
||||
# Concurrent execution overlaps the three sleeps, so wall time is far
|
||||
# below the serial sum. 0.6× leaves generous headroom for scheduler
|
||||
# jitter while still failing loudly if the rows ran serially (which
|
||||
# would land at ~1.0× serial_total regardless of host load).
|
||||
assert elapsed < serial_total * 0.6, (
|
||||
f"3 rows × {PER_ROW}s with 4 workers ran in {elapsed:.3f}s; expected "
|
||||
f"well under the {serial_total:.2f}s serial sum (got "
|
||||
f"{elapsed / serial_total:.2f}× — suggests serial execution, "
|
||||
"Phase 5b regression)"
|
||||
assert elapsed < 0.25, (
|
||||
f"3 rows × 120ms with 4 workers should finish in <250ms; got {elapsed:.3f}s "
|
||||
"(suggests serial execution — Phase 5b regression)"
|
||||
)
|
||||
assert client.get.call_count == 3
|
||||
assert client.post.call_count == 3
|
||||
@@ -1066,14 +1050,13 @@ def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
|
||||
import threading
|
||||
import time
|
||||
|
||||
BLOCK_SECS = 5.0 # the leaked worker's max self-unblock window
|
||||
blocker = threading.Event() # never set — workers stay running
|
||||
|
||||
def _hang_get(url, headers=None):
|
||||
# Wait at most BLOCK_SECS so a buggy implementation eventually
|
||||
# unblocks the test instead of timing out the whole pytest run,
|
||||
# but nothing legitimate should reach this fallback.
|
||||
blocker.wait(timeout=BLOCK_SECS)
|
||||
# Wait at most ~5s so a buggy implementation eventually unblocks
|
||||
# the test instead of timing out the whole pytest run, but
|
||||
# nothing legitimate should reach this fallback.
|
||||
blocker.wait(timeout=5.0)
|
||||
return _make_resp(200, content=b"x", content_type="text/plain")
|
||||
|
||||
client = MagicMock()
|
||||
@@ -1090,23 +1073,17 @@ def test_batch_fetcher_close_after_timeout_does_not_block_on_running_workers():
|
||||
bf.submit(_row_with_id("act-a", "a"))
|
||||
# Tiny timeout — wait_all must report the future as not_done.
|
||||
bf.wait_all(timeout=0.05)
|
||||
start = time.monotonic()
|
||||
t0 = time.time()
|
||||
bf.close()
|
||||
elapsed = time.monotonic() - start
|
||||
elapsed = time.time() - t0
|
||||
# Unblock the lingering worker so it doesn't pollute later tests.
|
||||
blocker.set()
|
||||
|
||||
# Structural assertion (NOT an absolute deadline): without the
|
||||
# cancel-on-timeout fix, close() drains the leaked worker and blocks
|
||||
# the FULL BLOCK_SECS; with the fix shutdown(wait=False) returns
|
||||
# without draining, so close() finishes a small fraction of that.
|
||||
# We compare against BLOCK_SECS rather than a magic "< 1.0s" so a
|
||||
# CPU-starved CI host (which inflates both the drain and the
|
||||
# non-drain path together) cannot false-fail this — only genuinely
|
||||
# draining (≈1.0× BLOCK_SECS) trips it. 0.3× = wide jitter margin.
|
||||
assert elapsed < BLOCK_SECS * 0.3, (
|
||||
f"close() took {elapsed:.2f}s after wait_all timeout "
|
||||
f"({elapsed / BLOCK_SECS:.2f}× the {BLOCK_SECS:.0f}s worker block) — "
|
||||
# Without the cancel-on-timeout fix, close() would block until
|
||||
# blocker.set() — i.e., the full ~5s. With the fix it returns
|
||||
# immediately because shutdown(wait=False) doesn't drain.
|
||||
assert elapsed < 1.0, (
|
||||
f"close() blocked for {elapsed:.2f}s after wait_all timeout — "
|
||||
"cancel-on-timeout regression: close() is draining instead of bailing"
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user