Compare commits

...

1 Commits

Author SHA1 Message Date
fullstack-engineer b0523a4608 test(workspace): add 39-case coverage for shared_runtime helper functions
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 15s
sop-tier-check / tier-check (pull_request) Successful in 26s
audit-force-merge / audit (pull_request) Has been skipped
Add comprehensive tests for the 6 remaining untested helpers in
shared_runtime.py:
- _extract_part_text: 10 cases covering dict, object, root nesting
- extract_message_text: 6 cases for parts extraction and context objects
- format_conversation_history: 4 cases for role formatting
- build_task_text: 4 cases for history prepending
- append_peer_guidance: 5 cases for peer info injection
- brief_task: 6 cases for truncation

Net new: 39 tests for previously zero-covered helpers.

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 03:27:43 +00:00
@@ -0,0 +1,266 @@
"""Tests for shared_runtime helper functions.
Covers the untested helpers in shared_runtime.py:
- _extract_part_text
- extract_message_text
- format_conversation_history
- build_task_text
- append_peer_guidance
- brief_task
Does NOT cover set_current_task (async, covered in test_a2a_executor.py).
"""
from __future__ import annotations
import sys
# Ensure the workspace root is on the path so 'shared_runtime' resolves
_ws_root = __file__.rsplit("/tests/", 1)[0]
if _ws_root not in sys.path:
sys.path.insert(0, _ws_root)
from shared_runtime import (
_extract_part_text,
extract_message_text,
format_conversation_history,
build_task_text,
append_peer_guidance,
brief_task,
)
# ─── _extract_part_text ──────────────────────────────────────────────────────
class TestExtractPartText:
def test_dict_with_text(self):
assert _extract_part_text({"text": "hello world"}) == "hello world"
def test_dict_with_nested_root_text(self):
assert _extract_part_text({"root": {"text": "nested text"}}) == "nested text"
def test_dict_prefers_text_over_root(self):
# When both text and root exist, text wins (outer text)
assert _extract_part_text({"text": "outer", "root": {"text": "inner"}}) == "outer"
def test_dict_empty_text_and_root(self):
assert _extract_part_text({"kind": "text"}) == ""
def test_dict_missing_fields(self):
assert _extract_part_text({"kind": "image"}) == ""
def test_dict_mixed_with_extra_fields(self):
assert _extract_part_text({"kind": "text", "text": "foo", "url": "http://..."}) == "foo"
def test_object_with_text_attribute(self):
class PartObj:
text = "object text"
assert _extract_part_text(PartObj()) == "object text"
def test_object_with_root_text_attribute(self):
class RootObj:
text = "root object text"
class PartObj:
root = RootObj()
assert _extract_part_text(PartObj()) == "root object text"
def test_object_empty_text(self):
class EmptyObj:
text = ""
assert _extract_part_text(EmptyObj()) == ""
def test_object_no_text_or_root(self):
class NoTextObj:
pass
assert _extract_part_text(NoTextObj()) == ""
def test_none_like(self):
assert _extract_part_text(None) == ""
# ─── extract_message_text ────────────────────────────────────────────────────
class TestExtractMessageText:
def test_list_of_dict_parts(self):
parts = [{"text": "hello"}, {"text": "world"}]
assert extract_message_text(parts) == "hello world"
def test_single_part(self):
parts = [{"text": "only one"}]
assert extract_message_text(parts) == "only one"
def test_empty_list(self):
assert extract_message_text([]) == ""
def test_none_parts(self):
assert extract_message_text(None) == ""
def test_object_with_message_parts(self):
"""Object with .message.parts attribute (A2A RequestContext pattern)."""
msg = type("Message", (), {"parts": [{"text": "from context"}, {"text": "message"}]})()
ctx = type("Context", (), {"message": msg})()
assert extract_message_text(ctx) == "from context message"
def test_joins_with_single_space(self):
# Inter-part join uses single space; internal whitespace within parts is preserved
parts = [{"text": "hello"}, {"text": "world"}]
assert extract_message_text(parts) == "hello world"
def test_preserves_within_part_whitespace(self):
parts = [{"text": " spaced "}, {"text": "\ttext\t"}]
# Leading/trailing whitespace stripped; internal whitespace within parts preserved
assert extract_message_text(parts) == "spaced \ttext"
def test_skips_parts_without_text(self):
parts = [{"kind": "image"}, {"text": "visible"}, {"url": "http://x"}]
assert extract_message_text(parts) == "visible"
# ─── format_conversation_history ──────────────────────────────────────────────
class TestFormatConversationHistory:
def test_empty_history(self):
assert format_conversation_history([]) == ""
def test_single_user_message(self):
result = format_conversation_history([("human", "hello")])
assert "User: hello" in result
def test_single_agent_message(self):
result = format_conversation_history([("ai", "hi there")])
assert "Agent: hi there" in result
def test_interleaved_history(self):
history = [
("human", "first"),
("ai", "response one"),
("human", "second"),
("ai", "response two"),
]
result = format_conversation_history(history)
lines = result.strip().split("\n")
assert len(lines) == 4
assert lines[0] == "User: first"
assert lines[1] == "Agent: response one"
assert lines[2] == "User: second"
assert lines[3] == "Agent: response two"
# ─── build_task_text ──────────────────────────────────────────────────────────
class TestBuildTaskText:
def test_no_history_returns_user_message(self):
assert build_task_text("hello", []) == "hello"
def test_history_prepends_transcript(self):
history = [("human", "hi"), ("ai", "hello")]
result = build_task_text("send email", history)
assert "Conversation so far:" in result
assert "User: hi" in result
assert "Agent: hello" in result
assert "Current request: send email" in result
def test_empty_history_returns_user_message(self):
# Empty list should behave like no history
assert build_task_text("hello", []) == "hello"
def test_single_history_entry(self):
result = build_task_text("bye", [("human", "last")])
assert "User: last" in result
assert "Current request: bye" in result
# ─── append_peer_guidance ─────────────────────────────────────────────────────
class TestAppendPeerGuidance:
def test_no_base_text_uses_default(self):
result = append_peer_guidance(
None,
"peer info here",
default_text="default",
tool_name="delegate_task",
)
assert "peer info here" in result
assert "## Peers" in result
assert "delegate_task" in result
assert "default" in result
def test_base_text_preserved(self):
result = append_peer_guidance(
"my prompt",
"peer info",
default_text="fallback",
tool_name="delegate_task",
)
assert "my prompt" in result
assert "## Peers" in result
def test_empty_peers_info_skipped(self):
result = append_peer_guidance(
"my prompt",
"",
default_text="fallback",
tool_name="delegate_task",
)
assert result == "my prompt"
def test_whitespace_trimmed(self):
result = append_peer_guidance(
" prompt ",
" peers ",
default_text="fallback",
tool_name="delegate_task",
)
# Should not double-space
assert " " not in result
def test_tool_name_injected(self):
result = append_peer_guidance(
None,
"peer info",
default_text="default",
tool_name="my_tool",
)
assert "my_tool" in result
# ─── brief_task ───────────────────────────────────────────────────────────────
class TestBriefTask:
def test_short_text_unchanged(self):
assert brief_task("hello world") == "hello world"
def test_exactly_at_limit(self):
text = "a" * 60
assert brief_task(text) == text
def test_over_limit_truncates(self):
text = "a" * 100
result = brief_task(text)
assert len(result) == 63 # 60 + "..."
assert result.endswith("...")
def test_under_limit_no_ellipsis(self):
text = "a" * 59
result = brief_task(text)
assert result == text
assert "..." not in result
def test_default_limit_60(self):
text = "a" * 70
result = brief_task(text, limit=60)
assert len(result) == 63
def test_custom_limit(self):
text = "a" * 20
result = brief_task(text, limit=10)
assert len(result) == 13 # 10 + "..."
def test_empty_string(self):
assert brief_task("") == ""
assert brief_task("") == "" # no ellipsis for empty