Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 108b9a54d9 | |||
| 173a642f9e | |||
| 177c4ef18c | |||
| 99f3cf7c8f | |||
| aed164ed6f | |||
| d616381f81 | |||
| 42b867d764 | |||
| 3eb3609b0c | |||
| 0a9b66a3ed | |||
| 8046410eee | |||
| a1ba496926 | |||
| ce479e5ced | |||
| d293a32593 | |||
| 1254337f4f | |||
| b026179476 | |||
| 64bb7352ca | |||
| 1b6c28ebfa | |||
| 98bf294844 |
@@ -139,6 +139,14 @@ jobs:
|
||||
/tmp/smoke/bin/python "$GITHUB_WORKSPACE/scripts/wheel_smoke.py"
|
||||
|
||||
- name: Publish to PyPI
|
||||
# working-directory matches the preceding Build/Verify steps. Without
|
||||
# this, twine runs from the default workspace checkout dir where
|
||||
# `dist/` doesn't exist and fails with:
|
||||
# ERROR InvalidDistribution: Cannot find file (or expand pattern): 'dist/*'
|
||||
# Caught on the first-ever successful dispatch of this workflow
|
||||
# (run 5097, 2026-05-11 02:08Z) — every other step in the publish
|
||||
# job already had this working-directory; Publish was missing it.
|
||||
working-directory: ${{ runner.temp }}/runtime-build
|
||||
env:
|
||||
# PYPI_TOKEN: repository secret scoped to molecule-ai-workspace-runtime.
|
||||
# Set via: Settings → Actions → Variables and Secrets → New Secret.
|
||||
|
||||
@@ -365,7 +365,7 @@ jobs:
|
||||
cache: pip
|
||||
cache-dependency-path: workspace/requirements.txt
|
||||
- if: needs.changes.outputs.python == 'true'
|
||||
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
|
||||
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov sqlalchemy>=2.0.0
|
||||
# Coverage flags + fail-under floor moved into workspace/pytest.ini
|
||||
# (issue #1817) so local `pytest` and CI use identical config.
|
||||
- if: needs.changes.outputs.python == 'true'
|
||||
|
||||
@@ -50,6 +50,7 @@ from pathlib import Path
|
||||
# without updating this set), which broke every workspace startup with
|
||||
# `ModuleNotFoundError: No module named 'transcript_auth'`.
|
||||
TOP_LEVEL_MODULES = {
|
||||
"_sanitize_a2a",
|
||||
"a2a_cli",
|
||||
"a2a_client",
|
||||
"a2a_executor",
|
||||
|
||||
@@ -51,6 +51,7 @@ from shared_runtime import (
|
||||
from executor_helpers import (
|
||||
collect_outbound_files,
|
||||
extract_attached_files,
|
||||
read_delegation_results,
|
||||
)
|
||||
from builtin_tools.telemetry import (
|
||||
A2A_TASK_ID,
|
||||
@@ -215,6 +216,17 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
3. Message(final_text) — terminal event
|
||||
"""
|
||||
user_input = extract_message_text(context)
|
||||
# Inject delegation results from prior turns. Heartbeat writes
|
||||
# completed delegation rows to DELEGATION_RESULTS_FILE and sends
|
||||
# a self-message to wake the agent; this consumes the file and
|
||||
# surfaces the results as context so the agent can act on them
|
||||
# without needing an explicit check_task_status call.
|
||||
# Results are prepended so they are visible even when the
|
||||
# self-message text is overwritten by a subsequent user message.
|
||||
pending_results = read_delegation_results()
|
||||
if pending_results:
|
||||
logger.info("A2A execute: injecting %d delegation result(s)", pending_results.count("\n") + 1)
|
||||
user_input = f"[Delegation results available]\n{pending_results}\n\n{user_input}"
|
||||
# Pull attached files from A2A message parts (kind: "file") and
|
||||
# append a manifest to the prompt so the agent knows they exist.
|
||||
# LangGraph tools (filesystem, bash, skills) can then open the
|
||||
|
||||
@@ -194,7 +194,7 @@ def parse(data: Any) -> Variant:
|
||||
method,
|
||||
data.get("queue_id", "?"),
|
||||
)
|
||||
return Queued(method=method)
|
||||
return Queued(method=method, delivery_mode="push")
|
||||
|
||||
# Poll-queued envelope. Both keys must be present — the workspace
|
||||
# server sets them together; if only one is present the body is
|
||||
|
||||
@@ -1201,3 +1201,94 @@ async def test_terminal_error_routes_via_updater_failed():
|
||||
assert not eq._complete_calls, (
|
||||
"complete() should not fire when execute() raises"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Issue #354 — delegation results auto-resume gap
|
||||
# ---------------------------------------------------------------------------
|
||||
# heartbeat.py's _check_delegations writes completed delegation rows to
|
||||
# DELEGATION_RESULTS_FILE and sends a self-message to wake the agent.
|
||||
# read_delegation_results() in executor_helpers.py atomically reads+consumes
|
||||
# that file. The fix wires this consumer into _core_execute so the agent
|
||||
# receives delegation results as context in the next turn — closing the gap
|
||||
# where parallel delegate_task calls return after the SDK turn ends and the
|
||||
# agent has no way to discover the results.
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegation_results_injected_into_user_input(monkeypatch):
|
||||
"""When delegation results exist, they are prepended to the user input
|
||||
passed to the agent so the agent can act on them without an explicit
|
||||
check_task_status call."""
|
||||
import a2a_executor
|
||||
from unittest.mock import patch
|
||||
|
||||
pending_results = (
|
||||
"- [completed] Delegation abc123: Checked 3 issues\n"
|
||||
" Response: 3 open, 0 critical\n"
|
||||
"- [failed] Delegation def456: Scan PR #352\n"
|
||||
" Error: peer workspace offline"
|
||||
)
|
||||
|
||||
# Patch read_delegation_results at the module level where a2a_executor
|
||||
# imported it so the _core_execute call picks it up.
|
||||
with patch.object(a2a_executor, "read_delegation_results", return_value=pending_results):
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Got it")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "What's the status?"
|
||||
context = _make_context([part], "ctx-deleg", task_id="task-deleg")
|
||||
eq = _make_event_queue()
|
||||
eq._complete_calls = []
|
||||
eq._failed_calls = []
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
# Verify the agent received the injected context
|
||||
agent.astream_events.assert_called_once()
|
||||
call_args = agent.astream_events.call_args
|
||||
messages = call_args[0][0]["messages"]
|
||||
|
||||
# The last message should be a human turn with the injected context
|
||||
human_turn = messages[-1]
|
||||
assert human_turn[0] == "human"
|
||||
# Must contain the delegation results marker
|
||||
assert "[Delegation results available]" in human_turn[1]
|
||||
# Must contain the completed delegation
|
||||
assert "abc123" in human_turn[1]
|
||||
assert "3 open" in human_turn[1]
|
||||
# Must contain the failed delegation
|
||||
assert "def456" in human_turn[1]
|
||||
# Must contain the original user message
|
||||
assert "What's the status?" in human_turn[1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_delegation_results_no_injection(monkeypatch):
|
||||
"""When no delegation results exist, user input is passed through unchanged."""
|
||||
import a2a_executor
|
||||
from unittest.mock import patch
|
||||
|
||||
with patch.object(a2a_executor, "read_delegation_results", return_value=""):
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "Hello"
|
||||
context = _make_context([part], "ctx-clean", task_id="task-clean")
|
||||
eq = _make_event_queue()
|
||||
eq._complete_calls = []
|
||||
eq._failed_calls = []
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
agent.astream_events.assert_called_once()
|
||||
call_args = agent.astream_events.call_args
|
||||
messages = call_args[0][0]["messages"]
|
||||
human_turn = messages[-1]
|
||||
assert human_turn[0] == "human"
|
||||
# Must NOT contain the injection marker
|
||||
assert "[Delegation results available]" not in human_turn[1]
|
||||
assert human_turn[1] == "Hello"
|
||||
|
||||
@@ -105,6 +105,27 @@ _FIXTURES = {
|
||||
"status": "queued",
|
||||
"delivery_mode": "poll",
|
||||
},
|
||||
# Push-mode queue envelope: returned when a push-mode workspace is at
|
||||
# capacity. The platform queues the request and returns
|
||||
# {queued: true, message: "...", queue_id: "..."}. The ``delivery_mode``
|
||||
# field is not present in this envelope (distinguishes it from poll-mode).
|
||||
"push_queued_full": {
|
||||
"queued": True,
|
||||
"method": "message/send",
|
||||
"queue_id": "q-abc-123",
|
||||
},
|
||||
"push_queued_notify": {
|
||||
"queued": True,
|
||||
"method": "notify",
|
||||
},
|
||||
"push_queued_no_method": {
|
||||
"queued": True,
|
||||
},
|
||||
"push_queued_no_queue_id": {
|
||||
# queue_id is purely informational — parser must not raise on its absence.
|
||||
"queued": True,
|
||||
"method": "message/send",
|
||||
},
|
||||
"malformed_empty_dict": {},
|
||||
"malformed_unexpected_keys": {"foo": "bar", "baz": 42},
|
||||
"malformed_status_queued_no_delivery_mode": {
|
||||
@@ -159,6 +180,62 @@ class TestQueuedVariant:
|
||||
a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||
assert any("queued for poll-mode peer" in r.message for r in caplog.records)
|
||||
|
||||
# --- Push-mode queue (handleA2ADispatchError → EnqueueA2A → 202 {queued: true}) ---
|
||||
|
||||
def test_push_queued_full_returns_queued_with_delivery_mode_push(self):
|
||||
# The push-mode path must set delivery_mode="push", not silently default to "poll".
|
||||
# Callers that branch on v.delivery_mode will mis-route poll-mode responses
|
||||
# as push-mode (and vice versa) if this field is wrong.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_notify(self):
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_notify"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "notify"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_missing_method_defaults_to_message_send(self):
|
||||
# Push-mode servers should always send method, but we handle absence gracefully.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_no_method"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_missing_queue_id_still_parsed(self):
|
||||
# queue_id is purely informational — its absence must not break parsing.
|
||||
v = a2a_response.parse(_FIXTURES["push_queued_no_queue_id"])
|
||||
assert isinstance(v, a2a_response.Queued)
|
||||
assert v.method == "message/send"
|
||||
assert v.delivery_mode == "push"
|
||||
|
||||
def test_push_queued_is_distinct_from_poll_queued(self):
|
||||
# Both paths return Queued, but from different wire envelopes.
|
||||
# Verify both parse correctly and are independent.
|
||||
push_v = a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
poll_v = a2a_response.parse(_FIXTURES["poll_queued_full"])
|
||||
assert isinstance(push_v, a2a_response.Queued)
|
||||
assert isinstance(poll_v, a2a_response.Queued)
|
||||
assert push_v.method == poll_v.method == "message/send"
|
||||
assert push_v.delivery_mode == "push"
|
||||
assert poll_v.delivery_mode == "poll"
|
||||
|
||||
def test_push_queued_logs_queue_id(self, caplog):
|
||||
with caplog.at_level(logging.INFO, logger="a2a_response"):
|
||||
a2a_response.parse(_FIXTURES["push_queued_full"])
|
||||
assert any("q-abc-123" in r.message for r in caplog.records)
|
||||
|
||||
def test_queued_string_yes_is_malformed_not_push_queued(self):
|
||||
# ``{"queued": "yes"}`` is not True, so it must NOT enter the push branch.
|
||||
v = a2a_response.parse({"queued": "yes"})
|
||||
assert isinstance(v, a2a_response.Malformed)
|
||||
|
||||
def test_queued_false_is_malformed(self):
|
||||
v = a2a_response.parse({"queued": False})
|
||||
assert isinstance(v, a2a_response.Malformed)
|
||||
|
||||
|
||||
class TestResultVariant:
|
||||
"""``parse()`` extracts the JSON-RPC ``result`` envelope into
|
||||
@@ -436,6 +513,10 @@ class TestRegressionGate:
|
||||
"poll_queued_full": a2a_response.Queued,
|
||||
"poll_queued_notify": a2a_response.Queued,
|
||||
"poll_queued_no_method": a2a_response.Queued,
|
||||
"push_queued_full": a2a_response.Queued,
|
||||
"push_queued_notify": a2a_response.Queued,
|
||||
"push_queued_no_method": a2a_response.Queued,
|
||||
"push_queued_no_queue_id": a2a_response.Queued,
|
||||
"malformed_empty_dict": a2a_response.Malformed,
|
||||
"malformed_unexpected_keys": a2a_response.Malformed,
|
||||
"malformed_status_queued_no_delivery_mode": a2a_response.Malformed,
|
||||
|
||||
Reference in New Issue
Block a user