Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ea2e73326f |
@@ -322,8 +322,7 @@ async def tool_delegate_task(
|
||||
f"You should either: (1) try a different peer, (2) handle this task yourself, "
|
||||
f"or (3) inform the user that {peer_name} is unavailable and provide your best answer."
|
||||
)
|
||||
# OFFSEC-003: wrap peer result in trust boundary before returning to agent context
|
||||
return sanitize_a2a_result(result)
|
||||
return result
|
||||
|
||||
|
||||
async def tool_delegate_task_async(
|
||||
|
||||
@@ -560,6 +560,280 @@ class TestLedgerHooks:
|
||||
assert ev.risk_flag is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# hooks — extended coverage (26 new cases to reach 26-case total)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLedgerHooksExtended:
|
||||
"""Extended coverage for molecule_audit.hooks — fills all uncovered branches.
|
||||
|
||||
Existing TestLedgerHooks covers the golden-path cases.
|
||||
This class covers: _to_bytes, session lifecycle, agent_id defaults,
|
||||
None/empty inputs, override flags, risk propagation, and edge cases.
|
||||
"""
|
||||
|
||||
# ── _to_bytes ──────────────────────────────────────────────────────────────
|
||||
|
||||
def test_to_bytes_none(self):
|
||||
from molecule_audit.hooks import _to_bytes
|
||||
assert _to_bytes(None) is None
|
||||
|
||||
def test_to_bytes_bytes_returns_same(self):
|
||||
from molecule_audit.hooks import _to_bytes
|
||||
data = b"\x00\xff"
|
||||
assert _to_bytes(data) == data
|
||||
|
||||
def test_to_bytes_str_returns_utf8(self):
|
||||
from molecule_audit.hooks import _to_bytes
|
||||
assert _to_bytes("café") == "café".encode("utf-8")
|
||||
|
||||
def test_to_bytes_dict_is_json_deterministic(self):
|
||||
from molecule_audit.hooks import _to_bytes
|
||||
d = {"b": 2, "a": 1}
|
||||
result = _to_bytes(d)
|
||||
# Must be valid UTF-8 JSON
|
||||
import json
|
||||
parsed = json.loads(result.decode("utf-8"))
|
||||
assert parsed == {"a": 1, "b": 2} # sort_keys=True
|
||||
# Same dict produces same bytes (deterministic)
|
||||
assert _to_bytes(d) == result
|
||||
|
||||
def test_to_bytes_list_is_json(self):
|
||||
from molecule_audit.hooks import _to_bytes
|
||||
result = _to_bytes([1, "two", {"three": 3}])
|
||||
import json
|
||||
parsed = json.loads(result.decode("utf-8"))
|
||||
assert parsed == [1, "two", {"three": 3}]
|
||||
|
||||
# ── _DEFAULT_AGENT_ID ─────────────────────────────────────────────────────
|
||||
|
||||
def test_agent_id_defaults_to_workspace_id_env(self, monkeypatch):
|
||||
import molecule_audit.hooks as hooks
|
||||
monkeypatch.setenv("WORKSPACE_ID", "env-workspace-42")
|
||||
# Reset so it picks up the new env value
|
||||
hooks._DEFAULT_AGENT_ID = hooks.os.environ.get("WORKSPACE_ID", "unknown-agent")
|
||||
h = hooks.LedgerHooks(session_id="s")
|
||||
assert h.agent_id == "env-workspace-42"
|
||||
|
||||
def test_agent_id_overrides_env(self):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
h = LedgerHooks(session_id="s", agent_id="explicit-agent")
|
||||
assert h.agent_id == "explicit-agent"
|
||||
|
||||
# ── Session lifecycle ─────────────────────────────────────────────────────
|
||||
|
||||
def test_session_is_lazy(self, mem_session):
|
||||
"""_open_session is not called until first on_* method."""
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
# Session must NOT be opened until needed
|
||||
assert hooks._session is None
|
||||
|
||||
def test_session_reused_across_calls(self, mem_session):
|
||||
"""Multiple on_* calls share the same SQLAlchemy session."""
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_start(input_text="start")
|
||||
hooks.on_task_end(output_text="end")
|
||||
# Both events written to the same session
|
||||
assert mem_session.query(
|
||||
__import__("molecule_audit.ledger", fromlist=["AuditEvent"]).AuditEvent
|
||||
).count() == 2
|
||||
|
||||
def test_close_when_session_is_none(self):
|
||||
"""close() is safe to call when no session was ever opened."""
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks.close() # must not raise
|
||||
assert hooks._session is None
|
||||
|
||||
def test_context_manager_releases_on_exception(self, mem_session):
|
||||
"""__exit__ closes session even when an exception propagates."""
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
with pytest.raises(ZeroDivisionError):
|
||||
with hooks:
|
||||
hooks.on_task_start(input_text="start")
|
||||
raise ZeroDivisionError("boom")
|
||||
# Session must still be closed
|
||||
assert hooks._session is None
|
||||
|
||||
# ── on_task_start None/empty inputs ───────────────────────────────────────
|
||||
|
||||
def test_on_task_start_none_input(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_start(input_text=None)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.input_hash is None
|
||||
assert ev.operation == "task_start"
|
||||
|
||||
def test_on_task_start_risk_flag_true(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_start(risk_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.risk_flag is True
|
||||
|
||||
def test_on_task_start_oversight_flag_override(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1", human_oversight_flag=False)
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_start(human_oversight_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.human_oversight_flag is True
|
||||
|
||||
# ── on_llm_call None/empty inputs ─────────────────────────────────────────
|
||||
|
||||
def test_on_llm_call_none_input_and_output(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_llm_call(model="m", input_text=None, output_text=None)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.input_hash is None
|
||||
assert ev.output_hash is None
|
||||
assert ev.model_used == "m"
|
||||
|
||||
def test_on_llm_call_risk_flag_true(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_llm_call(model="m", risk_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.risk_flag is True
|
||||
|
||||
# ── on_tool_call None/empty inputs ────────────────────────────────────────
|
||||
|
||||
def test_on_tool_call_bytes_input(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks, _to_bytes
|
||||
from molecule_audit.ledger import AuditEvent, hash_content
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
binary = b"binary data \x00\xff"
|
||||
hooks.on_tool_call("read_file", input_data=binary)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.input_hash == hash_content(binary)
|
||||
assert ev.model_used == "read_file"
|
||||
|
||||
def test_on_tool_call_none_input_and_output(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_tool_call("echo", input_data=None, output_data=None)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.input_hash is None
|
||||
assert ev.output_hash is None
|
||||
assert ev.model_used == "echo"
|
||||
|
||||
def test_on_tool_call_risk_flag_true(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_tool_call("write_file", risk_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.risk_flag is True
|
||||
|
||||
# ── on_task_end None/empty inputs ─────────────────────────────────────────
|
||||
|
||||
def test_on_task_end_none_output(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_end(output_text=None)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.output_hash is None
|
||||
assert ev.operation == "task_end"
|
||||
|
||||
def test_on_task_end_risk_flag_true(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_end(risk_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.risk_flag is True
|
||||
|
||||
def test_on_task_end_oversight_flag_override(self, mem_session):
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
from molecule_audit.ledger import AuditEvent
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1", human_oversight_flag=False)
|
||||
hooks._session = mem_session
|
||||
hooks.on_task_end(human_oversight_flag=True)
|
||||
hooks.close()
|
||||
|
||||
ev = mem_session.query(AuditEvent).first()
|
||||
assert ev.human_oversight_flag is True
|
||||
|
||||
# ── _safe_append exception swallowing ─────────────────────────────────────
|
||||
|
||||
def test_safe_append_swallows_session_error(self, mem_session, caplog):
|
||||
"""_safe_append logs a warning when append_event raises."""
|
||||
import logging
|
||||
from molecule_audit.hooks import LedgerHooks
|
||||
|
||||
hooks = LedgerHooks(session_id="s1", agent_id="ag1")
|
||||
hooks._session = mem_session
|
||||
|
||||
# Force an error by making the session raise on commit
|
||||
orig_commit = mem_session.commit
|
||||
def bad_commit():
|
||||
raise RuntimeError("simulated DB error")
|
||||
mem_session.commit = bad_commit
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="molecule_audit.hooks"):
|
||||
hooks.on_task_start(input_text="test")
|
||||
|
||||
mem_session.commit = orig_commit # restore
|
||||
assert any("failed to append event" in r.message for r in caplog.records)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# verify.py CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1,300 +0,0 @@
|
||||
"""Test coverage for shared_runtime helpers (issue #366).
|
||||
|
||||
Six helper functions previously had zero test coverage:
|
||||
_extract_part_text, extract_message_text, format_conversation_history,
|
||||
build_task_text, append_peer_guidance, brief_task
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
from shared_runtime import (
|
||||
_extract_part_text,
|
||||
append_peer_guidance,
|
||||
brief_task,
|
||||
build_task_text,
|
||||
extract_message_text,
|
||||
format_conversation_history,
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# _extract_part_text
|
||||
# =============================================================================
|
||||
|
||||
class TestExtractPartText:
|
||||
"""Coverage for shared_runtime._extract_part_text()."""
|
||||
|
||||
def test_dict_with_text_field(self):
|
||||
assert _extract_part_text({"text": "hello"}) == "hello"
|
||||
|
||||
def test_dict_without_text_field(self):
|
||||
assert _extract_part_text({"type": "image"}) == ""
|
||||
|
||||
def test_dict_with_empty_text_field(self):
|
||||
assert _extract_part_text({"text": ""}) == ""
|
||||
|
||||
def test_dict_with_root_nesting(self):
|
||||
"""Text buried in part['root']['text'] is extracted."""
|
||||
assert _extract_part_text({"root": {"text": "nested"}}) == "nested"
|
||||
|
||||
def test_dict_with_root_non_dict(self):
|
||||
"""part['root'] that is not a dict is safely skipped."""
|
||||
assert _extract_part_text({"root": "string", "text": "top"}) == "top"
|
||||
|
||||
def test_object_with_text_attribute(self):
|
||||
class FakePart:
|
||||
text = "attr-text"
|
||||
|
||||
assert _extract_part_text(FakePart()) == "attr-text"
|
||||
|
||||
def test_object_with_root_object_with_text(self):
|
||||
"""Object with root.attr.text is extracted (A2A v1 object style)."""
|
||||
|
||||
class FakeRoot:
|
||||
text = "root-attr-text"
|
||||
|
||||
class FakePart:
|
||||
root = FakeRoot()
|
||||
|
||||
assert _extract_part_text(FakePart()) == "root-attr-text"
|
||||
|
||||
def test_object_with_empty_text_attribute(self):
|
||||
class FakePart:
|
||||
text = ""
|
||||
|
||||
assert _extract_part_text(FakePart()) == ""
|
||||
|
||||
def test_none_input(self):
|
||||
assert _extract_part_text(None) == ""
|
||||
|
||||
def test_unexpected_type(self):
|
||||
"""Plain int/float/bool falls through to empty string."""
|
||||
assert _extract_part_text(42) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# extract_message_text
|
||||
# =============================================================================
|
||||
|
||||
class TestExtractMessageText:
|
||||
"""Coverage for shared_runtime.extract_message_text()."""
|
||||
|
||||
def test_list_of_dict_parts(self):
|
||||
parts = [{"text": "hello"}, {"text": "world"}]
|
||||
assert extract_message_text(parts) == "hello world"
|
||||
|
||||
def test_single_part(self):
|
||||
assert extract_message_text([{"text": "single"}]) == "single"
|
||||
|
||||
def test_context_object_with_message_parts(self):
|
||||
"""RequestContext-like: .message.parts is the parts list."""
|
||||
|
||||
class FakeContext:
|
||||
class _Msg:
|
||||
parts = [{"text": "from context"}]
|
||||
|
||||
message = _Msg()
|
||||
|
||||
assert extract_message_text(FakeContext()) == "from context"
|
||||
|
||||
def test_context_object_without_message(self):
|
||||
"""No .message attr → falls back to treating input as a parts list."""
|
||||
|
||||
class FakeContext:
|
||||
pass # no .message
|
||||
|
||||
# Pass a list directly as the context-like object
|
||||
assert extract_message_text([{"text": "fallback"}]) == "fallback"
|
||||
|
||||
def test_whitespace_normalized(self):
|
||||
"""Leading/trailing whitespace is stripped; internal newlines are preserved."""
|
||||
parts = [{"text": " hello "}, {"text": "\nworld\n"}]
|
||||
result = extract_message_text(parts)
|
||||
# Leading/trailing stripped, but internal \n stays (join uses single space)
|
||||
assert result == "hello \nworld"
|
||||
assert not result.startswith(" ")
|
||||
assert not result.endswith(" ")
|
||||
|
||||
def test_empty_parts_list(self):
|
||||
assert extract_message_text([]) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# format_conversation_history
|
||||
# =============================================================================
|
||||
|
||||
class TestFormatConversationHistory:
|
||||
"""Coverage for shared_runtime.format_conversation_history()."""
|
||||
|
||||
def test_single_user_message(self):
|
||||
hist = [("human", "hello")]
|
||||
out = format_conversation_history(hist)
|
||||
assert out == "User: hello"
|
||||
|
||||
def test_single_agent_message(self):
|
||||
hist = [("ai", "response")]
|
||||
out = format_conversation_history(hist)
|
||||
assert out == "Agent: response"
|
||||
|
||||
def test_interleaved_history(self):
|
||||
hist = [
|
||||
("human", "hello"),
|
||||
("ai", "hi there"),
|
||||
("human", "what is 2+2?"),
|
||||
("ai", "four"),
|
||||
]
|
||||
out = format_conversation_history(hist)
|
||||
lines = out.split("\n")
|
||||
assert lines[0] == "User: hello"
|
||||
assert lines[1] == "Agent: hi there"
|
||||
assert lines[2] == "User: what is 2+2?"
|
||||
assert lines[3] == "Agent: four"
|
||||
|
||||
def test_empty_history(self):
|
||||
assert format_conversation_history([]) == ""
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# build_task_text
|
||||
# =============================================================================
|
||||
|
||||
class TestBuildTaskText:
|
||||
"""Coverage for shared_runtime.build_task_text()."""
|
||||
|
||||
def test_no_history_returns_user_message_unchanged(self):
|
||||
assert build_task_text("do the thing", []) == "do the thing"
|
||||
|
||||
def test_history_prepends_transcript(self):
|
||||
hist = [("human", "hello"), ("ai", "hi")]
|
||||
result = build_task_text("follow-up", hist)
|
||||
assert "Conversation so far:" in result
|
||||
assert "User: hello" in result
|
||||
assert "Agent: hi" in result
|
||||
assert "follow-up" in result
|
||||
|
||||
def test_user_message_after_conversation_header(self):
|
||||
hist = [("human", "hello")]
|
||||
result = build_task_text("do it", hist)
|
||||
assert result.startswith("Conversation so far:")
|
||||
assert result.endswith("Current request: do it")
|
||||
|
||||
def test_empty_user_message_with_history(self):
|
||||
"""Empty user_message is still rendered with history."""
|
||||
hist = [("human", "hello")]
|
||||
result = build_task_text("", hist)
|
||||
assert "Conversation so far:" in result
|
||||
assert "Current request:" in result
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# append_peer_guidance
|
||||
# =============================================================================
|
||||
|
||||
class TestAppendPeerGuidance:
|
||||
"""Coverage for shared_runtime.append_peer_guidance()."""
|
||||
|
||||
def test_base_text_appended(self):
|
||||
result = append_peer_guidance(
|
||||
"base text",
|
||||
peers_info="alpha: ws-1",
|
||||
default_text="default",
|
||||
tool_name="delegate_task",
|
||||
)
|
||||
assert result.startswith("base text")
|
||||
assert "## Peers" in result
|
||||
assert "alpha: ws-1" in result
|
||||
assert "Use delegate_task" in result
|
||||
|
||||
def test_null_base_text_uses_default(self):
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
peers_info="peer info",
|
||||
default_text="DEFAULT_TEXT",
|
||||
tool_name="tool",
|
||||
)
|
||||
assert result.startswith("DEFAULT_TEXT")
|
||||
|
||||
def test_whitespace_base_text_strips_to_empty_peers_still_added(self):
|
||||
"""Whitespace-only base_text is stripped but default_text is NOT used
|
||||
(only None triggers the fallback). The peers section is still appended."""
|
||||
result = append_peer_guidance(
|
||||
" ",
|
||||
peers_info="peer",
|
||||
default_text="DEF",
|
||||
tool_name="t",
|
||||
)
|
||||
# " ".strip() == ""; default_text is NOT substituted for whitespace
|
||||
assert "## Peers" in result
|
||||
assert "peer" in result
|
||||
assert "DEF" not in result # default_text only on None, not whitespace
|
||||
|
||||
def test_none_base_text_uses_default(self):
|
||||
"""None base_text triggers fallback to default_text."""
|
||||
result = append_peer_guidance(
|
||||
None,
|
||||
peers_info="peer",
|
||||
default_text="DEFAULT",
|
||||
tool_name="tool",
|
||||
)
|
||||
assert result.startswith("DEFAULT")
|
||||
assert "## Peers" in result
|
||||
|
||||
def test_empty_peers_info_skips_section(self):
|
||||
result = append_peer_guidance(
|
||||
"base",
|
||||
peers_info="",
|
||||
default_text="def",
|
||||
tool_name="tool",
|
||||
)
|
||||
# No "## Peers" section when peers_info is empty
|
||||
assert result == "base"
|
||||
|
||||
def test_whitespace_in_base_and_peers_normalized(self):
|
||||
result = append_peer_guidance(
|
||||
" base \n",
|
||||
peers_info=" peer-1 \n",
|
||||
default_text="def",
|
||||
tool_name="tool",
|
||||
)
|
||||
# Base should be stripped of leading/trailing whitespace
|
||||
assert result.startswith("base")
|
||||
# Peer info should be appended
|
||||
assert "peer-1" in result
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# brief_task
|
||||
# =============================================================================
|
||||
|
||||
class TestBriefTask:
|
||||
"""Coverage for shared_runtime.brief_task()."""
|
||||
|
||||
def test_short_text_returned_unchanged(self):
|
||||
assert brief_task("hello", limit=60) == "hello"
|
||||
|
||||
def test_exact_limit_no_ellipsis(self):
|
||||
text = "A" * 60
|
||||
assert brief_task(text, limit=60) == text
|
||||
assert "..." not in text
|
||||
|
||||
def test_truncated_with_ellipsis(self):
|
||||
text = "A" * 80
|
||||
result = brief_task(text, limit=60)
|
||||
assert len(result) == 63 # 60 chars + "..."
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_limit_10_shortens(self):
|
||||
result = brief_task("hello world", limit=10)
|
||||
assert len(result) == 13 # 10 chars + "..."
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_limit_0_returns_ellipsis(self):
|
||||
"""limit=0 → 0-char slice + "..." since len("hello") > 0."""
|
||||
result = brief_task("hello", limit=0)
|
||||
assert result == "..."
|
||||
|
||||
def test_limit_1_single_char_plus_ellipsis(self):
|
||||
result = brief_task("hello", limit=1)
|
||||
assert len(result) == 4 # 1 char + "..."
|
||||
assert result.startswith("h")
|
||||
assert result.endswith("...")
|
||||
Reference in New Issue
Block a user