Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 37e2d8a8fb | |||
| 4c0cd6b705 | |||
| af7afc6112 | |||
| dc858ad164 | |||
| 2ffd44c694 | |||
| 4f5d683f4b | |||
| df4a0e3f9d |
@@ -65,6 +65,11 @@ class ApiError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class MergePermissionError(ApiError):
|
||||
"""Merge failed with a permanent permission error (403/404/405).
|
||||
The queue should skip this PR and move to the next one."""
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MergeDecision:
|
||||
ready: bool
|
||||
@@ -148,15 +153,38 @@ def latest_statuses_by_context(statuses: list[dict]) -> dict[str, dict]:
|
||||
return latest
|
||||
|
||||
|
||||
def _is_tier_low_pending_ok(
|
||||
latest_statuses: dict[str, dict],
|
||||
context: str,
|
||||
pr_labels: set[str],
|
||||
) -> bool:
|
||||
"""Return True if tier:low PR can tolerate sop-checklist pending state.
|
||||
|
||||
Per sop-checklist-config.yaml tier_failure_mode, tier:low uses soft-fail:
|
||||
sop-checklist posts state=pending when acks are satisfied (missing
|
||||
manager/ceo acks are informational only). The queue should accept
|
||||
pending instead of waiting for success.
|
||||
"""
|
||||
if "tier:low" not in pr_labels:
|
||||
return False
|
||||
if "sop-checklist" not in context:
|
||||
return False
|
||||
status = latest_statuses.get(context) or {}
|
||||
return status_state(status) == "pending"
|
||||
|
||||
|
||||
def required_contexts_green(
|
||||
latest_statuses: dict[str, dict],
|
||||
contexts: list[str],
|
||||
pr_labels: set[str] | None = None,
|
||||
) -> tuple[bool, list[str]]:
|
||||
missing_or_bad: list[str] = []
|
||||
for context in contexts:
|
||||
status = latest_statuses.get(context)
|
||||
state = status_state(status or {})
|
||||
if state != "success":
|
||||
if pr_labels and _is_tier_low_pending_ok(latest_statuses, context, pr_labels):
|
||||
continue # tier:low soft-fail: accept pending sop-checklist
|
||||
missing_or_bad.append(f"{context}={state or 'missing'}")
|
||||
return not missing_or_bad, missing_or_bad
|
||||
|
||||
@@ -209,6 +237,7 @@ def evaluate_merge_readiness(
|
||||
pr_status: dict,
|
||||
required_contexts: list[str],
|
||||
pr_has_current_base: bool,
|
||||
pr_labels: set[str] | None = None,
|
||||
) -> MergeDecision:
|
||||
# Check push-required contexts explicitly instead of combined state.
|
||||
# Combined state can be "failure" due to non-blocking jobs
|
||||
@@ -228,7 +257,7 @@ def evaluate_merge_readiness(
|
||||
# The required_contexts list is the authoritative gate — it includes only
|
||||
# the checks that actually block merges.
|
||||
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
|
||||
ok, missing_or_bad = required_contexts_green(latest, required_contexts)
|
||||
ok, missing_or_bad = required_contexts_green(latest, required_contexts, pr_labels)
|
||||
if not ok:
|
||||
return MergeDecision(False, "wait", "required contexts not green: " + ", ".join(missing_or_bad))
|
||||
return MergeDecision(True, "merge", "ready")
|
||||
@@ -253,27 +282,32 @@ def get_combined_status(sha: str) -> dict:
|
||||
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
|
||||
if not isinstance(combined, dict):
|
||||
raise ApiError(f"status for {sha} response not object")
|
||||
# Fetch full statuses list; 200 covers >99% of real-world runs.
|
||||
# The list is ordered ascending by id (oldest first) — callers must
|
||||
# iterate in reverse to get the newest entry per context.
|
||||
# Best-effort: large repos (main with 550+ statuses) may time out.
|
||||
# On timeout, fall back to the statuses[] already in the combined
|
||||
# response (usually 30 entries — enough for most PRs, enough for
|
||||
# main's early push-required contexts).
|
||||
combined_statuses: list[dict] = combined.get("statuses") or []
|
||||
try:
|
||||
_, all_statuses = api(
|
||||
_, all_statuses_raw = api(
|
||||
"GET",
|
||||
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
|
||||
query={"limit": "50"},
|
||||
)
|
||||
if isinstance(all_statuses, list):
|
||||
combined["statuses"] = all_statuses
|
||||
if isinstance(all_statuses_raw, list):
|
||||
all_statuses: list[dict] = list(all_statuses_raw)
|
||||
else:
|
||||
all_statuses = []
|
||||
except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
|
||||
# URLError covers network-level failures (DNS, refused, timeout).
|
||||
# TimeoutError and OSError cover socket-level timeouts.
|
||||
sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
|
||||
# Fall back to the statuses[] already in the combined response.
|
||||
pass
|
||||
all_statuses = []
|
||||
# Build latest per context: process combined (ascending→reverse=newest
|
||||
# first), then fill gaps from all_statuses (already newest-first).
|
||||
latest: dict[str, dict] = {}
|
||||
for status in reversed(sorted(combined_statuses, key=lambda s: s.get("id") or 0)):
|
||||
ctx = status.get("context")
|
||||
if isinstance(ctx, str) and ctx not in latest:
|
||||
latest[ctx] = status
|
||||
for status in all_statuses:
|
||||
ctx = status.get("context")
|
||||
if isinstance(ctx, str) and ctx not in latest:
|
||||
latest[ctx] = status
|
||||
combined["statuses"] = list(latest.values())
|
||||
return combined
|
||||
|
||||
|
||||
@@ -338,7 +372,16 @@ def merge_pull(pr_number: int, *, dry_run: bool) -> None:
|
||||
print(f"::notice::merging PR #{pr_number}")
|
||||
if dry_run:
|
||||
return
|
||||
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
|
||||
try:
|
||||
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
|
||||
except ApiError as exc:
|
||||
# Re-raise permission-like errors so process_once can skip this PR.
|
||||
# 403 = no push access, 404 = repo/pr not found, 405 = not allowed.
|
||||
msg = str(exc)
|
||||
for code in ("403", "404", "405"):
|
||||
if code in msg:
|
||||
raise MergePermissionError(msg) from exc
|
||||
raise # re-raise other ApiErrors unchanged
|
||||
|
||||
|
||||
def process_once(*, dry_run: bool = False) -> int:
|
||||
@@ -380,11 +423,13 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
commits = get_pull_commits(pr_number)
|
||||
current_base = pr_has_current_base(pr, commits, main_sha)
|
||||
pr_status = get_combined_status(head_sha)
|
||||
pr_labels = label_names(pr)
|
||||
decision = evaluate_merge_readiness(
|
||||
main_status=main_status,
|
||||
pr_status=pr_status,
|
||||
required_contexts=contexts,
|
||||
pr_has_current_base=current_base,
|
||||
pr_labels=pr_labels,
|
||||
)
|
||||
|
||||
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
|
||||
@@ -407,7 +452,25 @@ def process_once(*, dry_run: bool = False) -> int:
|
||||
"deferring to next tick"
|
||||
)
|
||||
return 0
|
||||
merge_pull(pr_number, dry_run=dry_run)
|
||||
try:
|
||||
merge_pull(pr_number, dry_run=dry_run)
|
||||
except MergePermissionError as exc:
|
||||
# Permanent merge failure (HTTP 403/404/405). Post a comment so
|
||||
# maintainers know why, then return 0 so this tick is done.
|
||||
# The PR stays in the queue; future ticks can retry after the
|
||||
# permission issue is resolved.
|
||||
sys.stderr.write(f"::error::merge permission error for PR #{pr_number}: {exc}\n")
|
||||
post_comment(
|
||||
pr_number,
|
||||
(
|
||||
"merge-queue: merge failed with HTTP 405 'User not allowed to merge PR'. "
|
||||
"No available token has Can-merge permission on this repo. "
|
||||
"Fix: grant Can-merge to a token, or add a maintain/admin collaborator. "
|
||||
"Skipping to next queued PR on next tick."
|
||||
),
|
||||
dry_run=dry_run,
|
||||
)
|
||||
return 0
|
||||
return 0
|
||||
return 0
|
||||
|
||||
|
||||
@@ -118,3 +118,13 @@ def test_merge_decision_updates_stale_pr_before_merge():
|
||||
|
||||
assert decision.ready is False
|
||||
assert decision.action == "update"
|
||||
|
||||
|
||||
def test_MergePermissionError_inherits_from_ApiError():
|
||||
assert issubclass(mq.MergePermissionError, mq.ApiError)
|
||||
|
||||
|
||||
def test_MergePermissionError_message_preserved():
|
||||
exc = mq.MergePermissionError("POST /merge -> HTTP 405: User not allowed")
|
||||
assert "405" in str(exc)
|
||||
assert "User not allowed" in str(exc)
|
||||
|
||||
@@ -35,8 +35,8 @@ func insertMCPDelegationRow(ctx context.Context, db *sql.DB, workspaceID, target
|
||||
})
|
||||
_, err := db.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending')
|
||||
`, workspaceID, workspaceID, targetID, "Delegating to "+targetID, string(taskJSON))
|
||||
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6)
|
||||
`, workspaceID, workspaceID, targetID, "Delegating to "+targetID, string(taskJSON), "pending")
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -191,3 +195,115 @@ func TestExtractA2AText_PriorityArtifactsOverMessage(t *testing.T) {
|
||||
t.Errorf("artifacts should take priority: got %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// insertMCPDelegationRow tests
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestInsertMCPDelegationRow_Success(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs("ws-src", "ws-src", "ws-tgt", "Delegating to ws-tgt", sqlmock.AnyArg(), "pending").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
err = insertMCPDelegationRow(context.Background(), mockDB, "ws-src", "ws-tgt", "del-123", "summarise the report")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertMCPDelegationRow_DBError(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs("ws-src", "ws-src", "ws-tgt", sqlmock.AnyArg(), sqlmock.AnyArg(), "pending").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
err = insertMCPDelegationRow(context.Background(), mockDB, "ws-src", "ws-tgt", "del-456", "check the logs")
|
||||
if err == nil {
|
||||
t.Error("expected error, got nil")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// updateMCPDelegationStatus tests
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
func TestUpdateMCPDelegationStatus_Success(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("completed", "", "ws-src", "del-789").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Should not panic, should not error
|
||||
updateMCPDelegationStatus(context.Background(), mockDB, "ws-src", "del-789", "completed", "")
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateMCPDelegationStatus_WithErrorDetail(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("failed", "timeout", "ws-src", "del-000").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
updateMCPDelegationStatus(context.Background(), mockDB, "ws-src", "del-000", "failed", "timeout")
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateMCPDelegationStatus_DBError_LoggedNotReturned(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
mock.ExpectExec(`UPDATE activity_logs`).
|
||||
WithArgs("failed", sqlmock.AnyArg(), "ws-src", "del-abc").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
// Function returns no value — error is logged, not propagated.
|
||||
// Verify it does not panic.
|
||||
updateMCPDelegationStatus(context.Background(), mockDB, "ws-src", "del-abc", "failed", "connection refused")
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user