Compare commits

..

2 Commits

Author SHA1 Message Date
fullstack-engineer fcb5086132 test(workspace): add 36-case coverage for shared_runtime helpers (closes #366)
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 16s
sop-tier-check / tier-check (pull_request) Failing after 15s
audit-force-merge / audit (pull_request) Has been skipped
Six helper functions previously had zero test coverage:

| Function | Cases |
|----------|-------|
| `_extract_part_text` | 10 — dict/object/root nesting, None, unexpected types |
| `extract_message_text` | 6 — parts list, context objects, whitespace |
| `format_conversation_history` | 4 — user/agent roles, interleaved history |
| `build_task_text` | 4 — history prepending, empty history |
| `append_peer_guidance` | 6 — base text, None/empty/whitespace, empty peers |
| `brief_task` | 6 — limit variants including 0/1 edge cases |

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 16:26:03 +00:00
release-manager 9ce20958a5 fix(a2a): restore OFFSEC-003 trust-boundary wrap on tool_delegate_task return (closes #491) (#492)
Secret scan / Scan diff for credential-shaped strings (push) Successful in 3s
Co-authored-by: Molecule AI Release Manager <release-manager@agents.moleculesai.app>
Co-committed-by: Molecule AI Release Manager <release-manager@agents.moleculesai.app>
2026-05-11 15:01:18 +00:00
4 changed files with 302 additions and 423 deletions
+2 -1
View File
@@ -322,7 +322,8 @@ async def tool_delegate_task(
f"You should either: (1) try a different peer, (2) handle this task yourself, "
f"or (3) inform the user that {peer_name} is unavailable and provide your best answer."
)
return result
# OFFSEC-003: wrap peer result in trust boundary before returning to agent context
return sanitize_a2a_result(result)
async def tool_delegate_task_async(
-2
View File
@@ -27,8 +27,6 @@ async def list_peers() -> list[dict]:
async def delegate_task(workspace_id: str, task: str) -> str:
"""Send a task to a peer workspace via A2A and return the response text."""
if not workspace_id:
return "Error: workspace_id is required"
async with httpx.AsyncClient(timeout=120.0) as client:
# Discover target URL
try:
-420
View File
@@ -1,420 +0,0 @@
"""Test coverage for ``builtin_tools.a2a_tools`` and ``send_message_wrapper``.
Issue #367: 21 new test cases targeting previously-uncovered branches.
Uses ``respx`` for HTTP mocking — httpx.AsyncClient instantiates the client
before the mock can intervene (it resolves the host during __init__), so
patching at the class level is unreliable. respx intercepts at the transport
layer, which is safe regardless of how httpx initializes.
"""
from __future__ import annotations
import asyncio
import html
import os
import sys
from types import ModuleType
import pytest
import respx
# ---------------------------------------------------------------------------
# Session-scoped fixture — reload httpx once at test-session start
# ---------------------------------------------------------------------------
_httpx_reloaded = False
def _reload_httpx_and_real_module():
"""Force-reload httpx so builtin_tools.a2a_tools imports the real client.
conftest.py mocks builtin_tools.a2a_tools, which prevents Python from
importing the real module from disk (sys.modules takes precedence). This
helper removes both sys.modules entries and triggers a fresh import of the
real httpx + builtin_tools.a2a_tools chain.
"""
global _httpx_reloaded
if _httpx_reloaded:
return
_httpx_reloaded = True
# conftest.py set builtin_tools.__path__ = [] — restore so Python can
# find builtin_tools/a2a_tools.py on disk.
real_builtin = sys.modules.get("builtin_tools")
if real_builtin is not None:
builtin_dir = os.path.dirname(
os.path.dirname(os.path.abspath(__file__))
)
real_builtin.__path__ = [os.path.join(builtin_dir, "builtin_tools")]
# Remove the conftest.py mock so the real module loads
sys.modules.pop("builtin_tools.a2a_tools", None)
# Session-scoped: reload httpx once, not per-test. Per-test fixture only
# sets env vars (env vars can be set per-test without disturbing httpx).
@pytest.fixture(scope="session", autouse=True)
def _reload_httpx_session():
_reload_httpx_and_real_module()
yield
@pytest.fixture(autouse=True)
def _require_env(monkeypatch):
"""Per-test: set required env vars. httpx is already reloaded at session start."""
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000001")
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
yield
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _run(coro):
return asyncio.get_event_loop().run_until_complete(coro)
# =============================================================================
# builtin_tools/a2a_tools — list_peers
# =============================================================================
class TestListPeers:
"""Coverage for builtin_tools/a2a_tools.list_peers()."""
@respx.mock
def test_returns_peers_on_200(self):
"""Successful GET returns the peer list."""
from builtin_tools.a2a_tools import list_peers
peers = [
{"id": "ws-1", "name": "Alpha", "role": "sre", "status": "online"},
{"id": "ws-2", "name": "Beta", "role": "dev", "status": "busy"},
]
route = respx.get(
"http://test.invalid/registry/00000000-0000-0000-0000-000000000001/peers"
).respond(200, json=peers)
result = _run(list_peers())
assert result == peers
assert route.called
@respx.mock
def test_returns_empty_list_on_non_200(self):
"""list_peers swallows all non-200 responses gracefully."""
from builtin_tools.a2a_tools import list_peers
respx.get(
"http://test.invalid/registry/00000000-0000-0000-0000-000000000001/peers"
).respond(500)
result = _run(list_peers())
assert result == []
@respx.mock
def test_returns_empty_list_on_exception(self):
"""Network errors must not propagate — list_peers returns []. """
from builtin_tools.a2a_tools import list_peers
# Route that raises so httpx propagates an exception
respx.get(
"http://test.invalid/registry/00000000-0000-0000-0000-000000000001/peers"
).mock(side_effect=RuntimeError("dns failure"))
result = _run(list_peers())
assert result == []
# =============================================================================
# builtin_tools/a2a_tools — delegate_task
# =============================================================================
_DISCOVER_ROUTE = "http://test.invalid/registry/discover/ws-target"
class TestDelegateTask:
"""Coverage for builtin_tools/a2a_tools.delegate_task(workspace_id, task)."""
def test_empty_workspace_id_returns_error(self):
"""Empty workspace_id is validated before any network call."""
from builtin_tools.a2a_tools import delegate_task
out = _run(delegate_task("", "do it"))
assert "Error" in out
assert "workspace_id" in out.lower()
@respx.mock
def test_discover_returns_non_200(self):
"""Discovery 4xx/5xx → error message with status code."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(404)
out = _run(delegate_task("ws-target", "do it"))
assert "Error" in out
assert "404" in out
@respx.mock
def test_discover_returns_200_with_empty_url(self):
"""Discovery 200 but no url field → actionable error."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(200, json={"name": "orphan"})
out = _run(delegate_task("ws-target", "do it"))
assert "Error" in out
assert "no URL" in out
@respx.mock
def test_a2a_post_returns_500(self):
"""A2A send 5xx → Error: sending A2A message."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(500)
out = _run(delegate_task("ws-target", "do it"))
assert "Error" in out
assert "sending A2A message" in out
@respx.mock
def test_result_parts_empty_dict(self):
"""Regression #279: {"parts": []} → str(result), not "(no text)"."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"result": {"parts": []}}
)
out = _run(delegate_task("ws-target", "do it"))
# Must return str(result), not "(no text)"
assert "parts" in out
assert "(no text)" not in out
@respx.mock
def test_result_is_plain_string(self):
"""A bare string result returns as-is."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"result": "just a plain string"}
)
out = _run(delegate_task("ws-target", "do it"))
assert out == "just a plain string"
@respx.mock
def test_result_is_number(self):
"""Non-dict, non-string result → falls through to "(no text)"."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"result": 12345}
)
out = _run(delegate_task("ws-target", "do it"))
assert out == "(no text)"
@respx.mock
def test_result_parts_non_dict_element(self):
"""parts[0] is not a dict → falls through to "(no text)".
The code checks if parts[0] is a dict; since 123 is an int, it hits
the else-branch and returns "(no text)".
"""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"result": {"parts": [123, "also a string"]}}
)
out = _run(delegate_task("ws-target", "do it"))
assert out == "(no text)"
@respx.mock
def test_error_dict_form(self):
"""{"error": {"message": "..."}} → "Error: ..."."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"error": {"message": "peer overloaded", "code": 429}}
)
out = _run(delegate_task("ws-target", "do it"))
assert out == "Error: peer overloaded"
@respx.mock
def test_error_string_form(self):
"""{"error": "string error"} → "Error: string error"."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"error": "workspace offline"}
)
out = _run(delegate_task("ws-target", "do it"))
assert out == "Error: workspace offline"
@respx.mock
def test_error_null(self):
"""{"error": null} → "Error: None" (edge case — str(null) in message)."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").respond(
200, json={"error": None}
)
out = _run(delegate_task("ws-target", "do it"))
assert "Error" in out
@respx.mock
def test_a2a_post_raises_exception(self):
"""Network error during A2A POST → Error: sending A2A message: ..."""
from builtin_tools.a2a_tools import delegate_task
respx.get(_DISCOVER_ROUTE).respond(
200, json={"url": "http://peer.invalid/a2a"}
)
respx.post("http://peer.invalid/a2a").mock(
side_effect=ConnectionError("connection refused")
)
out = _run(delegate_task("ws-target", "do it"))
assert "Error" in out
assert "connection refused" in out
# =============================================================================
# builtin_tools/a2a_tools — get_peers_summary
# =============================================================================
_PEERS_ROUTE = (
"http://test.invalid/registry/00000000-0000-0000-0000-000000000001/peers"
)
class TestGetPeersSummary:
"""Coverage for builtin_tools/a2a_tools.get_peers_summary()."""
@respx.mock
def test_empty_peers_returns_no_peers_available(self):
from builtin_tools.a2a_tools import get_peers_summary
respx.get(_PEERS_ROUTE).respond(200, json=[])
out = _run(get_peers_summary())
assert "No peers" in out
@respx.mock
def test_peer_missing_fields(self):
"""Peers with missing name/id/role/status must not KeyError/TypeError."""
from builtin_tools.a2a_tools import get_peers_summary
# Peer has only 'id'; name, role, status are absent
respx.get(_PEERS_ROUTE).respond(200, json=[{"id": "ws-x"}])
out = _run(get_peers_summary())
assert "ws-x" in out
assert isinstance(out, str)
@respx.mock
def test_healthy_peer_roundtrip(self):
"""Sanity: normal peer dicts produce a formatted list."""
from builtin_tools.a2a_tools import get_peers_summary
peers = [
{"id": "ws-alpha", "name": "Alpha", "role": "sre", "status": "online"},
]
respx.get(_PEERS_ROUTE).respond(200, json=peers)
out = _run(get_peers_summary())
assert "Alpha" in out
assert "ws-alpha" in out
assert "sre" in out
assert "online" in out
# =============================================================================
# send_message_wrapper — safe_send_message
# =============================================================================
from unittest.mock import patch
from adapters.smolagents.send_message_wrapper import safe_send_message
class TestSafeSendMessage:
"""Coverage for adapters.smolagents.send_message_wrapper.safe_send_message()."""
def test_non_string_input_converted(self):
"""Non-str text is str()-converted before escaping."""
delivered = []
safe_send_message(42, send_fn=lambda s: delivered.append(s))
assert delivered == ["[smolagents] 42"]
assert isinstance(delivered[0], str)
def test_html_entities_escaped(self):
"""< > ' are escaped so rendered UIs cannot be injected.
The payload <script>alert('xss')</script> has no literal '&', so &amp;
does not appear. The escape output is: &lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;
"""
delivered = []
safe_send_message(
"<script>alert('xss')</script>",
send_fn=lambda s: delivered.append(s),
)
assert "&lt;" in delivered[0]
assert "&gt;" in delivered[0]
assert "&#x27;" in delivered[0]
assert "&lt;script&gt;" in delivered[0]
# The angle brackets and quotes must NOT appear unescaped
assert "<script>" not in delivered[0]
assert "alert('" not in delivered[0]
def test_truncation_at_max_len(self):
"""Text > 2000 chars is truncated; caller is warned."""
delivered = []
with patch(
"adapters.smolagents.send_message_wrapper.logger"
) as mock_logger:
long_text = "A" * 2500
safe_send_message(long_text, send_fn=lambda s: delivered.append(s))
assert len(delivered[0]) < len(long_text)
mock_logger.warning.assert_called_once()
assert "truncating" in mock_logger.warning.call_args[0][0]
def test_no_truncation_under_max_len(self):
"""Text ≤ 2000 chars is passed through intact with no warning."""
delivered = []
with patch(
"adapters.smolagents.send_message_wrapper.logger"
) as mock_logger:
text = "A" * 1500
safe_send_message(text, send_fn=lambda s: delivered.append(s))
expected = f"[smolagents] {text}"
assert delivered[0] == expected
mock_logger.warning.assert_not_called()
def test_debug_log_emitted(self):
"""Every delivery logs at DEBUG with final payload length."""
delivered = []
with patch(
"adapters.smolagents.send_message_wrapper.logger"
) as mock_logger:
safe_send_message("hello", send_fn=lambda s: delivered.append(s))
mock_logger.debug.assert_called_once()
assert "delivering" in mock_logger.debug.call_args[0][0]
def test_label_prefix_always_present(self):
"""Every delivered payload starts with '[smolagents]'."""
delivered = []
safe_send_message("x", send_fn=lambda s: delivered.append(s))
assert delivered[0].startswith("[smolagents]")
@@ -0,0 +1,300 @@
"""Test coverage for shared_runtime helpers (issue #366).
Six helper functions previously had zero test coverage:
_extract_part_text, extract_message_text, format_conversation_history,
build_task_text, append_peer_guidance, brief_task
"""
from __future__ import annotations
from shared_runtime import (
_extract_part_text,
append_peer_guidance,
brief_task,
build_task_text,
extract_message_text,
format_conversation_history,
)
# =============================================================================
# _extract_part_text
# =============================================================================
class TestExtractPartText:
"""Coverage for shared_runtime._extract_part_text()."""
def test_dict_with_text_field(self):
assert _extract_part_text({"text": "hello"}) == "hello"
def test_dict_without_text_field(self):
assert _extract_part_text({"type": "image"}) == ""
def test_dict_with_empty_text_field(self):
assert _extract_part_text({"text": ""}) == ""
def test_dict_with_root_nesting(self):
"""Text buried in part['root']['text'] is extracted."""
assert _extract_part_text({"root": {"text": "nested"}}) == "nested"
def test_dict_with_root_non_dict(self):
"""part['root'] that is not a dict is safely skipped."""
assert _extract_part_text({"root": "string", "text": "top"}) == "top"
def test_object_with_text_attribute(self):
class FakePart:
text = "attr-text"
assert _extract_part_text(FakePart()) == "attr-text"
def test_object_with_root_object_with_text(self):
"""Object with root.attr.text is extracted (A2A v1 object style)."""
class FakeRoot:
text = "root-attr-text"
class FakePart:
root = FakeRoot()
assert _extract_part_text(FakePart()) == "root-attr-text"
def test_object_with_empty_text_attribute(self):
class FakePart:
text = ""
assert _extract_part_text(FakePart()) == ""
def test_none_input(self):
assert _extract_part_text(None) == ""
def test_unexpected_type(self):
"""Plain int/float/bool falls through to empty string."""
assert _extract_part_text(42) == ""
# =============================================================================
# extract_message_text
# =============================================================================
class TestExtractMessageText:
"""Coverage for shared_runtime.extract_message_text()."""
def test_list_of_dict_parts(self):
parts = [{"text": "hello"}, {"text": "world"}]
assert extract_message_text(parts) == "hello world"
def test_single_part(self):
assert extract_message_text([{"text": "single"}]) == "single"
def test_context_object_with_message_parts(self):
"""RequestContext-like: .message.parts is the parts list."""
class FakeContext:
class _Msg:
parts = [{"text": "from context"}]
message = _Msg()
assert extract_message_text(FakeContext()) == "from context"
def test_context_object_without_message(self):
"""No .message attr → falls back to treating input as a parts list."""
class FakeContext:
pass # no .message
# Pass a list directly as the context-like object
assert extract_message_text([{"text": "fallback"}]) == "fallback"
def test_whitespace_normalized(self):
"""Leading/trailing whitespace is stripped; internal newlines are preserved."""
parts = [{"text": " hello "}, {"text": "\nworld\n"}]
result = extract_message_text(parts)
# Leading/trailing stripped, but internal \n stays (join uses single space)
assert result == "hello \nworld"
assert not result.startswith(" ")
assert not result.endswith(" ")
def test_empty_parts_list(self):
assert extract_message_text([]) == ""
# =============================================================================
# format_conversation_history
# =============================================================================
class TestFormatConversationHistory:
"""Coverage for shared_runtime.format_conversation_history()."""
def test_single_user_message(self):
hist = [("human", "hello")]
out = format_conversation_history(hist)
assert out == "User: hello"
def test_single_agent_message(self):
hist = [("ai", "response")]
out = format_conversation_history(hist)
assert out == "Agent: response"
def test_interleaved_history(self):
hist = [
("human", "hello"),
("ai", "hi there"),
("human", "what is 2+2?"),
("ai", "four"),
]
out = format_conversation_history(hist)
lines = out.split("\n")
assert lines[0] == "User: hello"
assert lines[1] == "Agent: hi there"
assert lines[2] == "User: what is 2+2?"
assert lines[3] == "Agent: four"
def test_empty_history(self):
assert format_conversation_history([]) == ""
# =============================================================================
# build_task_text
# =============================================================================
class TestBuildTaskText:
"""Coverage for shared_runtime.build_task_text()."""
def test_no_history_returns_user_message_unchanged(self):
assert build_task_text("do the thing", []) == "do the thing"
def test_history_prepends_transcript(self):
hist = [("human", "hello"), ("ai", "hi")]
result = build_task_text("follow-up", hist)
assert "Conversation so far:" in result
assert "User: hello" in result
assert "Agent: hi" in result
assert "follow-up" in result
def test_user_message_after_conversation_header(self):
hist = [("human", "hello")]
result = build_task_text("do it", hist)
assert result.startswith("Conversation so far:")
assert result.endswith("Current request: do it")
def test_empty_user_message_with_history(self):
"""Empty user_message is still rendered with history."""
hist = [("human", "hello")]
result = build_task_text("", hist)
assert "Conversation so far:" in result
assert "Current request:" in result
# =============================================================================
# append_peer_guidance
# =============================================================================
class TestAppendPeerGuidance:
"""Coverage for shared_runtime.append_peer_guidance()."""
def test_base_text_appended(self):
result = append_peer_guidance(
"base text",
peers_info="alpha: ws-1",
default_text="default",
tool_name="delegate_task",
)
assert result.startswith("base text")
assert "## Peers" in result
assert "alpha: ws-1" in result
assert "Use delegate_task" in result
def test_null_base_text_uses_default(self):
result = append_peer_guidance(
None,
peers_info="peer info",
default_text="DEFAULT_TEXT",
tool_name="tool",
)
assert result.startswith("DEFAULT_TEXT")
def test_whitespace_base_text_strips_to_empty_peers_still_added(self):
"""Whitespace-only base_text is stripped but default_text is NOT used
(only None triggers the fallback). The peers section is still appended."""
result = append_peer_guidance(
" ",
peers_info="peer",
default_text="DEF",
tool_name="t",
)
# " ".strip() == ""; default_text is NOT substituted for whitespace
assert "## Peers" in result
assert "peer" in result
assert "DEF" not in result # default_text only on None, not whitespace
def test_none_base_text_uses_default(self):
"""None base_text triggers fallback to default_text."""
result = append_peer_guidance(
None,
peers_info="peer",
default_text="DEFAULT",
tool_name="tool",
)
assert result.startswith("DEFAULT")
assert "## Peers" in result
def test_empty_peers_info_skips_section(self):
result = append_peer_guidance(
"base",
peers_info="",
default_text="def",
tool_name="tool",
)
# No "## Peers" section when peers_info is empty
assert result == "base"
def test_whitespace_in_base_and_peers_normalized(self):
result = append_peer_guidance(
" base \n",
peers_info=" peer-1 \n",
default_text="def",
tool_name="tool",
)
# Base should be stripped of leading/trailing whitespace
assert result.startswith("base")
# Peer info should be appended
assert "peer-1" in result
# =============================================================================
# brief_task
# =============================================================================
class TestBriefTask:
"""Coverage for shared_runtime.brief_task()."""
def test_short_text_returned_unchanged(self):
assert brief_task("hello", limit=60) == "hello"
def test_exact_limit_no_ellipsis(self):
text = "A" * 60
assert brief_task(text, limit=60) == text
assert "..." not in text
def test_truncated_with_ellipsis(self):
text = "A" * 80
result = brief_task(text, limit=60)
assert len(result) == 63 # 60 chars + "..."
assert result.endswith("...")
def test_limit_10_shortens(self):
result = brief_task("hello world", limit=10)
assert len(result) == 13 # 10 chars + "..."
assert result.endswith("...")
def test_limit_0_returns_ellipsis(self):
"""limit=0 → 0-char slice + "..." since len("hello") > 0."""
result = brief_task("hello", limit=0)
assert result == "..."
def test_limit_1_single_char_plus_ellipsis(self):
result = brief_task("hello", limit=1)
assert len(result) == 4 # 1 char + "..."
assert result.startswith("h")
assert result.endswith("...")