Compare commits

...

9 Commits

Author SHA1 Message Date
molecule-ai[bot] d75b73e713 Merge pull request #2981 from Molecule-AI/staging
staging → main: auto-promote 9dd2988
2026-05-05 18:13:50 -07:00
Hongming Wang 7420631c32 Merge pull request #2983 from Molecule-AI/feat/auto-promote-stale-alarm-2975
feat(ops): hourly alarm for auto-promote PR stuck on REVIEW_REQUIRED (#2975)
2026-05-06 00:58:49 +00:00
Hongming Wang caf19e8980 feat(ops): hourly alarm for auto-promote PR stuck on REVIEW_REQUIRED (#2975)
Closes the silent-block failure mode that left 25 commits — including
the Memory v2 redesign and the reno-stars data-loss fix — wedged on
staging for 12+ hours behind a single missing review. The auto-promote
workflow opened the PR + armed auto-merge, but main's branch protection
required a human review and nobody noticed until a user reported
"still seeing old memory tab".

## Detection logic — `scripts/check-stale-promote-pr.sh`

Reads open PRs `base=main head=staging` and alarms on:
  - `mergeStateStatus == BLOCKED`
  - `reviewDecision == REVIEW_REQUIRED`
  - createdAt older than `STALE_HOURS` (default 4h)

Other BLOCKED reasons (DIRTY, BEHIND, failed checks) are NOT alarmed —
those are the author's signal-to-fix. This script targets the specific
"no human reviewed yet" wedge.

Output:
  - `::warning` per stale PR (visible in workflow summary + Actions UI)
  - PR comment (idempotent via marker-string detection; one alarm
    per PR, never re-spammed)
  - Exit code = count of stale PRs (capped at 125)

Logic in a script (not inline workflow YAML) so it's:
  - **Unit-testable** — tests/test-check-stale-promote-pr.sh exercises
    every branch with stubbed fixture JSON + frozen clock. 23 tests
    covering: empty list, single stale, just-under-threshold, wrong
    reviewDecision, wrong mergeStateStatus, mixed list (only matching
    PRs alarm), custom threshold via --stale-hours, exit-code-counts-
    matching-PRs, --help, unknown arg → 64, missing repo → 2.
  - **Operator-runnable ad-hoc** — `scripts/check-stale-promote-pr.sh`
    works from any shell with `gh` + `jq`.
  - **SSOT** — one detector, the workflow YAML is just schedule +
    invocation surface. Future sibling workflows that need the same
    check call the same script.

## Workflow — `.github/workflows/auto-promote-stale-alarm.yml`

Triggers:
  - cron `27 * * * *` (hourly, off-the-hour to dodge cron herd)
  - workflow_dispatch with `stale_hours` + `post_comment` overrides

Concurrency: `auto-promote-stale-alarm` group, cancel-in-progress=false
(idempotent script; no benefit to cancelling a running scan).

Permissions: `contents: read` + `pull-requests: write` (post comments).

Sparse checkout — only fetches `scripts/check-stale-promote-pr.sh`.
No node_modules, no go modules, no slow setup steps. Workflow runs
in <30s on a clean repo.

## Why "alarm + comment" not "auto-approve"

Considered options in issue #2975:
  1. Slack/email alert — picked.
  2. Bot-account auto-approve via molecule-ops — circumvents the
     human-review gate that branch protection encodes.
  3. Trusted-promote bypass via CODEOWNERS — needs Org Admin config
     change; out of scope for a workflow PR.

The comment-on-PR pattern picks (1) without external dependencies
(no Slack token, no email config). Subscribers get notified via
GitHub's existing PR notification delivery; the warning shows up in
the Actions feed.

## Why this won't false-positive on legitimate slow reviews

Threshold is 4h. Most legitimate gates clear in <1h, so 4× headroom
is plenty for slow CI. The comment is idempotent (one alarm per PR,
never re-posted) — adding noise stops at 1 comment regardless of
how long the PR sits.

## Test plan

- [x] `bash scripts/test-check-stale-promote-pr.sh` — 23/23 pass
- [x] `python3 -c 'yaml.safe_load(...)'` clean
- [x] `bash -n` clean on both scripts
- [ ] Live verification: dispatch the workflow once main has caught up,
      confirm it correctly reports zero stale PRs
2026-05-05 17:55:27 -07:00
Hongming Wang 6748035720 Merge pull request #2980 from Molecule-AI/test/canvas-resolve-attachment-href-2973
test(canvas/chat): cover platform-pending: branch + isPlatformAttachment (#2973)
2026-05-06 00:54:17 +00:00
Hongming Wang c74d0ecc94 test(canvas/chat): cover platform-pending: branch + isPlatformAttachment (#2973)
Closes #2973 — the followup test gap I flagged on PR #2968's review.

Pre-merge #2968 added the platform-pending: URI scheme branch to
resolveAttachmentHref + introduced the isPlatformAttachment SSOT
helper, but the existing uploads.test.ts only covered the older
workspace: / file:/// / absolute-path branches. The new branch shipped
on prod-impact (live console error on reno-stars) with manual post-
deploy verification; the regression gate was filed as a followup
(#2973) so a future canvas refactor can't silently re-break the
poll-mode chat-attachment download path.

Adds 15 new test cases across two existing describe blocks:

resolveAttachmentHref — platform-pending: scheme (poll-mode uploads):
- well-formed platform-pending:<wsid>/<fileid> resolves to the
  /pending-uploads/<file>/content endpoint
- uses the URI's wsid, NOT the chat workspace_id (cross-workspace
  forwarding case — pinning the explicit decision from #2968's
  commit message so a regression that flipped this would mis-route
  the download to the wrong workspace's pending-uploads store)
- defensive fallback to raw URI on missing slash, empty fileID,
  empty wsid (so a future "helpful" change can't synthesize a
  broken /pending-uploads// path)
- regression test against the EXACT production repro from #2968's
  body (reno-stars, 2026-05-05 console error)

isPlatformAttachment:
- positive cases for platform-pending: (well-formed and malformed),
  workspace:<allowed-root>, file:///<allowed-root>, absolute paths
  under allowed roots
- NEGATIVE cases for HTTPS/HTTP URLs to other origins (auth-leak
  class regression — a helper that always returned true would
  attach workspace tokens to third-party requests), non-allowlisted
  roots like /etc/passwd or /var/log/x, empty string, and
  unrecognised schemes (s3://, ftp://)

All 21 tests pass. The 6 pre-existing tests are unchanged. The 15
new tests are the regression gate that #2973 asked for.

Verification:
- pnpm exec vitest run src/components/tabs/chat/__tests__/uploads.test.ts
  → 21 passed
2026-05-05 17:51:28 -07:00
Hongming Wang 9dd29882e2 Merge pull request #2979 from Molecule-AI/fix/a2a-poll-mode-response-shape-2967
feat(a2a): SSOT typed-variant response parser + auto-fallback for poll-mode peers (#2967)
2026-05-06 00:41:43 +00:00
Hongming Wang e342d0c5a7 fix(build): register a2a_response in TOP_LEVEL_MODULES
The drift gate caught the new SSOT parser module — without registration
the wheel ships it un-rewritten and runtime imports fail. Same pattern
as inbox_uploads, a2a_tools_delegation, a2a_tools_rbac registrations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 17:34:05 -07:00
Hongming Wang 166ad20cd7 test(e2e): Phase 3.5 — wheel parser classifies real server response (#2967)
Previously Phase 3 only checked the workspace-server's poll-mode short-circuit
emit shape ({"status":"queued","delivery_mode":"poll","method":"..."}); the
matching client-side classification was tested in isolation against fixture
dicts in test_a2a_response.py.

This phase closes the loop by piping the actual on-the-wire response from a
real workspace-server back through the wheel's a2a_response.parse() and
asserting it classifies as the Queued variant with the right method +
delivery_mode. A regression in EITHER the server emit shape OR the client
parser will now fail this E2E, eliminating the gap that allowed the original
"unexpected response shape" production bug to ship despite green unit tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 17:31:45 -07:00
Hongming Wang 8b9f809966 fix(a2a): SSOT response parser — handle poll-mode queued envelope (#2967)
Introduce ``workspace/a2a_response.py`` as the single source of truth for
the wire shapes the workspace-server proxy can return at
``/workspaces/<id>/a2a``:

  * ``Result``    — JSON-RPC success
  * ``Error``     — JSON-RPC error or platform-level error (with
                    restart-in-progress metadata when present)
  * ``Queued``    — poll-mode short-circuit envelope: the platform
                    queued the message into the target's inbox, the
                    target will fetch via /activity poll
  * ``Malformed`` — anything the parser can't classify (logged at
                    WARNING so a future server change is loud)

``send_a2a_message`` (in ``a2a_client.py``) now dispatches via
``a2a_response.parse(data)`` instead of inline ``"result" in data`` /
``"error" in data`` sniffing. The Queued variant returns a new
``_A2A_QUEUED_PREFIX`` sentinel so callers can distinguish "delivered
async, no synchronous reply" from both success-with-text and failure.

reno-stars production data caught two intermittent failures that
both reduced to the same root cause:

  1. **File transfer announce silently failed** — when CEO Ryan PC
     (poll-mode external molecule-mcp) sent the harmi.zip
     announcement to Reno Stars Business Intelligent (also poll-mode
     external), ``send_a2a_message`` saw the platform's poll-queued
     envelope ``{"status":"queued","delivery_mode":"poll","method":"..."}``,
     didn't recognize it as the synthetic delivery-acknowledgement
     it is, and returned ``[A2A_ERROR] unexpected response shape``.
     The agent fell back to a chunk-shipping path; receiver did get
     the file but operator-facing logs showed a failure that didn't
     actually fail.

  2. **Duplicated agent comm** — same bug, inverted direction. d76
     delegated to 67d, send_a2a_message returned the unexpected-shape
     error, delegate_task wrapped it as DELEGATION FAILED, the calling
     agent retried with sharper wording, the recipient saw the same
     request twice and self-reported "二次请求 — 我先不执行".

External molecule-mcp standalone runtimes are inherently poll-mode
(they have no public URL), so every external↔external A2A pair was
hitting this on every send. The pre-fix client only handled JSON-RPC
``result``/``error`` keys and treated the queued envelope (which has
neither) as malformed. RFC #2339 PR 2 added the queued envelope on
the server side; the client never caught up.

When ``send_a2a_message`` returns the ``_A2A_QUEUED_PREFIX`` sentinel,
``tool_delegate_task`` now transparently falls back to
``_delegate_sync_via_polling`` (RFC #2829 PR-5's durable
``/delegate`` + ``/delegations`` polling path, which DOES work for
poll-mode peers because the platform's executeDelegation goroutine
writes to the inbox queue and the result row arrives when the target
picks it up + replies). The agent gets a real synchronous reply
instead of the empty queued sentinel.

  * ``test_a2a_response.py`` — 62 tests, **100% line coverage** on
    the parser (verified via ``coverage run --source=a2a_response``).
    Includes adversarial-input fuzzing across ~25 pathological
    payloads — parser must never raise.
  * ``test_a2a_client.py::TestSendA2AMessagePollMode`` — 4 tests for
    the new Queued/Error wiring in ``send_a2a_message``.
  * ``test_delegation_sync_via_polling.py::TestPollModeAutoFallback``
    — 3 tests for the auto-fallback in ``tool_delegate_task``,
    including negative cases (push-mode reply must NOT trigger
    fallback; genuine error must NOT silently retry).
  * **Verified all new tests FAIL on pre-fix source** by stashing
    a2a_client.py + a2a_tools_delegation.py and re-running — 5
    failures including ImportError for the missing
    ``_A2A_QUEUED_PREFIX``.

Per the operator-debuggability directive:

  * INFO at every Queued classification (expected variant; operator
    sees normal poll-mode-peer queueing in log stream).
  * INFO at the auto-fallback decision in ``tool_delegate_task``
    so a future operator can correlate "send returned queued →
    falling back to polling path" without reading the source.
  * WARNING at every Malformed classification (server contract
    drift; operator MUST see this immediately).
  * Existing transient-retry WARNING preserved.

  * Mirror Go-side typed model in workspace-server. The wire shape
    is documented in ``a2a_response.py``'s module docstring with
    file:line pointers to the canonical emitters; a future PR can
    introduce ``models/a2a_response.go`` without changing wire
    behavior. The fixture corpus in ``test_a2a_response.py`` is
    designed so a one-sided edit breaks CI.
  * ``send_message_to_user`` and ``chat_upload_receive`` use a
    different endpoint (``/notify``) and aren't affected by this
    bug; their parsing stays unchanged.

  * 135 tests pass across ``test_a2a_response.py`` +
    ``test_a2a_client.py`` + ``test_delegation_sync_via_polling.py``
    + ``test_a2a_tools_impl.py``.
  * ``coverage run --source=a2a_response -m pytest`` reports 100%
    line coverage with 0 missing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 17:21:28 -07:00
12 changed files with 1743 additions and 36 deletions
@@ -0,0 +1,83 @@
name: auto-promote-stale-alarm
# Hourly cron + on-demand alarm for the silent-block failure mode that
# motivated issue #2975:
# - The auto-promote-staging.yml workflow opened a PR + armed
# auto-merge, but main's branch protection requires a human review
# (reviewDecision=REVIEW_REQUIRED). The PR sat BLOCKED with no
# surface-up-the-stack for 12+ hours, holding 25 commits hostage
# including the Memory v2 redesign and a reno-stars data-loss fix.
#
# This workflow runs `scripts/check-stale-promote-pr.sh` against the
# repo's open auto-promote PRs (base=main head=staging). When a PR has
# been BLOCKED on REVIEW_REQUIRED for >4h, it:
# 1. Emits a workflow-level warning (visible in run summary + the
# Actions UI feed).
# 2. Posts a comment on the PR (idempotent — one alarm per PR).
#
# The detection logic lives in scripts/check-stale-promote-pr.sh so
# it's unit-testable with stubbed `gh` (see test-check-stale-promote-pr.sh).
# This file is the schedule + invocation surface only — SSOT for the
# detector itself.
on:
schedule:
# Hourly. Cheap (one `gh pr list` + jq), and 1h granularity is
# plenty for a 4h staleness threshold — operators see the alarm
# within at most 1h of crossing the threshold.
- cron: "27 * * * *" # at :27 to dodge the cron herd at :00
workflow_dispatch:
inputs:
stale_hours:
description: "Hours after which a BLOCKED+REVIEW_REQUIRED PR is stale (default 4)"
required: false
default: "4"
post_comment:
description: "Post a comment on stale PRs (default true)"
required: false
default: "true"
permissions:
contents: read
pull-requests: write # post comments on stale PRs
# Serialize so the on-demand and scheduled runs don't double-comment
# the same PR. cancel-in-progress=false because the script is idempotent
# (existing comment marker prevents dupes), but a scheduled run firing
# while a manual one runs would just re-list the same PR set.
concurrency:
group: auto-promote-stale-alarm
cancel-in-progress: false
jobs:
scan:
runs-on: ubuntu-latest
steps:
- name: Checkout (need scripts/ only)
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
sparse-checkout: |
scripts/check-stale-promote-pr.sh
sparse-checkout-cone-mode: false
- name: Run stale-PR detector
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_REPOSITORY: ${{ github.repository }}
STALE_HOURS: ${{ inputs.stale_hours || '4' }}
POST_COMMENT: ${{ inputs.post_comment || 'true' }}
run: |
# The script's exit code reflects the count of stale PRs.
# We don't want a stale finding to fail the workflow run —
# the warning + comment are the signal, the green/red is
# noise. So convert any non-zero exit to a workflow notice
# and exit 0.
set +e
bash scripts/check-stale-promote-pr.sh
rc=$?
set -e
if [ "$rc" -ne 0 ]; then
echo "::notice::Stale PR detector found $rc PR(s) needing attention. See warnings above + comments on the PRs."
fi
# Always succeed — operator-facing surface is the warning,
# not the workflow status.
exit 0
@@ -1,5 +1,5 @@
import { describe, it, expect } from "vitest";
import { resolveAttachmentHref } from "../uploads";
import { isPlatformAttachment, resolveAttachmentHref } from "../uploads";
describe("resolveAttachmentHref — URI scheme normalisation", () => {
const wsId = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
@@ -39,3 +39,128 @@ describe("resolveAttachmentHref — URI scheme normalisation", () => {
expect(resolveAttachmentHref(wsId, "s3://bucket/key")).toBe("s3://bucket/key");
});
});
// #2973 follow-up to #2968: cover the platform-pending: scheme branch
// (poll-mode chat uploads) + the isPlatformAttachment SSOT helper that
// the chip-download and markdown-link paths both consume.
//
// Pre-fix the platform-pending: URI fell through to the raw URI →
// browser saw an unhandled-protocol click → about:blank. The fix
// resolves it to the platform pending-uploads endpoint with auth
// headers attached.
describe("resolveAttachmentHref — platform-pending: scheme (poll-mode uploads)", () => {
// Use a chat workspace ID that DIFFERS from the one in the URI, so
// tests can verify which one the resolver uses. The forward-across-
// workspace case is real production behavior — files dragged into one
// workspace's chat can be referenced from another.
const chatWs = "chat-ws-aaaaaaaa";
const sourceWs = "source-ws-bbbbbbbb";
it("resolves a well-formed platform-pending: URI to /pending-uploads/<file>/content", () => {
const url = resolveAttachmentHref(
chatWs,
`platform-pending:${sourceWs}/file-12345`,
);
expect(url).toContain(`/workspaces/${sourceWs}/pending-uploads/file-12345/content`);
});
it("uses the URI's wsid, NOT the chat workspace_id (cross-workspace forwarding)", () => {
// The two ids differ — this is the case PR #2968's commit
// explicitly calls out. A regression that flipped this would
// silently mis-route the download to the WRONG workspace's
// pending-uploads store, returning 404 (or worse, leaking).
const url = resolveAttachmentHref(
chatWs,
`platform-pending:${sourceWs}/file-xyz`,
);
expect(url).toContain(`/workspaces/${sourceWs}/`);
expect(url).not.toContain(`/workspaces/${chatWs}/`);
});
it("falls back to raw URI when platform-pending: is missing the slash", () => {
// Defensive: a URI that drifted from the expected wsid/fileid shape
// returns raw rather than producing a broken /pending-uploads//
// path. Pinned to detect a regression where a future "helpful"
// change synthesizes empty wsid/fileID.
expect(resolveAttachmentHref(chatWs, "platform-pending:no-slash")).toBe(
"platform-pending:no-slash",
);
});
it("falls back to raw URI when platform-pending: has empty fileID", () => {
expect(resolveAttachmentHref(chatWs, "platform-pending:abc/")).toBe(
"platform-pending:abc/",
);
});
it("falls back to raw URI when platform-pending: has empty wsid", () => {
expect(resolveAttachmentHref(chatWs, "platform-pending:/file-xyz")).toBe(
"platform-pending:/file-xyz",
);
});
it("regression: exact production repro from #2968 (reno-stars)", () => {
// From the original PR #2968 body: the chat's markdown-link
// override fell through on this exact shape and the browser
// navigated to about:blank. Pin the post-fix output so a future
// refactor can't reintroduce the original bug.
const url = resolveAttachmentHref(
"chat-ws",
"platform-pending:d76977b1-uuid/bb0dcaf3-uuid",
);
expect(url).toContain("/workspaces/d76977b1-uuid/pending-uploads/bb0dcaf3-uuid/content");
expect(url).not.toContain("chat-ws");
});
});
describe("isPlatformAttachment", () => {
it("returns true for platform-pending: URIs", () => {
expect(isPlatformAttachment("platform-pending:abc/file")).toBe(true);
});
it("returns true even for malformed platform-pending: URIs", () => {
// The helper is a SHAPE check — caller routes through
// downloadChatFile and downloadChatFile handles the malformed case
// downstream. Pinning so a future helper that "validates" the
// wsid/fileID shape doesn't silently break the auth-attached
// download flow for in-flight URIs.
expect(isPlatformAttachment("platform-pending:no-slash")).toBe(true);
});
it("returns true for workspace:<allowed-root> URIs", () => {
expect(isPlatformAttachment("workspace:/configs/foo")).toBe(true);
expect(isPlatformAttachment("workspace:/workspace/x.pdf")).toBe(true);
});
it("returns true for file:///<allowed-root> URIs", () => {
expect(isPlatformAttachment("file:///workspace/x")).toBe(true);
});
it("returns true for absolute paths under allowed roots", () => {
expect(isPlatformAttachment("/home/user/x")).toBe(true);
expect(isPlatformAttachment("/configs/y")).toBe(true);
});
it("returns FALSE for bare HTTPS URLs to other origins", () => {
// Auth-leak class regression: a helper that always returned true
// would attach workspace tokens to third-party requests. Pin
// the negative case explicitly.
expect(isPlatformAttachment("https://example.com/file")).toBe(false);
expect(isPlatformAttachment("http://example.com/file")).toBe(false);
});
it("returns FALSE for non-allowlisted root paths", () => {
expect(isPlatformAttachment("/etc/passwd")).toBe(false);
expect(isPlatformAttachment("/var/log/x")).toBe(false);
expect(isPlatformAttachment("/tmp/x")).toBe(false);
});
it("returns FALSE for empty string", () => {
expect(isPlatformAttachment("")).toBe(false);
});
it("returns FALSE for unrecognised schemes", () => {
expect(isPlatformAttachment("s3://bucket/key")).toBe(false);
expect(isPlatformAttachment("ftp://server/file")).toBe(false);
});
});
+1
View File
@@ -54,6 +54,7 @@ TOP_LEVEL_MODULES = {
"a2a_client",
"a2a_executor",
"a2a_mcp_server",
"a2a_response",
"a2a_tools",
"a2a_tools_delegation",
"a2a_tools_inbox",
+216
View File
@@ -0,0 +1,216 @@
#!/usr/bin/env bash
# scripts/check-stale-promote-pr.sh
#
# Scan open auto-promote PRs (base=main head=staging) for the
# silent-block failure mode that motivated issue #2975:
# - PR sat for hours with mergeStateStatus=BLOCKED
# - reviewDecision=REVIEW_REQUIRED (auto-merge armed but waiting
# on a human approval that never comes)
#
# When found, emit:
# - GitHub Actions notice/warning lines (workflow summary surface)
# - Optionally post a comment on the PR (--comment)
#
# Exit code is the count of stale PRs found, capped at 125 so callers
# can detect "alarm fired" via `if ! check-stale-promote-pr.sh; then …`.
# Exit 0 = clean, exit ≥1 = at least N stale PRs need attention.
#
# Used by .github/workflows/auto-promote-stale-alarm.yml. Logic lives
# here (not inline in the workflow YAML) so we can:
# - Unit-test it with a stubbed `gh` (see test-check-stale-promote-pr.sh)
# - Run it ad-hoc by an operator: `scripts/check-stale-promote-pr.sh`
# - Reuse the same surface in any sibling workflow that needs the same
# check (SSOT — one detector, many callers).
#
# Requires: `gh` CLI, `jq`. `GH_TOKEN` env in the workflow context.
set -euo pipefail
# -----------------------------------------------------------------------------
# Inputs
# -----------------------------------------------------------------------------
# Threshold beyond which a BLOCKED+REVIEW_REQUIRED promote PR is "stale"
# enough to alarm. 4 hours is the floor: most legitimate gates clear
# inside an hour, so 4× headroom is plenty for slow CI without false-
# alarming. Override via env for tests + edge ops.
STALE_HOURS="${STALE_HOURS:-4}"
# Repo defaults to the current `gh` context. Tests pass --repo explicitly.
REPO="${GITHUB_REPOSITORY:-}"
# Whether to post a comment to the PR. Off by default to avoid noise on
# manual ad-hoc runs; the cron workflow turns it on.
POST_COMMENT="${POST_COMMENT:-false}"
# Where to read the open-PR JSON from. Empty = call `gh` live. Tests
# point this at a fixture file.
PR_FIXTURE="${PR_FIXTURE:-}"
# Where to read "now" from. Empty = real clock. Tests freeze time so
# the staleness math is deterministic.
NOW_OVERRIDE="${NOW_OVERRIDE:-}"
while [ $# -gt 0 ]; do
case "$1" in
--repo) REPO="$2"; shift 2 ;;
--comment) POST_COMMENT="true"; shift ;;
--no-comment) POST_COMMENT="false"; shift ;;
--fixture) PR_FIXTURE="$2"; shift 2 ;;
--stale-hours) STALE_HOURS="$2"; shift 2 ;;
-h|--help)
sed -n '1,/^set /p' "$0" | grep '^# ' | sed 's/^# //'
exit 0
;;
*) echo "unknown arg: $1" >&2; exit 64 ;;
esac
done
if [ -z "$REPO" ] && [ -z "$PR_FIXTURE" ]; then
echo "::error::REPO env (or GITHUB_REPOSITORY) required when no fixture given" >&2
exit 2
fi
# -----------------------------------------------------------------------------
# Clock helpers — split out so tests can freeze time
# -----------------------------------------------------------------------------
now_epoch() {
if [ -n "$NOW_OVERRIDE" ]; then
printf '%s\n' "$NOW_OVERRIDE"
else
date -u +%s
fi
}
# Parse RFC3339 timestamps the way GitHub emits them (e.g.
# "2026-05-05T23:15:00Z"). gnu-date uses -d, bsd-date uses -j -f. Cover
# both because the workflow runs on ubuntu-latest (gnu) but operators
# may run this script on macOS (bsd).
to_epoch() {
local ts="$1"
# gnu-date path first.
if date -u -d "$ts" +%s 2>/dev/null; then
return 0
fi
# bsd-date fallback — strip optional fractional seconds before %S.
local ts_clean="${ts%%.*}"
ts_clean="${ts_clean%Z}Z"
date -u -j -f "%Y-%m-%dT%H:%M:%SZ" "$ts_clean" +%s 2>/dev/null || {
echo "::error::cannot parse timestamp: $ts" >&2
return 1
}
}
# -----------------------------------------------------------------------------
# Fetch open auto-promote PRs
# -----------------------------------------------------------------------------
fetch_prs() {
if [ -n "$PR_FIXTURE" ]; then
cat "$PR_FIXTURE"
return 0
fi
gh pr list --repo "$REPO" \
--base main --head staging --state open \
--json number,title,createdAt,mergeStateStatus,reviewDecision,url
}
# -----------------------------------------------------------------------------
# Stale detection
# -----------------------------------------------------------------------------
# Read PR list from stdin, emit one TSV line per stale PR:
# <num>\t<age_hours>\t<url>\t<title>
# Caller decides what to do (warn, comment, escalate).
detect_stale() {
local now_ts
now_ts="$(now_epoch)"
local stale_seconds=$((STALE_HOURS * 3600))
jq -r '.[] | [.number, .createdAt, .mergeStateStatus, .reviewDecision, .url, .title] | @tsv' \
| while IFS=$'\t' read -r num created_at merge_state review_decision url title; do
# Only alarm on the specific failure mode: BLOCKED + REVIEW_REQUIRED.
# Other BLOCKED reasons (DIRTY, BEHIND, failed checks) are the
# author's signal-to-fix; this script targets the silent
# "no human reviewed yet" wedge specifically.
[ "$merge_state" = "BLOCKED" ] || continue
[ "$review_decision" = "REVIEW_REQUIRED" ] || continue
local created_ts
created_ts="$(to_epoch "$created_at")" || continue
local age=$((now_ts - created_ts))
if [ "$age" -ge "$stale_seconds" ]; then
local age_h=$((age / 3600))
printf '%s\t%d\t%s\t%s\n' "$num" "$age_h" "$url" "$title"
fi
done
}
# -----------------------------------------------------------------------------
# Reporting
# -----------------------------------------------------------------------------
# Comment body — kept short; the issue body has the full design.
comment_body() {
local age_h="$1"
cat <<EOF
⚠️ This auto-promote PR has been BLOCKED on \`REVIEW_REQUIRED\` for **${age_h}h**.
Auto-merge is armed, but main's branch protection requires 1 review and no human has approved. Until someone reviews, the staging→main promote chain is wedged and downstream consumers (canvas builds, tenant redeploys) won't see new code.
**Action**: a human reviewer on \`@Molecule-AI/maintainers\` should approve this PR (or mark it as not ready and close).
Detected by \`scripts/check-stale-promote-pr.sh\` per issue #2975.
EOF
}
post_comment() {
local pr_num="$1"
local age_h="$2"
if [ "$POST_COMMENT" != "true" ]; then
return 0
fi
# Idempotency: only one alarm comment per PR. Look for the marker
# string in existing comments before posting a new one.
local existing
existing="$(gh pr view "$pr_num" --repo "$REPO" --json comments \
--jq '.comments[] | select(.body | test("scripts/check-stale-promote-pr.sh per issue #2975")) | .databaseId' \
| head -n1)"
if [ -n "$existing" ]; then
echo "::notice::PR #$pr_num already has a stale-alarm comment ($existing) — not re-posting"
return 0
fi
comment_body "$age_h" | gh pr comment "$pr_num" --repo "$REPO" --body-file -
echo "::notice::Posted stale-alarm comment on PR #$pr_num (age=${age_h}h)"
}
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
stale_count=0
while IFS=$'\t' read -r num age_h url title; do
[ -n "$num" ] || continue
stale_count=$((stale_count + 1))
echo "::warning title=Stale auto-promote PR::PR #$num — BLOCKED on REVIEW_REQUIRED for ${age_h}h. $url"
{
echo "## ⚠️ Stale auto-promote PR detected"
echo
echo "- PR: #$num — \`$title\`"
echo "- Age: ${age_h}h"
echo "- State: BLOCKED on REVIEW_REQUIRED"
echo "- URL: $url"
echo
echo "Auto-merge is armed but waiting on a human review. See issue #2975."
} >> "${GITHUB_STEP_SUMMARY:-/dev/null}"
post_comment "$num" "$age_h"
done < <(fetch_prs | detect_stale)
if [ "$stale_count" -eq 0 ]; then
echo "::notice::No stale auto-promote PRs detected (threshold: ${STALE_HOURS}h)"
fi
# Cap exit code so we don't accidentally break shells that interpret
# >125 as signal-style. 1..N maps to "1..N stale PRs".
exit $(( stale_count > 125 ? 125 : stale_count ))
+257
View File
@@ -0,0 +1,257 @@
#!/usr/bin/env bash
# scripts/test-check-stale-promote-pr.sh
#
# Exhaustive bash unit tests for check-stale-promote-pr.sh.
# Goal: 100% branch coverage on the detector logic.
#
# Each case writes a fixture JSON, freezes the clock with NOW_OVERRIDE,
# runs the script with --fixture + --no-comment (so we don't try to
# actually call `gh pr comment`), and asserts on stdout/exit code.
#
# Run: bash scripts/test-check-stale-promote-pr.sh
# Expected: "All N tests passed" + exit 0.
set -euo pipefail
SCRIPT="$(cd "$(dirname "$0")" && pwd)/check-stale-promote-pr.sh"
TMP="$(mktemp -d)"
trap 'rm -rf "$TMP"' EXIT
PASS=0
FAIL=0
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
# Frozen "now" — 2026-05-06T05:00:00Z. Compute dynamically so the
# tests stay correct regardless of platform-specific date semantics
# (gnu vs bsd) and any author math errors on the epoch.
if FROZEN_NOW="$(date -u -d '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then
: # gnu-date worked
elif FROZEN_NOW="$(date -u -j -f '%Y-%m-%dT%H:%M:%SZ' '2026-05-06T05:00:00Z' +%s 2>/dev/null)"; then
: # bsd-date worked
else
echo "FATAL: cannot compute FROZEN_NOW on this platform" >&2
exit 1
fi
run_script() {
# Args: <fixture-file>
# Returns stdout + exit code via a known marker.
local fixture="$1"
shift
set +e
NOW_OVERRIDE="$FROZEN_NOW" \
POST_COMMENT="false" \
bash "$SCRIPT" --fixture "$fixture" "$@" 2>&1
local rc=$?
set -e
echo "EXIT_CODE=$rc"
}
assert_pass() {
local name="$1"
local got="$2"
local want_pattern="$3"
if printf '%s' "$got" | grep -qE "$want_pattern"; then
PASS=$((PASS + 1))
printf ' ✓ %s\n' "$name"
else
FAIL=$((FAIL + 1))
printf ' ✗ %s\n want pattern: %s\n got:\n%s\n' "$name" "$want_pattern" "$got"
fi
}
assert_no_match() {
local name="$1"
local got="$2"
local bad_pattern="$3"
if printf '%s' "$got" | grep -qE "$bad_pattern"; then
FAIL=$((FAIL + 1))
printf ' ✗ %s\n bad pattern matched: %s\n got:\n%s\n' "$name" "$bad_pattern" "$got"
else
PASS=$((PASS + 1))
printf ' ✓ %s\n' "$name"
fi
}
# ─────────────────────────────────────────────────────────────────────────────
# Test cases
# ─────────────────────────────────────────────────────────────────────────────
echo "1. Empty PR list — clean exit"
echo '[]' > "$TMP/empty.json"
got=$(run_script "$TMP/empty.json")
assert_pass "empty-no-warning" "$got" "No stale auto-promote PRs detected"
assert_pass "empty-exit-zero" "$got" "EXIT_CODE=0"
echo
echo "2. Single PR, BLOCKED+REVIEW_REQUIRED, 5h old — fires alarm"
cat > "$TMP/stale1.json" <<EOF
[{
"number": 2963,
"title": "staging → main",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://github.com/test/test/pull/2963"
}]
EOF
got=$(run_script "$TMP/stale1.json")
assert_pass "stale1-warning" "$got" "Stale auto-promote PR"
assert_pass "stale1-pr-number" "$got" "PR #2963"
assert_pass "stale1-age" "$got" "for 5h"
assert_pass "stale1-exit-1" "$got" "EXIT_CODE=1"
echo
echo "3. Same PR but only 3h old — under threshold, NO alarm"
cat > "$TMP/young.json" <<EOF
[{
"number": 100,
"title": "fresh promote",
"createdAt": "2026-05-06T02:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://github.com/test/test/pull/100"
}]
EOF
got=$(run_script "$TMP/young.json")
assert_pass "young-no-alarm" "$got" "No stale auto-promote PRs"
assert_pass "young-exit-zero" "$got" "EXIT_CODE=0"
assert_no_match "young-no-warning" "$got" "Stale auto-promote PR"
echo
echo "4. PR is BLOCKED but for the wrong reason (DIRTY, not REVIEW_REQUIRED)"
cat > "$TMP/dirty.json" <<EOF
[{
"number": 200,
"title": "needs rebase",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "APPROVED",
"url": "https://github.com/test/test/pull/200"
}]
EOF
got=$(run_script "$TMP/dirty.json")
assert_pass "dirty-no-alarm" "$got" "No stale auto-promote PRs"
assert_pass "dirty-exit-zero" "$got" "EXIT_CODE=0"
echo
echo "5. PR is APPROVED but mergeStateStatus is CLEAN — NOT alarming"
cat > "$TMP/clean.json" <<EOF
[{
"number": 300,
"title": "all green",
"createdAt": "2026-05-06T00:00:00Z",
"mergeStateStatus": "CLEAN",
"reviewDecision": "APPROVED",
"url": "https://github.com/test/test/pull/300"
}]
EOF
got=$(run_script "$TMP/clean.json")
assert_pass "clean-no-alarm" "$got" "No stale auto-promote PRs"
echo
echo "6. Multiple PRs — only the BLOCKED+REVIEW_REQUIRED+old one alarms"
cat > "$TMP/mixed.json" <<EOF
[
{
"number": 100,
"title": "fresh",
"createdAt": "2026-05-06T04:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/100"
},
{
"number": 200,
"title": "stale + alarming",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/200"
},
{
"number": 300,
"title": "approved + clean",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "CLEAN",
"reviewDecision": "APPROVED",
"url": "https://x/300"
}
]
EOF
got=$(run_script "$TMP/mixed.json")
assert_pass "mixed-only-200" "$got" "PR #200"
assert_no_match "mixed-not-100" "$got" "PR #100"
assert_no_match "mixed-not-300" "$got" "PR #300"
assert_pass "mixed-exit-1" "$got" "EXIT_CODE=1"
echo
echo "7. Custom STALE_HOURS via --stale-hours overrides threshold"
got=$(run_script "$TMP/young.json" --stale-hours 1)
assert_pass "custom-threshold-fires" "$got" "PR #100"
assert_pass "custom-threshold-exit-1" "$got" "EXIT_CODE=1"
echo
echo "8. Two stale PRs — exit code reflects count"
cat > "$TMP/two-stale.json" <<EOF
[
{
"number": 200,
"title": "stale-A",
"createdAt": "2026-05-05T20:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/200"
},
{
"number": 201,
"title": "stale-B",
"createdAt": "2026-05-05T19:00:00Z",
"mergeStateStatus": "BLOCKED",
"reviewDecision": "REVIEW_REQUIRED",
"url": "https://x/201"
}
]
EOF
got=$(run_script "$TMP/two-stale.json")
assert_pass "two-stale-exit-2" "$got" "EXIT_CODE=2"
echo
echo "9. Help text is shown for --help"
set +e
help_out=$(bash "$SCRIPT" --help 2>&1)
help_rc=$?
set -e
assert_pass "help-exits-zero" "EXIT_CODE=$help_rc" "EXIT_CODE=0"
assert_pass "help-mentions-issue" "$help_out" "issue #2975"
echo
echo "10. Unknown arg exits 64 (EX_USAGE)"
set +e
bad_out=$(bash "$SCRIPT" --bogus 2>&1)
bad_rc=$?
set -e
assert_pass "unknown-arg-rc" "EXIT_CODE=$bad_rc" "EXIT_CODE=64"
echo
echo "11. Missing repo + missing fixture exits 2"
set +e
out=$(REPO="" bash "$SCRIPT" 2>&1)
rc=$?
set -e
assert_pass "no-repo-exit-2" "EXIT_CODE=$rc" "EXIT_CODE=2"
# ─────────────────────────────────────────────────────────────────────────────
# Summary
# ─────────────────────────────────────────────────────────────────────────────
echo
echo "─────────────────────────────────────────────"
echo "Tests: $PASS passed, $FAIL failed"
if [ "$FAIL" -gt 0 ]; then
exit 1
fi
echo "All tests passed."
+37
View File
@@ -157,6 +157,43 @@ A2A_RESP=$(curl -s --max-time "$TIMEOUT" -X POST "$BASE/workspaces/$POLL_WS_ID/a
}')
check "poll-mode A2A returns queued status" '"status":"queued"' "$A2A_RESP"
# ---------- Phase 3.5: Python parser classifies queued envelope correctly ----------
# (#2967) — server emits the queued envelope, the wheel's a2a_response.parse()
# MUST classify it as the Queued variant, not Malformed. Pre-#2967 the bare
# message/send parser in a2a_client.py:587 misclassified this and returned
# "[A2A_ERROR] unexpected response shape", which broke external↔external A2A
# on poll-mode peers.
#
# This phase exercises the actual on-the-wire response from a real
# workspace-server (NOT a mocked dict) through the same module the production
# wheel ships, so a regression in either the server emit shape OR the client
# parser fails this E2E.
echo ""
echo "--- Phase 3.5: Python parser classifies real server response (#2967) ---"
# Pipe the queued response captured above through a2a_response.parse and
# assert the classification. WORKSPACE_ID is required at module import
# time but irrelevant to this parsing call (any UUID is fine).
PARSE_RESULT=$(WORKSPACE_ID="00000000-0000-0000-0000-000000000001" \
python3 -c "
import json, sys
sys.path.insert(0, '$(cd "$(dirname "$0")/../../workspace" && pwd)')
import a2a_response
data = json.loads(r'''$A2A_RESP''')
v = a2a_response.parse(data)
print(type(v).__name__)
if isinstance(v, a2a_response.Queued):
print(f'method={v.method} delivery_mode={v.delivery_mode}')
")
check_eq "Python parser classifies real server response as Queued" \
"Queued" "$(printf '%s' "$PARSE_RESULT" | head -n1)"
check "Queued variant captures method=message/send" \
"method=message/send" "$PARSE_RESULT"
check "Queued variant captures delivery_mode=poll" \
"delivery_mode=poll" "$PARSE_RESULT"
check "queued response echoes delivery_mode=poll" '"delivery_mode":"poll"' "$A2A_RESP"
check "queued response echoes the JSON-RPC method" '"method":"message/send"' "$A2A_RESP"
+75 -27
View File
@@ -17,6 +17,7 @@ from concurrent.futures import ThreadPoolExecutor
import httpx
import a2a_response
from platform_auth import auth_headers, self_source_headers
logger = logging.getLogger(__name__)
@@ -353,6 +354,20 @@ def _agent_card_url_for(peer_id: str) -> str:
# Used by delegate_task to distinguish real errors from normal response text.
_A2A_ERROR_PREFIX = "[A2A_ERROR] "
# Sentinel prefix for queued-for-poll-mode-peer outcomes (#2967).
# When the target workspace is registered as delivery_mode=poll (no
# public URL — typical for external molecule-mcp standalone runtimes),
# the platform's a2a_proxy.go:402 short-circuit returns a synthetic
# {"status":"queued","delivery_mode":"poll","method":"..."} envelope
# instead of dispatching over HTTP. The message IS delivered (written
# to the platform's inbox queue); there's just no synchronous reply
# to relay. Pre-#2967 the client treated this as "unexpected response
# shape" → caller saw DELEGATION FAILED → retried → recipient saw
# duplicates. The Queued prefix lets callers branch on this outcome
# explicitly: "delivered async, no synchronous reply expected" is
# different from both success-with-text and failure.
_A2A_QUEUED_PREFIX = "[A2A_QUEUED] "
# Workspace IDs are UUIDs everywhere we generate them (platform's
# workspaces.id column, /registry/discover/:id route param, etc.) but
# the agent-facing tool surface receives them as free-form strings via
@@ -564,17 +579,43 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
},
)
data = resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
text = parts[0].get("text", "") if parts else "(no response)"
# Tag child-reported errors so the caller can detect them reliably
# Dispatch via the SSOT response model (a2a_response.py).
# All shape detection lives in one place — the parser
# never raises and routes unknown shapes to Malformed
# so a future server-side change is loud, not silent.
variant = a2a_response.parse(data)
if isinstance(variant, a2a_response.Result):
# Match legacy semantics:
# parts non-empty + first part has no text → ""
# parts empty → "(no response)"
# Differentiation matters for callers that assert
# on the empty-string case (test_a2a_client).
if variant.parts:
text = variant.text
else:
text = "(no response)"
# Tag child-reported errors so the caller can
# detect them reliably — agent-side bug surfaces
# text like "Agent error: <traceback>" inside a
# JSON-RPC success envelope.
if text.startswith("Agent error:"):
return f"{_A2A_ERROR_PREFIX}{text}"
return text
elif "error" in data:
err = data["error"]
msg = (err.get("message") or "").strip()
code = err.get("code")
if isinstance(variant, a2a_response.Queued):
# Poll-mode peer — message accepted into the inbox
# queue, target agent will fetch via poll. NOT a
# failure. Return the queued sentinel so callers
# (delegate_task etc.) can render the outcome
# accurately instead of treating it as an error.
logger.info(
"send_a2a_message: queued for poll-mode peer (target=%s method=%s)",
target_url,
variant.method,
)
return f"{_A2A_QUEUED_PREFIX}target={safe_id} method={variant.method}"
if isinstance(variant, a2a_response.Error):
msg = variant.message
code = variant.code
if msg and code is not None:
detail = f"{msg} (code={code})"
elif msg:
@@ -583,26 +624,33 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
detail = f"JSON-RPC error with no message (code={code})"
else:
detail = "JSON-RPC error with no message"
if variant.restarting:
# Surface platform-restart-in-progress
# explicitly — caller (UI / delegating agent)
# can render a softer "agent is restarting"
# message rather than a generic failure.
retry = (
f", retry_after={variant.retry_after}s"
if variant.retry_after is not None
else ""
)
detail = f"{detail} (restarting{retry})"
return f"{_A2A_ERROR_PREFIX}{detail} [target={target_url}]"
elif data.get("status") == "queued" and data.get("delivery_mode") == "poll":
# Workspace-server's poll-mode short-circuit envelope
# (workspace-server/internal/handlers/a2a_proxy.go ~line 402).
# The peer is poll-mode and has no URL to dispatch to, so
# the server queued the message for the peer's next inbox
# poll instead of forwarding it. Delivery is acknowledged
# but pending consumption.
#
# Pre-fix this fell through to the "unexpected response
# shape" error path → callers logged false failures, then
# delegate_task retried, and the peer received duplicate
# delegations. Issue #2967.
method = data.get("method") or "message/send"
logger.info(
"send_a2a_message: queued for poll-mode peer (method=%s, target=%s)",
method, target_url,
)
return f"queued for poll-mode peer (method={method})"
return f"{_A2A_ERROR_PREFIX}unexpected response shape (no result, no error): {str(data)[:200]} [target={target_url}]"
# Malformed — log loud + surface as error so the
# operator notices a server change. SSOT refactor
# subsumes the inline "queued" check that landed in
# the #2972 hotfix; that branch is now the typed
# Queued variant above.
logger.warning(
"send_a2a_message: malformed response (target=%s body=%.200s)",
target_url,
str(variant.raw),
)
return (
f"{_A2A_ERROR_PREFIX}unexpected response shape "
f"(no result, error, or queued envelope): "
f"{str(variant.raw)[:200]} [target={target_url}]"
)
except _TRANSIENT_HTTP_ERRORS as e:
last_exc = e
attempts_remaining = _DELEGATE_MAX_ATTEMPTS - (attempt + 1)
+246
View File
@@ -0,0 +1,246 @@
"""Single source of truth for A2A ``/workspaces/<id>/a2a`` response shapes.
The workspace-server proxy at
``workspace-server/internal/handlers/a2a_proxy.go`` (the canonical
emitter) returns one of the following shapes for a single A2A call:
* **JSON-RPC success** —
``{"jsonrpc": "2.0", "result": {...}, "id": "..."}``
The agent's reply, passed through unchanged.
* **JSON-RPC error** —
``{"jsonrpc": "2.0", "error": {"message": "...", "code": ...}, "id": "..."}``
The agent reported a structured error.
* **Poll-queued** (synthesized at proxy, RFC #2339 PR 2 — see
``a2a_proxy.go:402-406``) —
``{"status": "queued", "delivery_mode": "poll", "method": "..."}``
The target is a poll-mode workspace (no public URL); the message
was written to the platform's inbox queue. The target agent will
fetch it via ``GET /activity?since_id=`` polling. NOT a failure —
delivery succeeded, there's just no synchronous reply to relay.
* **Platform error** — ``{"error": "...", "restarting": true?, "retry_after": int?}``
HTTP-level failure synthesized by the proxy when the agent is
unreachable, the container is restarting, or some other infrastructure
failure happened. ``restarting=true`` flags the platform-initiated
container-restart path.
* **Malformed** — anything else. Surfaced explicitly so a future server
change is loud rather than silent.
The ``parse(data)`` function classifies a pre-decoded JSON body into a
typed variant. Callers ``match`` on the variant and never re-implement
shape detection — that's the SSOT discipline.
# SSOT contract
This file is the Python half. The Go server emits these shapes today
via inline ``gin.H{...}`` literals. A future PR can introduce a Go
mirror (e.g. ``workspace-server/internal/models/a2a_response.go``)
with a typed marshaller — until then, **any change to the wire shape
must be reflected here** and gated by ``test_a2a_response.py``'s
fixture corpus. The corpus exists specifically so a one-sided edit
breaks CI.
# Why a typed model (vs. dict-key sniffing at every site)
The pre-2967 client at ``a2a_client.py:567-587`` sniffed for ``result``
or ``error`` keys inline and treated everything else as malformed —
which silently broke poll-mode peers (the queued envelope has neither
key). Inline sniffing per call site multiplies the surface area where
a new shape gets misclassified. A single typed parser with an
explicit ``Malformed`` escape hatch makes shape additions a
one-line change here + a fixture entry in the test corpus, instead of
a hunt through every parsing site in the runtime.
"""
from __future__ import annotations
import dataclasses
import logging
from typing import Any, Optional, Union
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class Result:
"""JSON-RPC success — agent's reply available synchronously.
``text`` is the convenience extraction from ``parts[0].text`` (the
A2A multipart shape). ``parts`` is the full list, available for
callers that need richer rendering (multiple parts, non-text parts).
``raw_result`` preserves the unparsed ``result`` field for any
caller that needs it (e.g. activity-row response_body audit).
"""
text: str
parts: list[dict[str, Any]] = dataclasses.field(default_factory=list)
raw_result: Optional[dict[str, Any]] = None
@dataclasses.dataclass(frozen=True)
class Error:
"""JSON-RPC error or platform-level error response.
``code`` is the JSON-RPC integer code when present, else None.
``restarting`` / ``retry_after`` are platform-restart-in-progress
metadata: when both are set, the caller knows the container is
being recycled and may surface a softer error to the user.
"""
message: str
code: Optional[int] = None
restarting: bool = False
retry_after: Optional[int] = None
@dataclasses.dataclass(frozen=True)
class Queued:
"""Platform poll-mode short-circuit — message accepted, peer will pick up async.
Returned when the target workspace is registered as
``delivery_mode=poll`` (no public URL — typical for external
standalone ``molecule-mcp`` runtimes). The message was written to
the platform's inbox queue; the target agent will fetch it via
``GET /activity?since_id=`` polling.
NOT a failure. Callers that expect a synchronous reply (the agent's
response text) won't get one here — they should either:
* Tolerate the absence of a reply (fire-and-forget semantics).
* Fall back to the durable ``/workspaces/:id/delegate`` +
``/delegations`` polling path (see ``a2a_tools_delegation``'s
``_delegate_sync_via_polling``), which writes the same A2A
request through the platform's executeDelegation goroutine
and lets the caller poll for the result row.
``method`` echoes the request method (``message/send``, ``notify``,
etc.) so callers can correlate.
"""
method: str
delivery_mode: str = "poll"
@dataclasses.dataclass(frozen=True)
class Malformed:
"""Server returned a body the parser can't classify.
Carries the raw decoded payload for diagnostic logging. Callers
typically render this as an error to the user (see
``send_a2a_message``) — but the Malformed variant is a separate
type so logging / metrics can distinguish it from genuine
JSON-RPC ``Error`` responses.
"""
raw: Any # whatever the server returned: dict / list / str / number / etc.
Variant = Union[Result, Error, Queued, Malformed]
# Field-name constants — the wire vocabulary. Single source of truth;
# the parser references these by name so a change here is a
# one-line edit instead of a hunt through string literals.
_KEY_RESULT = "result"
_KEY_ERROR = "error"
_KEY_STATUS = "status"
_KEY_DELIVERY_MODE = "delivery_mode"
_KEY_METHOD = "method"
_KEY_RESTARTING = "restarting"
_KEY_RETRY_AFTER = "retry_after"
_STATUS_QUEUED = "queued"
_DELIVERY_MODE_POLL = "poll"
def parse(data: Any) -> Variant:
"""Classify a pre-decoded ``/a2a`` JSON response into a typed variant.
Never raises. Every branch is total: any input that doesn't match a
known shape routes to ``Malformed`` so the caller can decide how
to surface it.
The order of checks matters:
1. Non-dict input → Malformed (server contract is dict-shaped).
2. Poll-queued envelope is checked BEFORE result/error because a
server bug that sets both ``status=queued`` and ``result``
should be loud, not silently treated as Result.
3. ``result`` → Result (the JSON-RPC success path).
4. ``error`` → Error (JSON-RPC error or platform error).
5. Anything else → Malformed.
"""
if not isinstance(data, dict):
logger.warning(
"a2a_response.parse: non-dict body — got %s",
type(data).__name__,
)
return Malformed(raw=data)
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is
# ambiguous and we route to Malformed for visibility.
if (
data.get(_KEY_STATUS) == _STATUS_QUEUED
and data.get(_KEY_DELIVERY_MODE) == _DELIVERY_MODE_POLL
):
method_raw = data.get(_KEY_METHOD)
method = str(method_raw) if method_raw is not None else "unknown"
logger.info(
"a2a_response.parse: queued for poll-mode peer (method=%s)",
method,
)
return Queued(method=method)
# JSON-RPC success.
if _KEY_RESULT in data:
result = data[_KEY_RESULT]
if isinstance(result, dict):
parts_raw = result.get("parts")
parts = parts_raw if isinstance(parts_raw, list) else []
text = ""
if parts:
first = parts[0]
if isinstance(first, dict):
text_raw = first.get("text")
text = str(text_raw) if text_raw is not None else ""
return Result(text=text, parts=parts, raw_result=result)
# ``result`` present but not a dict — unusual but not an error;
# surface as a Result with the value rendered to text.
return Result(text=str(result), parts=[], raw_result=None)
# JSON-RPC error or platform error.
if _KEY_ERROR in data:
err_raw = data[_KEY_ERROR]
message = ""
code: Optional[int] = None
if isinstance(err_raw, dict):
msg_raw = err_raw.get("message")
if msg_raw is not None:
message = str(msg_raw).strip()
code_raw = err_raw.get("code")
if isinstance(code_raw, int):
code = code_raw
elif isinstance(err_raw, str):
message = err_raw.strip()
else:
message = str(err_raw)
restarting = bool(data.get(_KEY_RESTARTING, False))
retry_after_raw = data.get(_KEY_RETRY_AFTER)
retry_after = retry_after_raw if isinstance(retry_after_raw, int) else None
return Error(
message=message,
code=code,
restarting=restarting,
retry_after=retry_after,
)
logger.warning(
"a2a_response.parse: unrecognized shape — keys=%s",
sorted(data.keys()),
)
return Malformed(raw=data)
+27
View File
@@ -29,14 +29,18 @@ from __future__ import annotations
import hashlib
import json
import logging
import os
import httpx
logger = logging.getLogger(__name__)
from a2a_client import (
PLATFORM_URL,
WORKSPACE_ID,
_A2A_ERROR_PREFIX,
_A2A_QUEUED_PREFIX,
_peer_names,
_peer_to_source,
discover_peer,
@@ -245,6 +249,29 @@ async def tool_delegate_task(
# (the platform proxy) so the same code works for in-container and
# external (standalone molecule-mcp) callers.
result = await send_a2a_message(workspace_id, task, source_workspace_id=src)
# #2967: when the target is a poll-mode peer, the platform's
# a2a_proxy short-circuits and returns a queued envelope —
# send_a2a_message surfaces that as the _A2A_QUEUED_PREFIX
# sentinel. The synchronous proxy path can't deliver a reply
# because the target has no public URL; fall back to the
# durable /delegate + /delegations polling path which DOES
# work for poll-mode peers (the executeDelegation goroutine
# writes to the inbox queue and the result row arrives when
# the target picks it up + replies).
#
# This is what makes external-runtime-to-external-runtime
# A2A actually deliver synchronous replies — without the
# fallback the calling agent sees the queued sentinel as
# success-with-no-text and never gets the peer's response.
if result.startswith(_A2A_QUEUED_PREFIX):
logger.info(
"tool_delegate_task: target=%s is poll-mode; "
"falling back from message/send to /delegate-poll path",
workspace_id,
)
result = await _delegate_sync_via_polling(
workspace_id, task, src or WORKSPACE_ID,
)
# Detect delegation failures — wrap them clearly so the calling agent
# can decide to retry, use another peer, or handle the task itself.
+102 -8
View File
@@ -281,11 +281,11 @@ class TestSendA2AMessage:
to the 'unexpected response shape' error path → callers retried,
peer got duplicate delegations.
Pin: poll-queued envelope returns a clean success string that does
NOT start with _A2A_ERROR_PREFIX, so callers route it through the
normal-outcome path. Verified discriminating: assert_NOT_startswith
the error prefix would FAIL on the old code (which returned an
error-prefixed string) and PASSES on the new code.
Pin: poll-queued envelope returns a string tagged with the
_A2A_QUEUED_PREFIX sentinel (not _A2A_ERROR_PREFIX), so callers
can branch on the typed outcome without substring-sniffing.
Verified discriminating: pre-fix returned _A2A_ERROR_PREFIX so
the not-startswith assertion would FAIL on the old code.
"""
import a2a_client
@@ -301,12 +301,13 @@ class TestSendA2AMessage:
# Discriminating: pre-fix returned a string that startswith
# _A2A_ERROR_PREFIX, so this assertion would have FAILED on the
# old code. New code returns a queued-success string.
# old code. New code returns the queued-success sentinel.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX), (
f"poll-queued envelope must not be tagged as A2A error; got: {result!r}"
)
assert "queued" in result.lower()
assert "poll" in result.lower()
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX), (
f"poll-queued envelope must use the queued sentinel; got: {result!r}"
)
# The method is included so a structured-log scraper can route by
# protocol verb if needed.
assert "message/send" in result
@@ -329,6 +330,7 @@ class TestSendA2AMessage:
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "message/sendStream" in result
async def test_status_queued_without_poll_mode_still_falls_through(self):
@@ -462,6 +464,98 @@ def _make_seq_mock_client(post_side_effect):
return mock_client
class TestSendA2AMessagePollMode:
"""Pin the #2967 fix: send_a2a_message recognizes the platform's
poll-mode short-circuit envelope and returns a queued sentinel
instead of an "unexpected response shape" error.
Pre-#2967 the client treated the queued envelope as malformed,
causing the calling agent to retry, which delivered the same
message twice to the (poll-mode) recipient. The Queued sentinel
lets delegate_task fall back to the durable polling path
transparently — see test_delegation_sync_via_polling for the
fallback verification.
"""
async def test_poll_queued_envelope_returns_queued_sentinel(self):
# Workspace-server returns this shape (a2a_proxy.go:402-406)
# when the target workspace is registered as delivery_mode=poll
# (no public URL, typical for external molecule-mcp standalone
# runtimes).
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
# Sentinel + structured payload so callers can branch on it.
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
# Critically: NOT the error sentinel. Pre-#2967 it was the error path.
assert not result.startswith(a2a_client._A2A_ERROR_PREFIX)
# Carries enough info for the caller to log meaningfully.
assert _TEST_PEER_ID in result
assert "message/send" in result
async def test_poll_queued_envelope_method_is_recorded(self):
import a2a_client
resp = _make_response(200, {
"status": "queued",
"delivery_mode": "poll",
"method": "notify",
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_QUEUED_PREFIX)
assert "notify" in result
async def test_status_queued_without_delivery_mode_is_unexpected_shape(self):
# Server bug: only ``status=queued`` set, ``delivery_mode``
# missing. Surface as the malformed branch (not Queued) — the
# SSOT parser treats this as Malformed because the documented
# contract requires both keys.
import a2a_client
resp = _make_response(200, {"status": "queued", "method": "message/send"})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "unexpected response shape" in result
# Must explicitly mention "or queued envelope" so an operator
# debugging this knows the parser HAS a Queued branch and the
# body just didn't match — not that the parser is missing the
# logic entirely (the pre-#2967 confusion).
assert "queued envelope" in result
async def test_platform_error_with_restart_metadata_surfaces_in_message(self):
# The platform error envelope: 503 with restart metadata.
# Surfaced as an error string that includes "restarting" so
# the caller / agent can render a softer error to the user.
import a2a_client
resp = _make_response(200, {
"error": "workspace agent unreachable — container restart triggered",
"restarting": True,
"retry_after": 15,
})
mock_client = _make_mock_client(post_resp=resp)
with patch("a2a_client.httpx.AsyncClient", return_value=mock_client):
result = await a2a_client.send_a2a_message(_TEST_PEER_ID, "task")
assert result.startswith(a2a_client._A2A_ERROR_PREFIX)
assert "restarting" in result
assert "retry_after=15" in result
class TestSendA2AMessageRetry:
"""Verify auto-retry on transient transport errors (RemoteProtocolError,
ConnectError, ReadTimeout, etc.) up to _DELEGATE_MAX_ATTEMPTS times.
+455
View File
@@ -0,0 +1,455 @@
"""Tests for the A2A response SSOT parser (workspace/a2a_response.py).
Branch coverage target: 100%. Each variant of ``parse()`` exercised in
isolation, plus adversarial-input fuzzing to assert the parser never
raises.
Pre-#2967, the response shape was sniffed inline at every call site
(``a2a_client.py:567-587`` had hard-coded ``"result" in data`` /
``"error" in data`` checks). The bare ``else`` returned an
"unexpected response shape" error — which silently broke poll-mode
peers because the workspace-server's poll-queued envelope has neither
``result`` nor ``error``. The SSOT parser has an explicit ``Queued``
variant for that path and routes anything truly unrecognized to
``Malformed`` so a future server-side change fails loudly.
The "this test FAILS on pre-fix source" guarantee is enforced by
running the legacy-shape sniffer alongside the new parser in
``test_legacy_sniffer_misclassified_queued`` — that test fails on
the pre-#2967 ``a2a_client.py`` shape because the legacy code
returns the unexpected-shape error path for the Queued envelope.
"""
from __future__ import annotations
import logging
from typing import Any
import pytest
import a2a_response
# ============== Fixture corpus — the canonical wire shapes ==============
# Every shape below mirrors a path the workspace-server's a2a_proxy.go
# can return. When you add a new server-side response shape, add a
# fixture entry here and a corresponding test method below.
_FIXTURES = {
"jsonrpc_success_with_text": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {
"parts": [{"kind": "text", "text": "hello world"}],
},
},
"jsonrpc_success_multipart": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {
"parts": [
{"kind": "text", "text": "first"},
{"kind": "text", "text": "second"},
],
},
},
"jsonrpc_success_no_parts": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {},
},
"jsonrpc_success_part_no_text_key": {
"jsonrpc": "2.0",
"id": "abc-123",
"result": {"parts": [{"kind": "text"}]},
},
"jsonrpc_error_with_message_and_code": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"message": "rate limited", "code": -32003},
},
"jsonrpc_error_message_only": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"message": "rate limited"},
},
"jsonrpc_error_code_only": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": {"code": -32603},
},
"jsonrpc_error_string_form": {
"jsonrpc": "2.0",
"id": "abc-123",
"error": "string-shaped error",
},
"platform_error_with_restart": {
"error": "workspace agent unreachable — container restart triggered",
"restarting": True,
"retry_after": 15,
},
"platform_error_plain": {
"error": "workspace not found",
},
"poll_queued_full": {
"status": "queued",
"delivery_mode": "poll",
"method": "message/send",
},
"poll_queued_notify": {
"status": "queued",
"delivery_mode": "poll",
"method": "notify",
},
"poll_queued_no_method": {
"status": "queued",
"delivery_mode": "poll",
},
"malformed_empty_dict": {},
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
"malformed_status_queued_no_delivery_mode": {
# Server bug — status set but delivery_mode missing.
# Should be Malformed, not Queued, because the contract says both.
"status": "queued",
},
"malformed_delivery_mode_no_status": {
"delivery_mode": "poll",
},
}
# ============== Variant-by-variant coverage ==============
class TestQueuedVariant:
"""``parse()`` recognizes the workspace-server poll-mode short-circuit
envelope (a2a_proxy.go:402-406) and returns ``Queued``."""
def test_full_envelope_with_method_message_send(self):
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
assert v.delivery_mode == "poll"
def test_envelope_with_method_notify(self):
v = a2a_response.parse(_FIXTURES["poll_queued_notify"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "notify"
def test_envelope_missing_method_uses_unknown_sentinel(self):
# Envelope without ``method`` key — server contract should
# always set it, but the parser must not raise on absence.
v = a2a_response.parse(_FIXTURES["poll_queued_no_method"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "unknown"
def test_status_queued_alone_is_malformed_not_queued(self):
# ``status=queued`` without ``delivery_mode=poll`` does not match
# the documented envelope. Surface as Malformed for visibility.
v = a2a_response.parse(_FIXTURES["malformed_status_queued_no_delivery_mode"])
assert isinstance(v, a2a_response.Malformed)
def test_delivery_mode_alone_is_malformed_not_queued(self):
v = a2a_response.parse(_FIXTURES["malformed_delivery_mode_no_status"])
assert isinstance(v, a2a_response.Malformed)
def test_logs_info_on_queued(self, caplog):
# Comprehensive logging — operator should see queued events at INFO.
with caplog.at_level(logging.INFO, logger="a2a_response"):
a2a_response.parse(_FIXTURES["poll_queued_full"])
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
class TestResultVariant:
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
``Result(text, parts, raw_result)``."""
def test_simple_text_result(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_with_text"])
assert isinstance(v, a2a_response.Result)
assert v.text == "hello world"
assert len(v.parts) == 1
assert v.raw_result == {"parts": [{"kind": "text", "text": "hello world"}]}
def test_multipart_result_extracts_first_part_text(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_multipart"])
assert isinstance(v, a2a_response.Result)
assert v.text == "first"
assert len(v.parts) == 2
def test_result_with_no_parts(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_no_parts"])
assert isinstance(v, a2a_response.Result)
assert v.text == ""
assert v.parts == []
def test_part_without_text_key(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_success_part_no_text_key"])
assert isinstance(v, a2a_response.Result)
# No "text" key — extracted text is empty, parts list intact.
assert v.text == ""
assert len(v.parts) == 1
def test_result_non_dict_returns_text_form(self):
# Pathological but legal: ``result`` is a string instead of a dict.
v = a2a_response.parse({"result": "hello"})
assert isinstance(v, a2a_response.Result)
assert v.text == "hello"
assert v.parts == []
def test_result_takes_precedence_when_no_queued_envelope(self):
# Both ``result`` and ``error`` keys present — result wins
# because it's checked first after the Queued path.
v = a2a_response.parse({
"result": {"parts": [{"kind": "text", "text": "ok"}]},
"error": {"message": "should-be-ignored"},
})
assert isinstance(v, a2a_response.Result)
assert v.text == "ok"
def test_part_with_non_dict_first_entry(self):
# ``parts[0]`` is a string instead of a dict — parser tolerates it,
# text falls back to empty.
v = a2a_response.parse({"result": {"parts": ["bare-string"]}})
assert isinstance(v, a2a_response.Result)
assert v.text == ""
assert v.parts == ["bare-string"]
def test_part_text_value_none(self):
# ``parts[0].text`` is explicitly None — extracted as "".
v = a2a_response.parse({"result": {"parts": [{"text": None}]}})
assert isinstance(v, a2a_response.Result)
assert v.text == ""
def test_parts_not_a_list(self):
# Server bug: ``parts`` is a dict instead of a list. Parser falls
# back to empty parts rather than raising.
v = a2a_response.parse({"result": {"parts": {"oops": True}}})
assert isinstance(v, a2a_response.Result)
assert v.parts == []
assert v.text == ""
class TestErrorVariant:
"""``parse()`` extracts ``error`` envelopes into ``Error`` and
annotates platform-restart metadata when present."""
def test_message_and_code(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_with_message_and_code"])
assert isinstance(v, a2a_response.Error)
assert v.message == "rate limited"
assert v.code == -32003
assert v.restarting is False
assert v.retry_after is None
def test_message_only(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_message_only"])
assert isinstance(v, a2a_response.Error)
assert v.message == "rate limited"
assert v.code is None
def test_code_only(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_code_only"])
assert isinstance(v, a2a_response.Error)
assert v.message == ""
assert v.code == -32603
def test_error_string_form(self):
v = a2a_response.parse(_FIXTURES["jsonrpc_error_string_form"])
assert isinstance(v, a2a_response.Error)
assert v.message == "string-shaped error"
assert v.code is None
def test_error_non_dict_non_string(self):
v = a2a_response.parse({"error": 12345})
assert isinstance(v, a2a_response.Error)
assert v.message == "12345"
def test_platform_error_with_restart_metadata(self):
v = a2a_response.parse(_FIXTURES["platform_error_with_restart"])
assert isinstance(v, a2a_response.Error)
assert "workspace agent unreachable" in v.message
assert v.restarting is True
assert v.retry_after == 15
def test_platform_error_without_restart(self):
v = a2a_response.parse(_FIXTURES["platform_error_plain"])
assert isinstance(v, a2a_response.Error)
assert v.message == "workspace not found"
assert v.restarting is False
assert v.retry_after is None
def test_error_message_with_whitespace_stripped(self):
v = a2a_response.parse({"error": {"message": " trimmed "}})
assert isinstance(v, a2a_response.Error)
assert v.message == "trimmed"
def test_non_int_code_dropped(self):
v = a2a_response.parse({"error": {"message": "x", "code": "not-a-number"}})
assert isinstance(v, a2a_response.Error)
assert v.code is None
def test_non_int_retry_after_dropped(self):
v = a2a_response.parse({"error": "x", "restarting": True, "retry_after": "30s"})
assert isinstance(v, a2a_response.Error)
assert v.retry_after is None
class TestMalformedVariant:
"""``parse()`` returns ``Malformed`` for any shape it can't classify
and logs at WARNING so operators see new server response shapes."""
def test_empty_dict(self):
v = a2a_response.parse(_FIXTURES["malformed_empty_dict"])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == {}
def test_unexpected_keys(self):
v = a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == {"foo": "bar", "baz": 42}
def test_non_dict_input_list(self):
v = a2a_response.parse([1, 2, 3])
assert isinstance(v, a2a_response.Malformed)
assert v.raw == [1, 2, 3]
def test_non_dict_input_string(self):
v = a2a_response.parse("plain string")
assert isinstance(v, a2a_response.Malformed)
assert v.raw == "plain string"
def test_non_dict_input_none(self):
v = a2a_response.parse(None)
assert isinstance(v, a2a_response.Malformed)
assert v.raw is None
def test_logs_warning_on_malformed(self, caplog):
with caplog.at_level(logging.WARNING, logger="a2a_response"):
a2a_response.parse(_FIXTURES["malformed_unexpected_keys"])
assert any(r.levelno == logging.WARNING for r in caplog.records)
def test_logs_warning_on_non_dict(self, caplog):
with caplog.at_level(logging.WARNING, logger="a2a_response"):
a2a_response.parse("not a dict")
assert any("non-dict" in r.message for r in caplog.records)
# ============== Robustness — parser never raises ==============
_ADVERSARIAL_INPUTS: list[Any] = [
None,
True,
False,
0,
-1,
3.14,
"",
"string",
[],
[1, 2, 3],
{},
{"random": "garbage"},
{"result": None},
{"result": [1, 2, 3]},
{"result": {"parts": None}},
{"result": {"parts": [None]}},
{"result": {"parts": [{"text": []}]}},
{"error": None},
{"error": []},
{"error": {"message": None, "code": None}},
{"error": {"message": ["nested", "list"]}},
{"status": None, "delivery_mode": None, "method": None},
{"status": "queued", "delivery_mode": "push", "method": "x"}, # wrong delivery_mode
{"status": "running", "delivery_mode": "poll"}, # wrong status
{"status": 42, "delivery_mode": "poll"}, # non-string status
# Deeply-nested junk
{"result": {"parts": [{"text": {"deeply": {"nested": "object"}}}]}},
# Bytes (not really JSON-decodable but parser shouldn't raise)
{"result": {"parts": [{"text": b"bytes" if False else "x"}]}},
]
class TestRobustness:
"""Parser must never raise on adversarial input — every branch is total.
These cases catch regressions where a future change adds a key
access that doesn't tolerate ``None`` / wrong-type values.
"""
@pytest.mark.parametrize("payload", _ADVERSARIAL_INPUTS)
def test_parse_never_raises(self, payload):
# Single contract: parse must return one of the four variants
# regardless of input. No exception classes propagated.
v = a2a_response.parse(payload)
assert isinstance(v, (a2a_response.Result, a2a_response.Error,
a2a_response.Queued, a2a_response.Malformed))
# ============== Regression gate — pre-#2967 misclassified queued ==============
class TestRegressionGate:
"""Pin the bug that prompted the SSOT abstraction.
Before #2967, ``a2a_client.py:567-587`` sniffed only ``"result" in
data`` and ``"error" in data`` — the poll-queued envelope (no
result key, no error key) hit the bare-else and returned the
"unexpected response shape" error string. This test simulates the
pre-fix code path and confirms the SSOT parser correctly
distinguishes Queued from Malformed.
"""
def test_legacy_sniffer_would_return_neither_branch(self):
# The pre-#2967 logic — provided here so the regression is
# reproducible from this file alone, no archaeology needed.
envelope = _FIXTURES["poll_queued_full"]
legacy_branch = (
"result" if "result" in envelope
else "error" if "error" in envelope
else "unexpected_shape"
)
# Legacy sniff: hits the malformed branch.
assert legacy_branch == "unexpected_shape"
def test_ssot_parser_classifies_correctly(self):
# New parser: classifies as Queued.
v = a2a_response.parse(_FIXTURES["poll_queued_full"])
assert isinstance(v, a2a_response.Queued)
assert v.method == "message/send"
def test_every_fixture_classifies_to_expected_variant(self):
# Defense in depth — pin the variant for every fixture so a
# future shape addition has to update the table here too.
expected: dict[str, type] = {
"jsonrpc_success_with_text": a2a_response.Result,
"jsonrpc_success_multipart": a2a_response.Result,
"jsonrpc_success_no_parts": a2a_response.Result,
"jsonrpc_success_part_no_text_key": a2a_response.Result,
"jsonrpc_error_with_message_and_code": a2a_response.Error,
"jsonrpc_error_message_only": a2a_response.Error,
"jsonrpc_error_code_only": a2a_response.Error,
"jsonrpc_error_string_form": a2a_response.Error,
"platform_error_with_restart": a2a_response.Error,
"platform_error_plain": a2a_response.Error,
"poll_queued_full": a2a_response.Queued,
"poll_queued_notify": a2a_response.Queued,
"poll_queued_no_method": a2a_response.Queued,
"malformed_empty_dict": a2a_response.Malformed,
"malformed_unexpected_keys": a2a_response.Malformed,
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
"malformed_delivery_mode_no_status": a2a_response.Malformed,
}
# Every fixture must be enumerated — keeps this gate honest.
assert set(expected.keys()) == set(_FIXTURES.keys()), (
f"fixture/expected mismatch: "
f"missing-from-expected={set(_FIXTURES) - set(expected)} "
f"extra-in-expected={set(expected) - set(_FIXTURES)}"
)
for name, payload in _FIXTURES.items():
v = a2a_response.parse(payload)
assert isinstance(v, expected[name]), (
f"fixture {name!r} classified as {type(v).__name__}, "
f"expected {expected[name].__name__}"
)
@@ -93,6 +93,124 @@ class TestFlagOffLegacyPath:
poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# #2967: Auto-fallback to polling path when target is poll-mode
# ---------------------------------------------------------------------------
class TestPollModeAutoFallback:
"""Pin the #2967 behavior: when send_a2a_message returns the queued
sentinel (target is poll-mode), tool_delegate_task transparently
falls back to _delegate_sync_via_polling — which DOES work for
poll-mode peers (the executeDelegation goroutine writes to the
inbox queue and the result row arrives when the target replies).
Pre-#2967 behavior: queued sentinel was never returned (the parser
misclassified the envelope as malformed), and the calling agent
saw a DELEGATION FAILED / unexpected-response-shape error. This
test guards both against the parser regression (sentinel-emission)
and the fallback regression (sentinel-handling).
"""
async def test_queued_sentinel_triggers_polling_fallback(self, monkeypatch):
# Flag OFF — legacy send_a2a_message path. send returns the
# queued sentinel because the target is poll-mode. delegate_task
# must auto-route to _delegate_sync_via_polling so the agent
# eventually gets a real reply.
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from a2a_client import _A2A_QUEUED_PREFIX
send_calls = []
poll_calls = []
async def fake_send(workspace_id, task, source_workspace_id=None):
send_calls.append((workspace_id, task, source_workspace_id))
return f"{_A2A_QUEUED_PREFIX}target={workspace_id} method=message/send"
async def fake_polling(workspace_id, task, src):
poll_calls.append((workspace_id, task, src))
return "real response from poll-mode peer"
async def fake_discover(*_a, **_kw):
return {"name": "poll-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation._delegate_sync_via_polling", side_effect=fake_polling), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity):
result = await a2a_tools.tool_delegate_task(
"ws-target", "task body", source_workspace_id="ws-self"
)
# send was tried first
assert len(send_calls) == 1
# …then fallback fired automatically
assert len(poll_calls) == 1
assert poll_calls[0] == ("ws-target", "task body", "ws-self")
# Caller sees the real reply, NOT the queued sentinel and NOT
# a DELEGATION FAILED string.
assert result == "real response from poll-mode peer"
async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
# Push-mode peer returns a normal text reply — fallback path
# MUST NOT fire (no extra round-trip cost).
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
async def fake_send(*_a, **_kw):
return "normal reply"
async def fake_discover(*_a, **_kw):
return {"name": "push-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
result = await a2a_tools.tool_delegate_task(
"ws-target", "task", source_workspace_id="ws-self"
)
assert result == "normal reply"
poll_mock.assert_not_called()
async def test_error_send_result_does_not_trigger_fallback(self, monkeypatch):
# Genuine error (not queued) — must surface as DELEGATION FAILED,
# not silently retried via the polling path.
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from a2a_client import _A2A_ERROR_PREFIX
async def fake_send(*_a, **_kw):
return f"{_A2A_ERROR_PREFIX}HTTP 500 [target=...]"
async def fake_discover(*_a, **_kw):
return {"name": "broken-peer", "status": "online"}
async def fake_report_activity(*_a, **_kw):
return None
with patch("a2a_tools_delegation.send_a2a_message", side_effect=fake_send), \
patch("a2a_tools_delegation.discover_peer", side_effect=fake_discover), \
patch("a2a_tools.report_activity", side_effect=fake_report_activity), \
patch("a2a_tools_delegation._delegate_sync_via_polling", new=AsyncMock()) as poll_mock:
result = await a2a_tools.tool_delegate_task(
"ws-target", "task", source_workspace_id="ws-self"
)
assert "DELEGATION FAILED" in result
poll_mock.assert_not_called()
# ---------------------------------------------------------------------------
# Flag-on: dispatch failures
# ---------------------------------------------------------------------------