Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bed7966f9d | |||
| 151b6021fb | |||
| f7da399595 | |||
| 6be36906f4 | |||
| c814aa2210 | |||
| 716ec95b7d |
@@ -139,14 +139,6 @@ jobs:
|
||||
/tmp/smoke/bin/python "$GITHUB_WORKSPACE/scripts/wheel_smoke.py"
|
||||
|
||||
- name: Publish to PyPI
|
||||
# working-directory matches the preceding Build/Verify steps. Without
|
||||
# this, twine runs from the default workspace checkout dir where
|
||||
# `dist/` doesn't exist and fails with:
|
||||
# ERROR InvalidDistribution: Cannot find file (or expand pattern): 'dist/*'
|
||||
# Caught on the first-ever successful dispatch of this workflow
|
||||
# (run 5097, 2026-05-11 02:08Z) — every other step in the publish
|
||||
# job already had this working-directory; Publish was missing it.
|
||||
working-directory: ${{ runner.temp }}/runtime-build
|
||||
env:
|
||||
# PYPI_TOKEN: repository secret scoped to molecule-ai-workspace-runtime.
|
||||
# Set via: Settings → Actions → Variables and Secrets → New Secret.
|
||||
|
||||
@@ -77,23 +77,6 @@ jobs:
|
||||
# works if we never check out PR HEAD. Same SHA the workflow
|
||||
# itself was loaded from.
|
||||
ref: ${{ github.event.pull_request.base.sha }}
|
||||
- name: Install jq
|
||||
# Gitea Actions runners (ubuntu-latest label) do not bundle jq.
|
||||
# The sop-tier-check script uses jq for all JSON API parsing.
|
||||
# Install jq before the script runs so sop-tier-check can pass.
|
||||
#
|
||||
# Method: download binary directly from GitHub releases (faster and
|
||||
# more reliable than apt-get in containerized environments). Falls
|
||||
# back to apt-get if the download fails. The smoke test confirms
|
||||
# jq is on PATH before the main script runs.
|
||||
run: |
|
||||
set -e
|
||||
timeout 60 curl -sSL \
|
||||
"https://github.com/jqlang/jq/releases/download/jq-1.7.1/jq-linux-amd64" \
|
||||
-o /usr/local/bin/jq && chmod +x /usr/local/bin/jq \
|
||||
|| apt-get update -qq && apt-get install -y -qq jq
|
||||
jq --version
|
||||
|
||||
- name: Verify tier label + reviewer team membership
|
||||
env:
|
||||
# SOP_TIER_CHECK_TOKEN is the org-level secret for the
|
||||
|
||||
@@ -365,7 +365,7 @@ jobs:
|
||||
cache: pip
|
||||
cache-dependency-path: workspace/requirements.txt
|
||||
- if: needs.changes.outputs.python == 'true'
|
||||
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov sqlalchemy>=2.0.0
|
||||
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
|
||||
# Coverage flags + fail-under floor moved into workspace/pytest.ini
|
||||
# (issue #1817) so local `pytest` and CI use identical config.
|
||||
- if: needs.changes.outputs.python == 'true'
|
||||
|
||||
@@ -50,7 +50,6 @@ from pathlib import Path
|
||||
# without updating this set), which broke every workspace startup with
|
||||
# `ModuleNotFoundError: No module named 'transcript_auth'`.
|
||||
TOP_LEVEL_MODULES = {
|
||||
"_sanitize_a2a",
|
||||
"a2a_cli",
|
||||
"a2a_client",
|
||||
"a2a_executor",
|
||||
|
||||
@@ -91,10 +91,6 @@ func expandWithEnv(s string, env map[string]string) string {
|
||||
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
|
||||
// (workspace overrides org root). Used by both secret injection and channel
|
||||
// config expansion.
|
||||
//
|
||||
// SECURITY: filesDir is sourced from untrusted org YAML input (ws.FilesDir).
|
||||
// resolveInsideRoot guard prevents path traversal (CWE-22) where a malicious
|
||||
// filesDir like "../../../etc" could escape the org root.
|
||||
func loadWorkspaceEnv(orgBaseDir, filesDir string) map[string]string {
|
||||
envVars := map[string]string{}
|
||||
if orgBaseDir == "" {
|
||||
@@ -102,14 +98,7 @@ func loadWorkspaceEnv(orgBaseDir, filesDir string) map[string]string {
|
||||
}
|
||||
parseEnvFile(filepath.Join(orgBaseDir, ".env"), envVars)
|
||||
if filesDir != "" {
|
||||
safeFilesDir, err := resolveInsideRoot(orgBaseDir, filesDir)
|
||||
if err != nil {
|
||||
// Reject traversal attempt silently — callers expect an empty map
|
||||
// on any read failure.
|
||||
log.Printf("loadWorkspaceEnv: rejecting filesDir %q: %v", filesDir, err)
|
||||
return envVars
|
||||
}
|
||||
parseEnvFile(filepath.Join(safeFilesDir, ".env"), envVars)
|
||||
parseEnvFile(filepath.Join(orgBaseDir, filesDir, ".env"), envVars)
|
||||
}
|
||||
return envVars
|
||||
}
|
||||
|
||||
@@ -1,104 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestLoadWorkspaceEnv_RejectsTraversal asserts that loadWorkspaceEnv refuses
|
||||
// to read workspace-specific .env files when filesDir contains CWE-22 traversal
|
||||
// patterns (../../../etc, absolute paths, etc.). This is the primary security
|
||||
// control for the ws.FilesDir attack surface in POST /org/import.
|
||||
|
||||
func TestLoadWorkspaceEnv_RejectsTraversal(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
orgRoot := filepath.Join(tmp, "my-org")
|
||||
if err := os.Mkdir(orgRoot, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
filesDir string
|
||||
}{
|
||||
{"traversal_parent", "../../../etc"},
|
||||
{"traversal_deep", "../../../../../../../../../etc"},
|
||||
{"traversal_sibling", "../sibling"},
|
||||
{"traversal_mixed", "foo/../../bar"},
|
||||
{"absolute_path", "/etc/passwd"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Write an org-level .env to confirm it loads even when the
|
||||
// workspace .env is rejected.
|
||||
orgEnv := filepath.Join(orgRoot, ".env")
|
||||
if err := os.WriteFile(orgEnv, []byte("ORG_KEY=org-value\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := loadWorkspaceEnv(orgRoot, tc.filesDir)
|
||||
|
||||
// Org-level .env must be loaded regardless of workspace rejection.
|
||||
if got["ORG_KEY"] != "org-value" {
|
||||
t.Errorf("org-level .env not loaded: got %v", got)
|
||||
}
|
||||
// Traversal path must NOT have been read.
|
||||
if val, ok := got["TRAVERSAL_KEY"]; ok {
|
||||
t.Errorf("traversal escaped: got TRAVERSAL_KEY=%q", val)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadWorkspaceEnv_HappyPath verifies that legitimate filesDir values
|
||||
// resolve correctly and workspace .env overrides org-level values.
|
||||
|
||||
func TestLoadWorkspaceEnv_HappyPath(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
orgRoot := filepath.Join(tmp, "my-org")
|
||||
wsDir := filepath.Join(orgRoot, "workspaces", "dev-workspace")
|
||||
if err := os.MkdirAll(wsDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
orgEnv := filepath.Join(orgRoot, ".env")
|
||||
wsEnv := filepath.Join(wsDir, ".env")
|
||||
if err := os.WriteFile(orgEnv, []byte("ORG_KEY=org-val\nSHARED=org-wins\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(wsEnv, []byte("WS_KEY=ws-val\nSHARED=ws-wins\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := loadWorkspaceEnv(orgRoot, filepath.Join("workspaces", "dev-workspace"))
|
||||
|
||||
if got["ORG_KEY"] != "org-val" {
|
||||
t.Errorf("org-level key missing: %v", got)
|
||||
}
|
||||
if got["WS_KEY"] != "ws-val" {
|
||||
t.Errorf("workspace key missing: %v", got)
|
||||
}
|
||||
if got["SHARED"] != "ws-wins" {
|
||||
t.Errorf("workspace should override org-level: got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLoadWorkspaceEnv_EmptyFilesDirOnlyLoadsOrgLevel verifies that an empty
|
||||
// filesDir only loads the org-level .env (no workspace override).
|
||||
|
||||
func TestLoadWorkspaceEnv_EmptyFilesDir(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
orgRoot := filepath.Join(tmp, "my-org")
|
||||
if err := os.Mkdir(orgRoot, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(orgRoot, ".env"), []byte("KEY=only-org\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
got := loadWorkspaceEnv(orgRoot, "")
|
||||
if got["KEY"] != "only-org" {
|
||||
t.Errorf("expected only-org, got %v", got)
|
||||
}
|
||||
}
|
||||
@@ -490,13 +490,8 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, absX
|
||||
// 1. Org root .env (shared defaults)
|
||||
parseEnvFile(filepath.Join(orgBaseDir, ".env"), envVars)
|
||||
// 2. Workspace-specific .env (overrides)
|
||||
// SECURITY: ws.FilesDir is untrusted YAML input — guard against CWE-22
|
||||
// traversal so a crafted filesDir like "../../../etc" cannot escape orgBaseDir.
|
||||
if ws.FilesDir != "" {
|
||||
if safeFilesDir, err := resolveInsideRoot(orgBaseDir, ws.FilesDir); err == nil {
|
||||
parseEnvFile(filepath.Join(safeFilesDir, ".env"), envVars)
|
||||
}
|
||||
// Traversal rejection: silently skip — callers expect partial env on failure.
|
||||
parseEnvFile(filepath.Join(orgBaseDir, ws.FilesDir, ".env"), envVars)
|
||||
}
|
||||
}
|
||||
// Store as workspace secrets via DB (encrypted if key is set, raw otherwise)
|
||||
|
||||
@@ -25,10 +25,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
|
||||
async def discover(target_id: str) -> dict | None:
|
||||
|
||||
@@ -26,10 +26,11 @@ _WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
# Cache workspace ID → name mappings (populated by list_peers calls)
|
||||
_peer_names: dict[str, str] = {}
|
||||
|
||||
@@ -51,7 +51,6 @@ from shared_runtime import (
|
||||
from executor_helpers import (
|
||||
collect_outbound_files,
|
||||
extract_attached_files,
|
||||
read_delegation_results,
|
||||
)
|
||||
from builtin_tools.telemetry import (
|
||||
A2A_TASK_ID,
|
||||
@@ -216,17 +215,6 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
3. Message(final_text) — terminal event
|
||||
"""
|
||||
user_input = extract_message_text(context)
|
||||
# Inject delegation results from prior turns. Heartbeat writes
|
||||
# completed delegation rows to DELEGATION_RESULTS_FILE and sends
|
||||
# a self-message to wake the agent; this consumes the file and
|
||||
# surfaces the results as context so the agent can act on them
|
||||
# without needing an explicit check_task_status call.
|
||||
# Results are prepended so they are visible even when the
|
||||
# self-message text is overwritten by a subsequent user message.
|
||||
pending_results = read_delegation_results()
|
||||
if pending_results:
|
||||
logger.info("A2A execute: injecting %d delegation result(s)", pending_results.count("\n") + 1)
|
||||
user_input = f"[Delegation results available]\n{pending_results}\n\n{user_input}"
|
||||
# Pull attached files from A2A message parts (kind: "file") and
|
||||
# append a manifest to the prompt so the agent knows they exist.
|
||||
# LangGraph tools (filesystem, bash, skills) can then open the
|
||||
|
||||
@@ -77,8 +77,6 @@ async def delegate_task(workspace_id: str, task: str) -> str:
|
||||
return str(result) if isinstance(result, str) else "(no text)"
|
||||
elif "error" in data:
|
||||
err = data["error"]
|
||||
# Handle both string-form errors ("error": "some string")
|
||||
# and object-form errors ("error": {"message": "...", "code": ...}).
|
||||
msg = ""
|
||||
if isinstance(err, dict):
|
||||
msg = err.get("message", "")
|
||||
|
||||
@@ -54,6 +54,19 @@ import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _platform_url() -> str:
|
||||
"""Return the platform URL, defaulting to host.docker.internal.
|
||||
|
||||
The workspace runtime always runs inside a Docker container, so
|
||||
``localhost`` refers to the container itself, not the platform host.
|
||||
The platform API is only reachable via ``host.docker.internal`` from
|
||||
within a workspace container, regardless of how the container was started.
|
||||
The legacy non-Docker branch is removed (it would have returned
|
||||
``localhost:8080`` which is unreachable from inside the container).
|
||||
"""
|
||||
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Constants
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
@@ -79,12 +92,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]:
|
||||
workspace_id: The workspace to query.
|
||||
|
||||
Reads:
|
||||
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
|
||||
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
|
||||
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
platform_url = _platform_url()
|
||||
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest"
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
resp = await client.get(url, headers=_auth_headers())
|
||||
@@ -125,12 +138,12 @@ async def _save_checkpoint(
|
||||
payload: Optional JSON-serialisable dict stored as JSONB.
|
||||
|
||||
Reads:
|
||||
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
|
||||
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
|
||||
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
platform_url = _platform_url()
|
||||
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints"
|
||||
body: dict = {
|
||||
"workflow_id": workflow_id,
|
||||
|
||||
@@ -18,10 +18,11 @@ from platform_auth import auth_headers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
|
||||
@@ -22,10 +22,11 @@ from policies.routing import build_team_routing_payload
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Platform URL: always host.docker.internal inside containers (Docker or not).
|
||||
# The if/else is kept structurally for historical context; both paths now
|
||||
# use the same default — the platform API is only reachable via the Docker
|
||||
# network mesh from inside a workspace container regardless of runtime env.
|
||||
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
|
||||
+4
-4
@@ -60,10 +60,10 @@ async def main(): # pragma: no cover
|
||||
config_path = os.environ.get("WORKSPACE_CONFIG_PATH", "/configs")
|
||||
# Docker-aware default — host.docker.internal resolves the platform service
|
||||
# from inside the Docker network mesh; falls back to localhost for local dev.
|
||||
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
else:
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
|
||||
# Both branches now use the same default (architectural decision: the platform
|
||||
# API is only reachable via host.docker.internal from within a workspace
|
||||
# container, regardless of how the container was started).
|
||||
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
|
||||
awareness_config = get_awareness_config()
|
||||
|
||||
# 0. Initialise OpenTelemetry (no-op if packages not installed)
|
||||
|
||||
@@ -51,6 +51,32 @@ class AdaptorSource:
|
||||
|
||||
def _load_module_from_path(module_name: str, path: Path):
|
||||
"""Import a Python file by absolute path. Returns the module or None on failure."""
|
||||
|
||||
# KI-296: Before exec'ing plugin-adapter files (which import
|
||||
# ``from plugins_registry import ...`` as a top-level name), register
|
||||
# the molecule-runtime subpackage as ``plugins_registry`` in sys.modules.
|
||||
# In the molecule-core workspace source this is already a top-level package,
|
||||
# so the setdefault is a no-op. In the PyPI-installed runtime wheel
|
||||
# (molecule-ai-workspace-runtime 0.1.129+), the package ships as
|
||||
# ``molecule_runtime.plugins_registry`` and without this shim every
|
||||
# plugin adapter would fail with ModuleNotFoundError.
|
||||
import sys as _sys
|
||||
|
||||
if "plugins_registry" not in _sys.modules:
|
||||
try:
|
||||
_mr_pr = __import__("molecule_runtime.plugins_registry", fromlist=[""])
|
||||
_sys.modules["plugins_registry"] = _mr_pr
|
||||
# Also register submodules the adapters commonly import directly.
|
||||
for _sub in ("builtins", "protocol", "raw_drop"):
|
||||
_submod = getattr(_mr_pr, _sub, None)
|
||||
if _submod is not None:
|
||||
_sys.modules[f"plugins_registry.{_sub}"] = _submod
|
||||
except ImportError:
|
||||
# molecule-runtime not installed (e.g. test environment with
|
||||
# workspace/ on sys.path directly) — skip shim; the top-level
|
||||
# workspace/plugins_registry package is already findable.
|
||||
pass
|
||||
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
if spec is None or spec.loader is None:
|
||||
return None
|
||||
|
||||
@@ -1201,94 +1201,3 @@ async def test_terminal_error_routes_via_updater_failed():
|
||||
assert not eq._complete_calls, (
|
||||
"complete() should not fire when execute() raises"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Issue #354 — delegation results auto-resume gap
|
||||
# ---------------------------------------------------------------------------
|
||||
# heartbeat.py's _check_delegations writes completed delegation rows to
|
||||
# DELEGATION_RESULTS_FILE and sends a self-message to wake the agent.
|
||||
# read_delegation_results() in executor_helpers.py atomically reads+consumes
|
||||
# that file. The fix wires this consumer into _core_execute so the agent
|
||||
# receives delegation results as context in the next turn — closing the gap
|
||||
# where parallel delegate_task calls return after the SDK turn ends and the
|
||||
# agent has no way to discover the results.
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_delegation_results_injected_into_user_input(monkeypatch):
|
||||
"""When delegation results exist, they are prepended to the user input
|
||||
passed to the agent so the agent can act on them without an explicit
|
||||
check_task_status call."""
|
||||
import a2a_executor
|
||||
from unittest.mock import patch
|
||||
|
||||
pending_results = (
|
||||
"- [completed] Delegation abc123: Checked 3 issues\n"
|
||||
" Response: 3 open, 0 critical\n"
|
||||
"- [failed] Delegation def456: Scan PR #352\n"
|
||||
" Error: peer workspace offline"
|
||||
)
|
||||
|
||||
# Patch read_delegation_results at the module level where a2a_executor
|
||||
# imported it so the _core_execute call picks it up.
|
||||
with patch.object(a2a_executor, "read_delegation_results", return_value=pending_results):
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("Got it")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "What's the status?"
|
||||
context = _make_context([part], "ctx-deleg", task_id="task-deleg")
|
||||
eq = _make_event_queue()
|
||||
eq._complete_calls = []
|
||||
eq._failed_calls = []
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
# Verify the agent received the injected context
|
||||
agent.astream_events.assert_called_once()
|
||||
call_args = agent.astream_events.call_args
|
||||
messages = call_args[0][0]["messages"]
|
||||
|
||||
# The last message should be a human turn with the injected context
|
||||
human_turn = messages[-1]
|
||||
assert human_turn[0] == "human"
|
||||
# Must contain the delegation results marker
|
||||
assert "[Delegation results available]" in human_turn[1]
|
||||
# Must contain the completed delegation
|
||||
assert "abc123" in human_turn[1]
|
||||
assert "3 open" in human_turn[1]
|
||||
# Must contain the failed delegation
|
||||
assert "def456" in human_turn[1]
|
||||
# Must contain the original user message
|
||||
assert "What's the status?" in human_turn[1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_delegation_results_no_injection(monkeypatch):
|
||||
"""When no delegation results exist, user input is passed through unchanged."""
|
||||
import a2a_executor
|
||||
from unittest.mock import patch
|
||||
|
||||
with patch.object(a2a_executor, "read_delegation_results", return_value=""):
|
||||
agent = MagicMock()
|
||||
agent.astream_events = MagicMock(return_value=_stream(_text_chunk("ok")))
|
||||
executor = LangGraphA2AExecutor(agent)
|
||||
|
||||
part = MagicMock()
|
||||
part.text = "Hello"
|
||||
context = _make_context([part], "ctx-clean", task_id="task-clean")
|
||||
eq = _make_event_queue()
|
||||
eq._complete_calls = []
|
||||
eq._failed_calls = []
|
||||
|
||||
await executor.execute(context, eq)
|
||||
|
||||
agent.astream_events.assert_called_once()
|
||||
call_args = agent.astream_events.call_args
|
||||
messages = call_args[0][0]["messages"]
|
||||
human_turn = messages[-1]
|
||||
assert human_turn[0] == "human"
|
||||
# Must NOT contain the injection marker
|
||||
assert "[Delegation results available]" not in human_turn[1]
|
||||
assert human_turn[1] == "Hello"
|
||||
|
||||
@@ -326,105 +326,6 @@ class TestToolDelegateTask:
|
||||
assert a2a_tools._peer_names.get("ws-nona000") is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# delegate_task (non-tool, direct httpx path — used by adapter templates)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDelegateTaskDirect:
|
||||
|
||||
async def test_string_form_error_returns_error_message(self):
|
||||
"""The A2A proxy can return {"error": "plain string"}. Must not raise
|
||||
AttributeError: 'str' object has no attribute 'get'."""
|
||||
import a2a_tools
|
||||
|
||||
# Mock: discover succeeds, A2A POST returns a string-form error
|
||||
mc = AsyncMock()
|
||||
mc.__aenter__ = AsyncMock(return_value=mc)
|
||||
mc.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
async def fake_post(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={"error": "peer workspace unreachable"})
|
||||
return r
|
||||
|
||||
async def fake_get(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
|
||||
return r
|
||||
|
||||
mc.post = fake_post
|
||||
mc.get = fake_get
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.delegate_task("ws-peer-123", "do a thing")
|
||||
|
||||
assert "Error" in result
|
||||
assert "peer workspace unreachable" in result
|
||||
|
||||
async def test_dict_form_error_returns_error_message(self):
|
||||
"""{"error": {"message": "...", "code": ...}} — the pre-existing path."""
|
||||
import a2a_tools
|
||||
|
||||
mc = AsyncMock()
|
||||
mc.__aenter__ = AsyncMock(return_value=mc)
|
||||
mc.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
async def fake_post(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={"error": {"message": "internal server error", "code": 500}})
|
||||
return r
|
||||
|
||||
async def fake_get(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
|
||||
return r
|
||||
|
||||
mc.post = fake_post
|
||||
mc.get = fake_get
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.delegate_task("ws-peer-456", "do a thing")
|
||||
|
||||
assert "Error" in result
|
||||
assert "internal server error" in result
|
||||
|
||||
async def test_success_returns_result_text(self):
|
||||
"""Happy path: result with parts returns the first text part."""
|
||||
import a2a_tools
|
||||
|
||||
mc = AsyncMock()
|
||||
mc.__aenter__ = AsyncMock(return_value=mc)
|
||||
mc.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
async def fake_post(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={
|
||||
"result": {
|
||||
"parts": [{"kind": "text", "text": "Task done!"}]
|
||||
}
|
||||
})
|
||||
return r
|
||||
|
||||
async def fake_get(url, **kwargs):
|
||||
r = MagicMock()
|
||||
r.status_code = 200
|
||||
r.json = MagicMock(return_value={"url": "http://peer.svc/a2a"})
|
||||
return r
|
||||
|
||||
mc.post = fake_post
|
||||
mc.get = fake_get
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.delegate_task("ws-peer-789", "do a thing")
|
||||
|
||||
assert result == "Task done!"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tool_delegate_task_async
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -325,3 +325,44 @@ def test_resolve_registry_missing_module_falls_through(monkeypatch, tmp_path: Pa
|
||||
monkeypatch.setattr(pr, "_REGISTRY_ROOT", tmp_path / "empty-registry")
|
||||
_, source = pr.resolve("demo-plugin", "test_runtime", plugin_root)
|
||||
assert source == AdaptorSource.RAW_DROP
|
||||
|
||||
|
||||
def test_load_module_from_path_registers_plugins_registry_sys_modules(tmp_path: Path):
|
||||
"""KI-296: _load_module_from_path registers ``plugins_registry`` in sys.modules
|
||||
before exec'ing the adapter, so adapter files that do
|
||||
``from plugins_registry import ...`` resolve correctly when the runtime is
|
||||
installed from the PyPI wheel (where the package ships as
|
||||
``molecule_runtime.plugins_registry`` rather than a top-level ``plugins_registry``).
|
||||
"""
|
||||
import sys as _sys
|
||||
import plugins_registry as pr
|
||||
|
||||
# Create a fake adapter that imports plugins_registry at top level.
|
||||
adapter_file = tmp_path / "fake_runtime_adapter.py"
|
||||
adapter_file.write_text(
|
||||
"from plugins_registry import InstallContext # noqa: F401\n"
|
||||
"from plugins_registry.builtins import AgentskillsAdaptor as Adaptor # noqa: F401\n"
|
||||
)
|
||||
|
||||
# Evict any pre-existing sys.modules entries for the shim keys so the
|
||||
# import inside _load_module_from_path actually runs.
|
||||
_saved = {
|
||||
k: _sys.modules.pop(k, None)
|
||||
for k in (
|
||||
"plugins_registry", "plugins_registry.builtins",
|
||||
"plugins_registry.protocol", "plugins_registry.raw_drop",
|
||||
"_plugin_adaptor.test.fake_runtime",
|
||||
)
|
||||
}
|
||||
|
||||
try:
|
||||
result = pr._load_module_from_path("_plugin_adaptor.test.fake_runtime", adapter_file)
|
||||
assert result is not None, "module should load without ImportError"
|
||||
assert hasattr(result, "Adaptor"), "AgentskillsAdaptor alias should be in namespace"
|
||||
finally:
|
||||
# Restore sys.modules state.
|
||||
for k, v in _saved.items():
|
||||
if v is None:
|
||||
_sys.modules.pop(k, None)
|
||||
else:
|
||||
_sys.modules[k] = v
|
||||
|
||||
Reference in New Issue
Block a user