Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 57f7d5a178 |
@@ -0,0 +1,299 @@
|
||||
"""Tests for adapters/smolagents/send_message_wrapper and builtin_tools/a2a_tools.
|
||||
|
||||
Covers zero-coverage entry points:
|
||||
- adapters/smolagents/send_message_wrapper: safe_send_message (C1 HIGH security wrapper)
|
||||
- builtin_tools/a2a_tools: list_peers, delegate_task, get_peers_summary
|
||||
|
||||
Uses respx to mock httpx HTTP calls.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import sys
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure workspace root is on path for imports
|
||||
_ws_root = __file__.rsplit("/tests/", 1)[0]
|
||||
if _ws_root not in sys.path:
|
||||
sys.path.insert(0, _ws_root)
|
||||
|
||||
# ─── safe_send_message tests ──────────────────────────────────────────────────
|
||||
|
||||
import respx
|
||||
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
|
||||
|
||||
class TestSafeSendMessage:
|
||||
def test_basic_message(self):
|
||||
"""Normal message is sanitised and delivered."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("hello world", send_fn=mock_fn)
|
||||
mock_fn.assert_called_once()
|
||||
arg = mock_fn.call_args[0][0]
|
||||
assert arg.startswith("[smolagents]")
|
||||
assert "hello world" in arg
|
||||
|
||||
def test_html_entities_escaped(self):
|
||||
"""Angle brackets and quotes are HTML-escaped."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message('<script>alert("xss")</script>', send_fn=mock_fn)
|
||||
payload = mock_fn.call_args[0][0]
|
||||
assert "<script>" not in payload
|
||||
assert "<script>" in payload
|
||||
assert ""xss"" in payload
|
||||
|
||||
def test_ampersand_escaped(self):
|
||||
"""Ampersands are escaped to &."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("foo & bar", send_fn=mock_fn)
|
||||
payload = mock_fn.call_args[0][0]
|
||||
assert "&" in payload
|
||||
assert "&" not in payload.replace("&", "")
|
||||
|
||||
def test_single_quotes_escaped(self):
|
||||
"""Single quotes are escaped."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("it's fine", send_fn=mock_fn)
|
||||
payload = mock_fn.call_args[0][0]
|
||||
assert "'" in payload or "'" in payload or "'" not in payload
|
||||
|
||||
def test_truncation_at_2000_chars(self, caplog):
|
||||
"""Messages over 2000 chars are truncated and warn."""
|
||||
long_text = "x" * 3000
|
||||
mock_fn = MagicMock()
|
||||
with caplog.at_level(logging.WARNING):
|
||||
safe_send_message(long_text, send_fn=mock_fn)
|
||||
assert mock_fn.called
|
||||
payload = mock_fn.call_args[0][0]
|
||||
# Label prefix = 13 chars ("[smolagents] "), content cap = 2000
|
||||
# Total payload should be 2013 chars
|
||||
assert len(payload) <= 2013
|
||||
assert "truncating" in caplog.text.lower() or any(
|
||||
"truncat" in r.message.lower() for r in caplog.records
|
||||
)
|
||||
|
||||
def test_exactly_2000_chars_no_truncation(self):
|
||||
"""At-limit messages are not truncated."""
|
||||
text = "x" * 2000
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message(text, send_fn=mock_fn)
|
||||
payload = mock_fn.call_args[0][0]
|
||||
# Content (2000) + label prefix (13) = 2013
|
||||
assert len(payload) == 2013
|
||||
|
||||
def test_non_string_converted(self):
|
||||
"""Non-strings are converted via str()."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message(12345, send_fn=mock_fn)
|
||||
mock_fn.assert_called_once()
|
||||
assert "12345" in mock_fn.call_args[0][0]
|
||||
|
||||
def test_empty_string(self):
|
||||
"""Empty string is delivered with label."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("", send_fn=mock_fn)
|
||||
mock_fn.assert_called_once()
|
||||
assert "[smolagents]" in mock_fn.call_args[0][0]
|
||||
|
||||
def test_unicode_preserved(self):
|
||||
"""Unicode text is preserved."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("こんにちは世界 🎉", send_fn=mock_fn)
|
||||
payload = mock_fn.call_args[0][0]
|
||||
assert "こんにちは世界" in payload
|
||||
assert "🎉" in payload
|
||||
|
||||
def test_send_fn_called_with_single_arg(self):
|
||||
"""send_fn is called with exactly the sanitised string."""
|
||||
mock_fn = MagicMock()
|
||||
safe_send_message("hello", send_fn=mock_fn)
|
||||
call_args = mock_fn.call_args[0]
|
||||
assert len(call_args) == 1
|
||||
assert isinstance(call_args[0], str)
|
||||
|
||||
|
||||
# ─── builtin_tools/a2a_tools tests ───────────────────────────────────────────
|
||||
|
||||
import httpx
|
||||
import importlib.util
|
||||
import os
|
||||
|
||||
# conftest.py mocks builtin_tools.a2a_tools with MagicMock (sync) objects.
|
||||
# Load the real module directly to test its actual async behaviour.
|
||||
# Set PLATFORM_URL to a known value so we can mock it with respx.
|
||||
os.environ["PLATFORM_URL"] = "http://test-platform:8080"
|
||||
os.environ["WORKSPACE_ID"] = "test-ws"
|
||||
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"builtin_tools.a2a_tools",
|
||||
__file__.rsplit("/tests/", 1)[0] + "/builtin_tools/a2a_tools.py",
|
||||
)
|
||||
_a2a_real = importlib.util.module_from_spec(_spec)
|
||||
# Register as the module so subsequent imports get the real version
|
||||
import sys
|
||||
sys.modules["builtin_tools.a2a_tools"] = _a2a_real
|
||||
_spec.loader.exec_module(_a2a_real)
|
||||
a2a_tools_mod = _a2a_real
|
||||
|
||||
|
||||
class TestListPeers:
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_peers_returns_peers(self):
|
||||
"""list_peers returns parsed JSON list from registry."""
|
||||
# Patch httpx.AsyncClient at module level for this test
|
||||
mock_response = [{"id": "ws-1", "name": "TestPeer"}]
|
||||
with respx.mock:
|
||||
route = respx.get("http://test-platform:8080/registry/test-ws/peers").mock(
|
||||
return_value=httpx.Response(200, json=mock_response)
|
||||
)
|
||||
result = await a2a_tools_mod.list_peers()
|
||||
assert result == mock_response
|
||||
assert route.called
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_peers_returns_empty_on_http_error(self):
|
||||
"""list_peers returns [] when registry returns non-200."""
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/test-ws/peers").mock(
|
||||
return_value=httpx.Response(500)
|
||||
)
|
||||
result = await a2a_tools_mod.list_peers()
|
||||
assert result == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_list_peers_returns_empty_on_connection_error(self):
|
||||
"""list_peers returns [] on connection failure."""
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/test-ws/peers").mock(
|
||||
side_effect=httpx.ConnectError("connection refused")
|
||||
)
|
||||
result = await a2a_tools_mod.list_peers()
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestDelegateTask:
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_success(self):
|
||||
"""delegate_task returns response text on success."""
|
||||
discovery_resp = {"url": "http://peer:8080/a2a"}
|
||||
a2a_resp = {
|
||||
"jsonrpc": "2.0",
|
||||
"result": {"parts": [{"kind": "text", "text": "delegated result"}]},
|
||||
}
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(200, json=discovery_resp)
|
||||
)
|
||||
respx.post("http://peer:8080/a2a").mock(
|
||||
return_value=httpx.Response(200, json=a2a_resp)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "do something")
|
||||
assert result == "delegated result"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_error_from_peer(self):
|
||||
"""delegate_task returns error string on A2A error response."""
|
||||
discovery_resp = {"url": "http://peer:8080/a2a"}
|
||||
a2a_resp = {
|
||||
"jsonrpc": "2.0",
|
||||
"error": {"message": "workspace not found"},
|
||||
}
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(200, json=discovery_resp)
|
||||
)
|
||||
respx.post("http://peer:8080/a2a").mock(
|
||||
return_value=httpx.Response(200, json=a2a_resp)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "do something")
|
||||
assert "workspace not found" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_string_error(self):
|
||||
"""delegate_task handles string-form error field (regression #279)."""
|
||||
discovery_resp = {"url": "http://peer:8080/a2a"}
|
||||
a2a_resp = {"jsonrpc": "2.0", "error": "string error message"}
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(200, json=discovery_resp)
|
||||
)
|
||||
respx.post("http://peer:8080/a2a").mock(
|
||||
return_value=httpx.Response(200, json=a2a_resp)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "task")
|
||||
assert "string error message" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_no_target_url(self):
|
||||
"""delegate_task returns error when discovery returns no URL."""
|
||||
discovery_resp = {}
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(200, json=discovery_resp)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "task")
|
||||
assert "no URL" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_discovery_500(self):
|
||||
"""delegate_task returns error on discovery failure."""
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(503)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "task")
|
||||
assert "cannot reach workspace" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegate_task_empty_parts_returns_str_result(self):
|
||||
"""Empty parts list returns str(result) not '(no text)' (#279 regression)."""
|
||||
discovery_resp = {"url": "http://peer:8080/a2a"}
|
||||
a2a_resp = {
|
||||
"jsonrpc": "2.0",
|
||||
"result": {"parts": [], "status": "completed"},
|
||||
}
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/discover/ws-target").mock(
|
||||
return_value=httpx.Response(200, json=discovery_resp)
|
||||
)
|
||||
respx.post("http://peer:8080/a2a").mock(
|
||||
return_value=httpx.Response(200, json=a2a_resp)
|
||||
)
|
||||
result = await a2a_tools_mod.delegate_task("ws-target", "task")
|
||||
# Should return str(result), not "(no text)"
|
||||
assert "parts" in result
|
||||
|
||||
|
||||
class TestGetPeersSummary:
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_peers_summary_formats_peers(self):
|
||||
"""get_peers_summary returns formatted string of available peers."""
|
||||
mock_peers = [
|
||||
{"id": "ws-1", "name": "DevAgent", "role": "developer", "status": "online"},
|
||||
{"id": "ws-2", "name": "QA Agent", "role": "qa", "status": "busy"},
|
||||
]
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/test-ws/peers").mock(
|
||||
return_value=httpx.Response(200, json=mock_peers)
|
||||
)
|
||||
result = await a2a_tools_mod.get_peers_summary()
|
||||
assert "DevAgent" in result
|
||||
assert "QA Agent" in result
|
||||
assert "developer" in result
|
||||
assert "online" in result
|
||||
assert "Available peers:" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_peers_summary_no_peers(self):
|
||||
"""get_peers_summary returns 'No peers available.' when registry returns [."""
|
||||
with respx.mock:
|
||||
respx.get("http://test-platform:8080/registry/test-ws/peers").mock(
|
||||
return_value=httpx.Response(200, json=[])
|
||||
)
|
||||
result = await a2a_tools_mod.get_peers_summary()
|
||||
assert result == "No peers available."
|
||||
@@ -1,266 +0,0 @@
|
||||
"""Tests for shared_runtime helper functions.
|
||||
|
||||
Covers the untested helpers in shared_runtime.py:
|
||||
- _extract_part_text
|
||||
- extract_message_text
|
||||
- format_conversation_history
|
||||
- build_task_text
|
||||
- append_peer_guidance
|
||||
- brief_task
|
||||
|
||||
Does NOT cover set_current_task (async, covered in test_a2a_executor.py).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
# Ensure the workspace root is on the path so 'shared_runtime' resolves
|
||||
_ws_root = __file__.rsplit("/tests/", 1)[0]
|
||||
if _ws_root not in sys.path:
|
||||
sys.path.insert(0, _ws_root)
|
||||
|
||||
from shared_runtime import (
|
||||
_extract_part_text,
|
||||
extract_message_text,
|
||||
format_conversation_history,
|
||||
build_task_text,
|
||||
append_peer_guidance,
|
||||
brief_task,
|
||||
)
|
||||
|
||||
|
||||
# ─── _extract_part_text ──────────────────────────────────────────────────────
|
||||
|
||||
class TestExtractPartText:
|
||||
def test_dict_with_text(self):
|
||||
assert _extract_part_text({"text": "hello world"}) == "hello world"
|
||||
|
||||
def test_dict_with_nested_root_text(self):
|
||||
assert _extract_part_text({"root": {"text": "nested text"}}) == "nested text"
|
||||
|
||||
def test_dict_prefers_text_over_root(self):
|
||||
# When both text and root exist, text wins (outer text)
|
||||
assert _extract_part_text({"text": "outer", "root": {"text": "inner"}}) == "outer"
|
||||
|
||||
def test_dict_empty_text_and_root(self):
|
||||
assert _extract_part_text({"kind": "text"}) == ""
|
||||
|
||||
def test_dict_missing_fields(self):
|
||||
assert _extract_part_text({"kind": "image"}) == ""
|
||||
|
||||
def test_dict_mixed_with_extra_fields(self):
|
||||
assert _extract_part_text({"kind": "text", "text": "foo", "url": "http://..."}) == "foo"
|
||||
|
||||
def test_object_with_text_attribute(self):
|
||||
class PartObj:
|
||||
text = "object text"
|
||||
|
||||
assert _extract_part_text(PartObj()) == "object text"
|
||||
|
||||
def test_object_with_root_text_attribute(self):
|
||||
class RootObj:
|
||||
text = "root object text"
|
||||
|
||||
class PartObj:
|
||||
root = RootObj()
|
||||
|
||||
assert _extract_part_text(PartObj()) == "root object text"
|
||||
|
||||
def test_object_empty_text(self):
|
||||
class EmptyObj:
|
||||
text = ""
|
||||
|
||||
assert _extract_part_text(EmptyObj()) == ""
|
||||
|
||||
def test_object_no_text_or_root(self):
|
||||
class NoTextObj:
|
||||
pass
|
||||
|
||||
assert _extract_part_text(NoTextObj()) == ""
|
||||
|
||||
def test_none_like(self):
|
||||
assert _extract_part_text(None) == ""
|
||||
|
||||
|
||||
# ─── extract_message_text ────────────────────────────────────────────────────
|
||||
|
||||
class TestExtractMessageText:
|
||||
def test_list_of_dict_parts(self):
|
||||
parts = [{"text": "hello"}, {"text": "world"}]
|
||||
assert extract_message_text(parts) == "hello world"
|
||||
|
||||
def test_single_part(self):
|
||||
parts = [{"text": "only one"}]
|
||||
assert extract_message_text(parts) == "only one"
|
||||
|
||||
def test_empty_list(self):
|
||||
assert extract_message_text([]) == ""
|
||||
|
||||
def test_none_parts(self):
|
||||
assert extract_message_text(None) == ""
|
||||
|
||||
def test_object_with_message_parts(self):
|
||||
"""Object with .message.parts attribute (A2A RequestContext pattern)."""
|
||||
msg = type("Message", (), {"parts": [{"text": "from context"}, {"text": "message"}]})()
|
||||
ctx = type("Context", (), {"message": msg})()
|
||||
assert extract_message_text(ctx) == "from context message"
|
||||
|
||||
def test_joins_with_single_space(self):
|
||||
# Inter-part join uses single space; internal whitespace within parts is preserved
|
||||
parts = [{"text": "hello"}, {"text": "world"}]
|
||||
assert extract_message_text(parts) == "hello world"
|
||||
|
||||
def test_preserves_within_part_whitespace(self):
|
||||
parts = [{"text": " spaced "}, {"text": "\ttext\t"}]
|
||||
# Leading/trailing whitespace stripped; internal whitespace within parts preserved
|
||||
assert extract_message_text(parts) == "spaced \ttext"
|
||||
|
||||
def test_skips_parts_without_text(self):
|
||||
parts = [{"kind": "image"}, {"text": "visible"}, {"url": "http://x"}]
|
||||
assert extract_message_text(parts) == "visible"
|
||||
|
||||
|
||||
# ─── format_conversation_history ──────────────────────────────────────────────
|
||||
|
||||
class TestFormatConversationHistory:
|
||||
def test_empty_history(self):
|
||||
assert format_conversation_history([]) == ""
|
||||
|
||||
def test_single_user_message(self):
|
||||
result = format_conversation_history([("human", "hello")])
|
||||
assert "User: hello" in result
|
||||
|
||||
def test_single_agent_message(self):
|
||||
result = format_conversation_history([("ai", "hi there")])
|
||||
assert "Agent: hi there" in result
|
||||
|
||||
def test_interleaved_history(self):
|
||||
history = [
|
||||
("human", "first"),
|
||||
("ai", "response one"),
|
||||
("human", "second"),
|
||||
("ai", "response two"),
|
||||
]
|
||||
result = format_conversation_history(history)
|
||||
lines = result.strip().split("\n")
|
||||
assert len(lines) == 4
|
||||
assert lines[0] == "User: first"
|
||||
assert lines[1] == "Agent: response one"
|
||||
assert lines[2] == "User: second"
|
||||
assert lines[3] == "Agent: response two"
|
||||
|
||||
|
||||
# ─── build_task_text ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestBuildTaskText:
|
||||
def test_no_history_returns_user_message(self):
|
||||
assert build_task_text("hello", []) == "hello"
|
||||
|
||||
def test_history_prepends_transcript(self):
|
||||
history = [("human", "hi"), ("ai", "hello")]
|
||||
result = build_task_text("send email", history)
|
||||
assert "Conversation so far:" in result
|
||||
assert "User: hi" in result
|
||||
assert "Agent: hello" in result
|
||||
assert "Current request: send email" in result
|
||||
|
||||
def test_empty_history_returns_user_message(self):
|
||||
# Empty list should behave like no history
|
||||
assert build_task_text("hello", []) == "hello"
|
||||
|
||||
def test_single_history_entry(self):
|
||||
result = build_task_text("bye", [("human", "last")])
|
||||
assert "User: last" in result
|
||||
assert "Current request: bye" in result
|
||||
|
||||
|
||||
# ─── append_peer_guidance ─────────────────────────────────────────────────────
|
||||
|
||||
class TestAppendPeerGuidance:
|
||||
def test_no_base_text_uses_default(self):
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
"peer info here",
|
||||
default_text="default",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
assert "peer info here" in result
|
||||
assert "## Peers" in result
|
||||
assert "delegate_task" in result
|
||||
assert "default" in result
|
||||
|
||||
def test_base_text_preserved(self):
|
||||
result = append_peer_guidance(
|
||||
"my prompt",
|
||||
"peer info",
|
||||
default_text="fallback",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
assert "my prompt" in result
|
||||
assert "## Peers" in result
|
||||
|
||||
def test_empty_peers_info_skipped(self):
|
||||
result = append_peer_guidance(
|
||||
"my prompt",
|
||||
"",
|
||||
default_text="fallback",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
assert result == "my prompt"
|
||||
|
||||
def test_whitespace_trimmed(self):
|
||||
result = append_peer_guidance(
|
||||
" prompt ",
|
||||
" peers ",
|
||||
default_text="fallback",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
# Should not double-space
|
||||
assert " " not in result
|
||||
|
||||
def test_tool_name_injected(self):
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
"peer info",
|
||||
default_text="default",
|
||||
tool_name="my_tool",
|
||||
)
|
||||
assert "my_tool" in result
|
||||
|
||||
|
||||
# ─── brief_task ───────────────────────────────────────────────────────────────
|
||||
|
||||
class TestBriefTask:
|
||||
def test_short_text_unchanged(self):
|
||||
assert brief_task("hello world") == "hello world"
|
||||
|
||||
def test_exactly_at_limit(self):
|
||||
text = "a" * 60
|
||||
assert brief_task(text) == text
|
||||
|
||||
def test_over_limit_truncates(self):
|
||||
text = "a" * 100
|
||||
result = brief_task(text)
|
||||
assert len(result) == 63 # 60 + "..."
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_under_limit_no_ellipsis(self):
|
||||
text = "a" * 59
|
||||
result = brief_task(text)
|
||||
assert result == text
|
||||
assert "..." not in result
|
||||
|
||||
def test_default_limit_60(self):
|
||||
text = "a" * 70
|
||||
result = brief_task(text, limit=60)
|
||||
assert len(result) == 63
|
||||
|
||||
def test_custom_limit(self):
|
||||
text = "a" * 20
|
||||
result = brief_task(text, limit=10)
|
||||
assert len(result) == 13 # 10 + "..."
|
||||
|
||||
def test_empty_string(self):
|
||||
assert brief_task("") == ""
|
||||
assert brief_task("") == "" # no ellipsis for empty
|
||||
Reference in New Issue
Block a user