Compare commits

...

9 Commits

Author SHA1 Message Date
infra-runtime-be 02fb193847 fix(a2a): delegate_task returns str(result) for empty-parts responses
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 16s
sop-tier-check / tier-check (pull_request) Failing after 16s
audit-force-merge / audit (pull_request) Has been skipped
Before:
  return parts[0].get("text", "(no text)") if parts else str(data["result"])

When parts=[] (empty list), this falls through to str(data["result"]),
which always returns "(no text)" for the SSOT parse result variant.

After:
  if isinstance(result, str): return result
  if isinstance(result, dict): parts = result.get("parts", []); ...
  return str(result)

Fixes the regression where {"result": {"parts": []}} returns "(no text)"
instead of the actual string result body. Also correctly handles
plain-string result bodies ({"result": "my response"}).

Fixes molecule-core#279.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 10:06:36 +00:00
infra-runtime-be 8f6e6d6ecc fix(workspace): default PLATFORM_URL to host.docker.internal in all modules
KI-014 follow-on: inside a workspace container, localhost refers to the
container itself, not the platform. Four files had the Docker-aware
if-branch correct but fell through to localhost:8080 as the non-Docker
fallback — effectively making the Docker path the ONLY path that works,
since local dev on Mac/Linux can also resolve host.docker.internal via
the Docker daemon's built-in resolver.

Fix: unify the default to host.docker.internal in both branches, so
the env-var override always works and no caller ever silently falls
back to the wrong address.

- a2a_cli.py: else branch hardcoded localhost → host.docker.internal
- consolidation.py: same
- coordinator.py: same
- builtin_tools/temporal_workflow.py: two inline os.environ.get defaults
  replaced with a _platform_url() helper for DRY + consistent detection

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 10:06:36 +00:00
core-lead bea6d25543 Merge pull request 'fix(a2a): handle push-mode queue envelope in response parser' (#278) from fix/a2a-push-mode-queue-envelope into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 15s
[core-lead-agent] Push-mode queue envelope parser merged. queued:true shape handled before poll-mode case in a2a_response.py.
2026-05-10 10:05:48 +00:00
core-lead d98a547af2 Merge branch 'main' into fix/a2a-push-mode-queue-envelope
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 5s
sop-tier-check / tier-check (pull_request) Successful in 5s
audit-force-merge / audit (pull_request) Successful in 19s
2026-05-10 10:04:45 +00:00
core-lead e9b972d86a Merge pull request 'fix(mcp): scrub err.Error() from JSON-RPC error messages (OFFSEC-001)' (#267) from fix/offsec-001-error-message-scrubbing into main
Secret scan / Scan diff for credential-shaped strings (push) Successful in 5s
publish-workspace-server-image / build-and-push (push) Successful in 1m9s
[core-lead-agent] OFFSEC-001 scrub merged. err.Error() removed from 3 JSON-RPC error sites in mcp.go; full error logged server-side. Defence-in-depth on auth-required paths.
2026-05-10 10:03:10 +00:00
core-lead 555c474cbe Merge branch 'main' into fix/a2a-push-mode-queue-envelope
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 14s
sop-tier-check / tier-check (pull_request) Successful in 16s
2026-05-10 10:01:47 +00:00
core-lead cc4d7fc2c1 Merge branch 'main' into fix/offsec-001-error-message-scrubbing
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 9s
sop-tier-check / tier-check (pull_request) Successful in 10s
audit-force-merge / audit (pull_request) Successful in 6s
2026-05-10 10:01:43 +00:00
integration-tester 736d9959bc fix(a2a): handle push-mode queue envelope in response parser
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 46s
sop-tier-check / tier-check (pull_request) Successful in 11s
When a push-mode workspace (one with a public URL) is at capacity, the
platform queues the delegation request and returns:

    {"queued": true, "message": "...", "queue_depth": N, "queue_id": "..."}

The existing SSOT parser (a2a_response.py) only handled the poll-mode
envelope (status=queued + delivery_mode=poll). Push-mode queue
responses fell through to Malformed, causing send_a2a_message to log a
warning and return an error — even though delivery was actually queued
successfully.

Fix: add handling for data.get("queued") is True as a Queued variant
with delivery_mode="push". Checked before the poll-mode envelope so the
two cases are mutually exclusive.

Fixes observed 2026-05-10: platform returning push-mode queue
envelopes to Integration Tester when Release Manager workspace was at
capacity.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 09:28:51 +00:00
infra-sre 7d1a189f2e fix(mcp): scrub err.Error() from JSON-RPC error messages (OFFSEC-001)
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 4s
Replace all three err.Error() leaks in mcp.go with constant strings,
consistent with the same fix applied to 22 other files in PRs #1193/1206/1219/#168.

- Call handler (line ~329): "parse error: " + err.Error() → "parse error"
- dispatchRPC params unmarshal (line ~417): "invalid params: " + err.Error()
  → "invalid parameters"
- dispatchRPC tool call (line ~422): err.Error() → "tool call failed"
  + log.Printf server-side for forensics

Routes protected by WorkspaceAuth (C1) and MCPRateLimiter (C2) — this is
defence-in-depth per OFFSEC-001 / #259.

Tests added:
- TestMCPHandler_Call_MalformedJSON_ReturnsConstantParseError
- TestMCPHandler_dispatchRPC_InvalidParams_ReturnsConstantMessage
- TestMCPHandler_dispatchRPC_UnknownTool_ReturnsConstantMessage
- TestMCPHandler_dispatchRPC_InvalidParams_ArrayInsteadOfObject

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 09:01:51 +00:00
10 changed files with 180 additions and 14 deletions
+8 -3
View File
@@ -28,6 +28,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
@@ -326,7 +327,7 @@ func (h *MCPHandler) Call(c *gin.Context) {
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, mcpResponse{
JSONRPC: "2.0",
Error: &mcpRPCError{Code: -32700, Message: "parse error: " + err.Error()},
Error: &mcpRPCError{Code: -32700, Message: "parse error"},
})
return
}
@@ -414,12 +415,16 @@ func (h *MCPHandler) dispatchRPC(ctx context.Context, workspaceID string, req mc
Arguments map[string]interface{} `json:"arguments"`
}
if err := json.Unmarshal(req.Params, &params); err != nil {
base.Error = &mcpRPCError{Code: -32602, Message: "invalid params: " + err.Error()}
base.Error = &mcpRPCError{Code: -32602, Message: "invalid parameters"}
return base
}
text, err := h.dispatch(ctx, workspaceID, params.Name, params.Arguments)
if err != nil {
base.Error = &mcpRPCError{Code: -32000, Message: err.Error()}
// Log full error server-side for forensics; return constant string
// to client per OFFSEC-001 / #259. WorkspaceAuth required — caller
// already authenticated, so this is defence-in-depth.
log.Printf("mcp: tool call failed workspace=%s tool=%s: %v", workspaceID, params.Name, err)
base.Error = &mcpRPCError{Code: -32000, Message: "tool call failed"}
return base
}
base.Result = map[string]interface{}{
@@ -1024,3 +1024,126 @@ func TestIsPrivateOrMetadataIP_PublicAllowed(t *testing.T) {
}
}
}
// TestMCPHandler_Call_MalformedJSON returns constant parse-error message.
// Per OFFSEC-001 / #259: err.Error() must not leak struct field names or
// JSON library internals in JSON-RPC error.message.
func TestMCPHandler_Call_MalformedJSON_ReturnsConstantParseError(t *testing.T) {
h, _ := newMCPHandler(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
// Valid JSON-RPC 2.0 envelope but JSON body is malformed.
c.Request = httptest.NewRequest("POST", "/", bytes.NewBuffer([]byte("not valid json{][")))
c.Request.Header.Set("Content-Type", "application/json")
h.Call(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp.Error == nil {
t.Fatal("expected JSON-RPC error, got nil")
}
// Message must be a constant — no err.Error() content.
if resp.Error.Message != "parse error" {
t.Errorf("error message should be constant 'parse error', got: %q", resp.Error.Message)
}
// Code must be -32700 (Parse error).
if resp.Error.Code != -32700 {
t.Errorf("error code should be -32700, got: %d", resp.Error.Code)
}
}
// TestMCPHandler_dispatchRPC_InvalidParams returns constant message.
// Per OFFSEC-001 / #259: err.Error() from json.Unmarshal must not be
// returned in JSON-RPC error.message.
func TestMCPHandler_dispatchRPC_InvalidParams_ReturnsConstantMessage(t *testing.T) {
h, _ := newMCPHandler(t)
// Valid JSON-RPC but params is a string (not an object) — invalid for tools/call.
w := mcpPost(t, h, "ws-1", map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": "not an object", // string instead of object — json.Unmarshal fails
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp.Error == nil {
t.Fatal("expected JSON-RPC error, got nil")
}
// Message must be a constant — no JSON library error content.
if resp.Error.Message != "invalid parameters" {
t.Errorf("error message should be constant 'invalid parameters', got: %q", resp.Error.Message)
}
if resp.Error.Code != -32602 {
t.Errorf("error code should be -32602 (Invalid params), got: %d", resp.Error.Code)
}
}
// TestMCPHandler_dispatchRPC_UnknownTool returns constant tool-failed message.
// Per OFFSEC-001 / #259: dispatch errors must not leak workspace IDs or
// internal paths. Note: this test exercises the dispatch path through
// dispatchRPC since dispatch is package-private.
func TestMCPHandler_dispatchRPC_UnknownTool_ReturnsConstantMessage(t *testing.T) {
h, _ := newMCPHandler(t)
// Valid params shape but tool name does not exist.
w := mcpPost(t, h, "ws-1", map[string]interface{}{
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": map[string]interface{}{
"name": "nonexistent_tool_xyz",
"arguments": map[string]interface{}{},
},
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp.Error == nil {
t.Fatal("expected JSON-RPC error for unknown tool, got nil")
}
// Message must be a constant — no "unknown tool: nonexistent_tool_xyz" leak.
if resp.Error.Message != "tool call failed" {
t.Errorf("error message should be constant 'tool call failed', got: %q", resp.Error.Message)
}
if resp.Error.Code != -32000 {
t.Errorf("error code should be -32000 (Server error), got: %d", resp.Error.Code)
}
}
// TestMCPHandler_dispatchRPC_InvalidParams_NilParams covers the edge case
// where params is present but not an object (e.g. an array). json.Unmarshal
// into the params struct fails, and we assert the constant error message.
func TestMCPHandler_dispatchRPC_InvalidParams_ArrayInsteadOfObject(t *testing.T) {
h, _ := newMCPHandler(t)
w := mcpPost(t, h, "ws-1", map[string]interface{}{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": []interface{}{"one", "two"}, // array instead of object
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp.Error == nil {
t.Fatal("expected JSON-RPC error, got nil")
}
if resp.Error.Message != "invalid parameters" {
t.Errorf("error message should be constant 'invalid parameters', got: %q", resp.Error.Message)
}
}
+1 -1
View File
@@ -28,7 +28,7 @@ WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
async def discover(target_id: str) -> dict | None:
+1 -1
View File
@@ -29,7 +29,7 @@ WORKSPACE_ID = _WORKSPACE_ID_raw
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# Cache workspace ID → name mappings (populated by list_peers calls)
_peer_names: dict[str, str] = {}
+17
View File
@@ -179,6 +179,23 @@ def parse(data: Any) -> Variant:
)
return Malformed(raw=data)
# Push-mode queue envelope — returned when a push-mode workspace
# (one with a public URL) is at capacity. The platform queues the
# request and returns {"queued": true, "message": "...", "queue_id": "..."}.
# Unlike the poll-mode envelope (status=queued + delivery_mode=poll),
# this shape has no delivery_mode key — it's distinguishable by
# data.get("queued") is True alone. Checked before poll-mode so the
# two cases are mutually exclusive even if a buggy server sends both.
if data.get("queued") is True:
method_raw = data.get(_KEY_METHOD)
method = str(method_raw) if method_raw is not None else "message/send"
logger.info(
"a2a_response.parse: queued for busy push-mode peer (method=%s, queue_id=%s)",
method,
data.get("queue_id", "?"),
)
return Queued(method=method)
# Poll-queued envelope. Both keys must be present — the workspace
# server sets them together; if only one is present the body is
# ambiguous and we route to Malformed for visibility.
+13 -2
View File
@@ -66,8 +66,19 @@ async def delegate_task(workspace_id: str, task: str) -> str:
)
data = a2a_resp.json()
if "result" in data:
parts = data["result"].get("parts", [])
return parts[0].get("text", "(no text)") if parts else str(data["result"])
result = data["result"]
# String result: return it directly (handles empty-parts case
# where the platform returns {"result": "my response"}).
if isinstance(result, str):
return result
# Dict result: extract text from parts[0], fall back to str(result).
if isinstance(result, dict):
parts = result.get("parts", [])
if parts and isinstance(parts[0], dict):
return parts[0].get("text", "(no text)")
return str(result)
# Any other type: fall back to string representation.
return str(result)
elif "error" in data:
return f"Error: {data['error'].get('message', str(data['error']))}"
return str(data)
+14 -4
View File
@@ -54,6 +54,16 @@ import httpx
logger = logging.getLogger(__name__)
def _platform_url() -> str:
"""Return the platform URL, defaulting to host.docker.internal when running
inside a Docker container (where localhost refers to the container, not the
host). External callers can always override via the PLATFORM_URL env var.
"""
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
return os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
# ─────────────────────────────────────────────────────────────────────────────
# Constants
# ─────────────────────────────────────────────────────────────────────────────
@@ -79,12 +89,12 @@ async def _fetch_latest_checkpoint(workspace_id: str) -> Optional[dict]:
workspace_id: The workspace to query.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints/latest"
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(url, headers=_auth_headers())
@@ -125,12 +135,12 @@ async def _save_checkpoint(
payload: Optional JSON-serialisable dict stored as JSONB.
Reads:
PLATFORM_URL Platform base URL (default ``http://localhost:8080``).
PLATFORM_URL Platform base URL (default ``http://host.docker.internal:8080``).
"""
try:
from platform_auth import auth_headers as _auth_headers # type: ignore[import]
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = _platform_url()
url = f"{platform_url}/workspaces/{workspace_id}/checkpoints"
body: dict = {
"workflow_id": workflow_id,
+1 -1
View File
@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
+1 -1
View File
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://localhost:8080")
PLATFORM_URL = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
if not _WORKSPACE_ID_raw:
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
+1 -1
View File
@@ -63,7 +63,7 @@ async def main(): # pragma: no cover
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_VERSION"):
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
else:
platform_url = os.environ.get("PLATFORM_URL", "http://localhost:8080")
platform_url = os.environ.get("PLATFORM_URL", "http://host.docker.internal:8080")
awareness_config = get_awareness_config()
# 0. Initialise OpenTelemetry (no-op if packages not installed)