Compare commits

..

4 Commits

Author SHA1 Message Date
core-devops 3e38a885a4 fix(ci): use GITHUB_EVENT_BEFORE env var in detect-changes push job
CI / Platform (Go) (pull_request) Blocked by required conditions
CI / Canvas (Next.js) (pull_request) Blocked by required conditions
CI / Shellcheck (E2E scripts) (pull_request) Blocked by required conditions
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Blocked by required conditions
CI / all-required (pull_request) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Harness Replays / Harness Replays (pull_request) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 17s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
CI / Detect changes (pull_request) Successful in 33s
E2E API Smoke Test / detect-changes (pull_request) Successful in 34s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 32s
Harness Replays / detect-changes (pull_request) Successful in 15s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 21s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 8s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 45s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
publish-runtime-autobump / pr-validate (pull_request) Successful in 47s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 14s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m13s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 17s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 19s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m36s
gate-check-v3 / gate-check (pull_request) Successful in 11s
qa-review / approved (pull_request) Successful in 8s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m36s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m36s
sop-checklist-gate / gate (pull_request) Successful in 14s
security-review / approved (pull_request) Successful in 15s
sop-tier-check / tier-check (pull_request) Successful in 16s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 1m46s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m52s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m17s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m28s
audit-force-merge / audit (pull_request) Failing after 11m52s
mc#917 root fix.

Gitea Actions does not expose github.event.before as a ${{ }} template
expression that resolves in shell scripts for push events — it silently
becomes an empty string. This caused `git cat-file -e ""` to hang
indefinitely on some runner configurations (10m timeout was masking the
failure via continue-on-error: true).

Fix: use GITHUB_EVENT_BEFORE env var (set by the runner for push
events) instead of the broken template expression. Also guard both
`git cat-file -e` calls with `timeout 30` to prevent future hangs if
BASE is ever malformed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 01:24:16 +00:00
infra-runtime-be 9f3948dc3a test(a2a_mcp_server): add 5 tool-branch coverage cases to HTTP transport tests
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / all-required (pull_request) Blocked by required conditions
CI / Platform (Go) (pull_request) Blocked by required conditions
CI / Canvas (Next.js) (pull_request) Blocked by required conditions
CI / Shellcheck (E2E scripts) (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Harness Replays / Harness Replays (pull_request) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 9s
E2E API Smoke Test / detect-changes (pull_request) Successful in 16s
CI / Detect changes (pull_request) Successful in 16s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 15s
Harness Replays / detect-changes (pull_request) Successful in 7s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 11s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 10s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 44s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
publish-runtime-autobump / pr-validate (pull_request) Successful in 36s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 10s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m26s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 14s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 14s
security-review / approved (pull_request) Successful in 9s
qa-review / approved (pull_request) Successful in 9s
gate-check-v3 / gate-check (pull_request) Successful in 9s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m18s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 1m31s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m10s
sop-checklist-gate / gate (pull_request) Successful in 10s
sop-tier-check / tier-check (pull_request) Successful in 13s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m28s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m34s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m13s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m23s
Cover remaining elif branches in handle_tool_call:
- send_message_to_user: mixed-type attachments are filtered (line 116)
- wait_for_message: dispatched with timeout_secs argument
- inbox_peek: dispatched with limit argument
- inbox_pop: dispatched with activity_id argument
- chat_history: dispatched with peer_id/limit/before_ts arguments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 10:15:26 +00:00
infra-runtime-be c4deda1035 test(builtin_tools): add 16-case coverage for _redact_secrets (C2, #834)
Bring builtin_tools/security._redact_secrets from 58% to 100% coverage.
Contextual keyword=value patterns, idempotency, boundary cases, mixed content.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 10:15:26 +00:00
infra-runtime-be 0dbda533fb feat(workspace): add HTTP/SSE transport to a2a_mcp_server
Port HTTP/SSE transport (from workspace-runtime PR #16) to the canonical
monorepo source. Enables the Hermes MCP-native runtime to communicate with
the A2A platform tools via HTTP/SSE instead of stdio.

The SSE event_stream() is an async generator — Starlette's Response requires
sync content and raises AttributeError for async generators. Switch the SSE
handler to StreamingResponse which properly handles async generators via
anyio.create_task_group (Starlette 1.0.0).

Adds test_a2a_mcp_server_http.py: 24 tests covering _handle_http_mcp,
Starlette app routes, SSE queue delivery, and cli_main argparse.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-13 10:15:26 +00:00
43 changed files with 1212 additions and 1394 deletions
-165
View File
@@ -1,165 +0,0 @@
name: MCP Stdio Transport Regression
# Regression test for molecule-ai-workspace-runtime#61:
# asyncio.connect_read_pipe / connect_write_pipe fail with
# ValueError: "Pipe transport is only for pipes, sockets and character devices"
# when stdout is a regular file (openclaw capture, CI tee, debugging).
#
# This workflow reproduces the exact failure mode and verifies the
# fallback to direct buffer I/O works. It runs on every PR that
# touches the MCP server or this workflow, plus nightly cron.
#
# Why a separate workflow (not folded into ci.yml python-lint):
# - The test needs to spawn the MCP server with stdout redirected
# to a regular file (not a TTY/pipe), which conflicts with
# pytest's own capture mechanism.
# - It exercises the actual process spawn path (python a2a_mcp_server.py)
# not just unit-test mocks — closer to the real openclaw integration.
# - A dedicated workflow surfaces stdio-specific regressions without
# coupling to the broader Python test suite's coverage gate.
on:
pull_request:
branches: [main, staging]
paths:
- 'workspace/a2a_mcp_server.py'
- 'workspace/mcp_cli.py'
- 'workspace/tests/test_a2a_mcp_server.py'
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
push:
branches: [main, staging]
paths:
- 'workspace/a2a_mcp_server.py'
- 'workspace/mcp_cli.py'
- 'workspace/tests/test_a2a_mcp_server.py'
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
schedule:
# Nightly at 04:00 UTC — catches drift from dependency updates
# (e.g. asyncio behavior changes in new Python patch releases).
- cron: '0 4 * * *'
concurrency:
group: mcp-stdio-${{ github.ref }}
cancel-in-progress: true
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
jobs:
# bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required.
# mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs.
mcp-stdio-regular-file:
name: MCP stdio with regular-file stdout
runs-on: ubuntu-latest
continue-on-error: true # mc#774
timeout-minutes: 5
env:
WORKSPACE_ID: "00000000-0000-0000-0000-000000000001"
defaults:
run:
working-directory: workspace
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.11'
cache: pip
cache-dependency-path: workspace/requirements.txt
- run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
- name: Reproduce runtime#61 — stdout as regular file
run: |
set -euo pipefail
echo "=== Reproducing molecule-ai-workspace-runtime#61 ==="
echo ""
echo "Before the fix, this command would fail with:"
echo ' ValueError: Pipe transport is only for pipes, sockets and character devices'
echo ""
# Spawn the MCP server with stdout redirected to a regular file.
# This is exactly what openclaw does when capturing MCP output.
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"' EXIT
# Send initialize request, then tools/list, then exit
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || {
RC=$?
echo "FAIL: MCP server exited with code $RC"
echo "--- stdout+stderr ---"
cat "$OUTPUT"
exit 1
}
echo "PASS: MCP server handled regular-file stdout without crashing"
echo ""
echo "--- Output (first 20 lines) ---"
head -20 "$OUTPUT"
echo ""
# Verify we got valid JSON-RPC responses
if grep -q '"result"' "$OUTPUT"; then
echo "PASS: JSON-RPC responses found in output"
else
echo "FAIL: No JSON-RPC responses in output"
cat "$OUTPUT"
exit 1
fi
- name: Reproduce runtime#61 — stdin from regular file
run: |
set -euo pipefail
echo "=== stdin as regular file (CI tee / capture pattern) ==="
INPUT=$(mktemp)
OUTPUT=$(mktemp)
trap 'rm -f "$INPUT" "$OUTPUT"' EXIT
cat > "$INPUT" <<'EOF'
{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
{"jsonrpc":"2.0","id":2,"method":"tools/list"}
EOF
python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || {
RC=$?
echo "FAIL: MCP server exited with code $RC"
cat "$OUTPUT"
exit 1
}
echo "PASS: MCP server handled regular-file stdin without crashing"
if grep -q '"result"' "$OUTPUT"; then
echo "PASS: JSON-RPC responses found in output"
else
echo "FAIL: No JSON-RPC responses in output"
cat "$OUTPUT"
exit 1
fi
- name: Verify warning is emitted for non-pipe stdio
run: |
set -euo pipefail
echo "=== Verify diagnostic warning ==="
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"' EXIT
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1
# The warning should mention "not a pipe" for operator visibility
if grep -qi "not a pipe" "$OUTPUT"; then
echo "PASS: Diagnostic warning emitted for non-pipe stdio"
else
echo "NOTE: No warning in output (may be suppressed by log level)"
fi
- name: Run unit tests for stdio transport
run: |
set -euo pipefail
echo "=== Running stdio transport unit tests ==="
python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov
+11 -3
View File
@@ -66,19 +66,27 @@ jobs:
# PR#372's ci.yml port used. Diffs against the PR base or the
# previous push SHA, then matches against the wheel-relevant
# path set.
BASE="${GITHUB_BASE_REF:-${{ github.event.before }}}"
#
# Root fix (mc#917): Gitea Actions does not expose github.event.before
# as a ${{ }} template-expression that resolves in shell scripts for
# push events (it becomes empty string). The env var GITHUB_EVENT_BEFORE
# IS set by the runner for push events. Guard git cat-file with
# `timeout 30` to prevent indefinite hangs on malformed BASE values.
if [ "${{ github.event_name }}" = "pull_request" ] && [ -n "${{ github.event.pull_request.base.sha }}" ]; then
BASE="${{ github.event.pull_request.base.sha }}"
else
BASE="${GITHUB_EVENT_BEFORE:-}"
fi
if [ -z "$BASE" ] || echo "$BASE" | grep -qE '^0+$'; then
# New branch or no previous SHA: treat as wheel-relevant.
echo "wheel=true" >> "$GITHUB_OUTPUT"
exit 0
fi
if ! git cat-file -e "$BASE" 2>/dev/null; then
if ! timeout 30 git cat-file -e "$BASE" 2>/dev/null; then
git fetch --depth=1 origin "$BASE" 2>/dev/null || true
fi
if ! git cat-file -e "$BASE" 2>/dev/null; then
if ! timeout 30 git cat-file -e "$BASE" 2>/dev/null; then
echo "::notice::BASE=$BASE not in local clone (shallow fetch or pruned ref)"
echo "wheel=true" >> "$GITHUB_OUTPUT"
exit 0
fi
+12 -9
View File
@@ -28,16 +28,15 @@
#
# Environment variables:
# SOP_DEBUG=1 — per-API-call diagnostic lines. Default: off.
# SOP_LEGACY_CHECK=1 — revert to OR-gate for this run. Intended for
# emergency use only; burn-in window closed
# 2026-05-17 (internal#189 Phase 1).
# SOP_LEGACY_CHECK=1 — revert to OR-gate for this run. Grace window
# for PRs in-flight when AND-composition deployed.
# Burn-in: remove after 2026-05-17 (7-day window).
#
# BURN-IN CLOSED 2026-05-17 (internal#189 Phase 1): The 7-day burn-in
# window closed. continue-on-error: true has been removed from the
# tier-check job; AND-composition is now fully enforced. If you need
# to temporarily re-introduce a mask, file a tracker and follow the
# mc#774 protocol (Tier 2e lint requires a current tracker within
# 2 lines of any continue-on-error: true).
# BURN-IN NOTE (internal#189 Phase 1): continue-on-error: true is set on
# the tier-check job below. This prevents AND-composition from blocking
# PRs during the 7-day burn-in. After 2026-05-17:
# 1. Remove `continue-on-error: true` from this job block.
# 2. Update this BURN-IN NOTE comment to mark the window closed.
name: sop-tier-check
@@ -64,6 +63,10 @@ on:
jobs:
tier-check:
runs-on: ubuntu-latest
# BURN-IN: continue-on-error prevents AND-composition from blocking
# PRs during the 7-day window. Remove after 2026-05-17 (mc#774).
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
permissions:
contents: read
pull-requests: read
@@ -80,7 +80,6 @@ export function CreateWorkspaceButton() {
// isExternal is true the template / model / hermes-provider fields are
// hidden (they're meaningless for BYO-compute agents).
const [isExternal, setIsExternal] = useState(false);
const [externalRuntime, setExternalRuntime] = useState("external");
const [externalConnection, setExternalConnection] =
useState<ExternalConnectionInfo | null>(null);
@@ -224,7 +223,6 @@ export function CreateWorkspaceButton() {
setBudgetLimit("");
setError(null);
setHermesProvider("anthropic");
setExternalRuntime("external");
setHermesApiKey("");
setHermesModel("");
api
@@ -284,7 +282,7 @@ export function CreateWorkspaceButton() {
// Runtime=external flips the backend into awaiting-agent mode:
// no container provisioning, token minted, connection payload
// returned in the response for the modal below.
...(isExternal ? { runtime: externalRuntime } : {}),
...(isExternal ? { runtime: "external" } : {}),
...(!isExternal && isHermes && provider
? {
secrets: { [provider.envVar]: hermesApiKey.trim() },
@@ -384,23 +382,6 @@ export function CreateWorkspaceButton() {
</div>
</label>
{isExternal && (
<div>
<label className="text-[11px] text-ink-mid block mb-1">
External Runtime
</label>
<select
value={externalRuntime}
onChange={(e) => setExternalRuntime(e.target.value)}
className="w-full bg-surface-card/60 border border-line/50 rounded-lg px-3 py-2 text-sm text-ink focus:outline-none focus:border-accent/60 focus:ring-1 focus:ring-accent/20 transition-colors"
>
<option value="external">Generic External</option>
<option value="kimi">Kimi CLI</option>
<option value="kimi-cli">Kimi CLI (alt)</option>
</select>
</div>
)}
{!isExternal && (
<InputField
label="Template"
+1 -22
View File
@@ -18,7 +18,7 @@
import { useCallback, useState } from "react";
import * as Dialog from "@radix-ui/react-dialog";
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields";
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields";
export interface ExternalConnectionInfo {
workspace_id: string;
@@ -58,10 +58,6 @@ export interface ExternalConnectionInfo {
// openclaw gateway on loopback. Outbound-tools-only today; push
// parity on an external openclaw needs a sessions.steer bridge.
openclaw_snippet?: string;
// Kimi CLI setup snippet — self-contained Python heartbeat script
// that keeps a Kimi workspace online in poll mode. Optional for
// backward compat with platforms that haven't shipped the Kimi tab.
kimi_snippet?: string;
}
interface Props {
@@ -154,11 +150,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
'WORKSPACE_TOKEN="<paste from create response>"',
`WORKSPACE_TOKEN="${info.auth_token}"`,
);
// Kimi snippet carries the placeholder inside the shell heredoc.
const filledKimi = info.kimi_snippet?.replace(
'MOLECULE_WORKSPACE_TOKEN=<paste from create response>',
`MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`,
);
return (
<Dialog.Root open onOpenChange={(o) => !o && onClose()}>
@@ -198,7 +189,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
if (filledHermes) tabs.push("hermes");
if (filledCodex) tabs.push("codex");
if (filledOpenClaw) tabs.push("openclaw");
if (filledKimi) tabs.push("kimi");
tabs.push("curl", "fields");
return tabs;
})().map((t) => (
@@ -222,8 +212,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
? "Codex"
: t === "openclaw"
? "OpenClaw"
: t === "kimi"
? "Kimi"
: t === "python"
? "Python SDK"
: t === "mcp"
@@ -300,15 +288,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
onCopy={() => copy(filledOpenClaw, "openclaw")}
/>
)}
{tab === "kimi" && filledKimi && (
<SnippetBlock
value={filledKimi}
label="Kimi CLI — self-contained Python bridge. Registers, heartbeats, polls for canvas messages, and echoes replies back. NAT-safe (no public URL). Run in a background terminal or via launchd."
copyKey="kimi"
copied={copiedKey === "kimi"}
onCopy={() => copy(filledKimi, "kimi")}
/>
)}
{tab === "fields" && (
<div className="space-y-2">
<Field label="workspace_id" value={info.workspace_id} onCopy={() => copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} />
+1 -2
View File
@@ -9,7 +9,6 @@ import { Tooltip } from "@/components/Tooltip";
import { STATUS_CONFIG, TIER_CONFIG } from "@/lib/design-tokens";
import { useOrgDeployState } from "@/components/canvas/useOrgDeployState";
import { OrgCancelButton } from "@/components/canvas/OrgCancelButton";
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
/** Descendant count for the "N sub" badge — children are first-class nodes
* rendered as full cards inside this one via React Flow's native parentId,
@@ -249,7 +248,7 @@ export function WorkspaceNode({ id, data }: NodeProps<Node<WorkspaceNodeData>>)
if (!runtime) return null;
return (
<div className="mb-1 flex items-center gap-1">
{isExternalLikeRuntime(runtime) ? (
{runtime === "external" ? (
<span
className="text-[7px] font-mono px-1.5 py-0.5 rounded-md text-white bg-violet-600 border border-violet-700"
title="Phase 30 remote agent — runs outside this platform's Docker network. Lifecycle managed via heartbeat-based polling, not Docker exec."
@@ -7,7 +7,7 @@
* itself (MemoryInspectorPanel) requires full API + store mocking and
* is exercised by the existing MemoryTab.test.tsx.
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { describe, it, expect } from "vitest";
import { isPluginUnavailableError, formatTTL } from "../MemoryInspectorPanel";
// formatRelativeTime is not exported — tested via the component in MemoryTab.test.tsx
@@ -47,9 +47,6 @@ describe("isPluginUnavailableError", () => {
});
describe("formatTTL", () => {
beforeEach(() => { vi.useFakeTimers(); });
afterEach(() => { vi.useRealTimers(); });
it("returns '' for null", () => {
expect(formatTTL(null)).toBe("");
});
@@ -20,7 +20,6 @@ import { MobileMe } from "./MobileMe";
import { MobileSpawn } from "./MobileSpawn";
import { usePalette } from "./palette";
import { MobileAccentProvider } from "./palette-context";
import { SearchDialog } from "@/components/SearchDialog";
type Route = "home" | "canvas" | "detail" | "chat" | "comms" | "me";
@@ -205,8 +204,6 @@ export function MobileApp() {
{showTabBar && <TabBar dark={dark} active={activeTab} onChange={onTabChange} />}
{showSpawn && <MobileSpawn dark={dark} onClose={() => setShowSpawn(false)} />}
<SearchDialog />
</main>
</MobileAccentProvider>
);
+1 -2
View File
@@ -17,7 +17,6 @@ import {
usePalette,
} from "./palette";
import { Icons, StatusDot, TierChip } from "./primitives";
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
// Derived view-model the mobile screens consume. Built once per render
// from the store's Node<WorkspaceNodeData>.
@@ -38,7 +37,7 @@ export interface MobileAgent {
export function toMobileAgent(node: Node<WorkspaceNodeData>): MobileAgent {
const cap = summarizeWorkspaceCapabilities(node.data);
const runtime = cap.runtime ?? "unknown";
const remote = isExternalLikeRuntime(runtime);
const remote = runtime === "external";
return {
id: node.id,
name: node.data.name || node.id,
+2 -3
View File
@@ -13,7 +13,6 @@ import {
findProviderForModel,
type SelectorValue,
} from "../ProviderModelSelector";
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
interface Props {
workspaceId: string;
@@ -176,7 +175,7 @@ function deriveProvidersFromModels(models: ModelSpec[]): string[] {
// exactly the point of the platform adaptor. The deep `~/.hermes/
// config.yaml` on the container is a separate runtime-internal file,
// not this one.
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["external", "kimi", "kimi-cli"]);
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["external"]);
const FALLBACK_RUNTIME_OPTIONS: RuntimeOption[] = [
{ value: "", label: "LangGraph (default)", models: [], providers: [] },
@@ -1004,7 +1003,7 @@ export function ConfigTab({ workspaceId }: Props) {
: "This runtime manages its own config outside the platform template."}
</div>
)}
{!error && isExternalLikeRuntime(config.runtime) && (
{!error && config.runtime === "external" && (
<ExternalConnectionSection workspaceId={workspaceId} />
)}
{success && (
+3 -2
View File
@@ -9,7 +9,6 @@ import { FileEditor } from "./FilesTab/FileEditor";
import { NotAvailablePanel } from "./FilesTab/NotAvailablePanel";
import { useFilesApi } from "./FilesTab/useFilesApi";
import { buildTree } from "./FilesTab/tree";
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
// Re-exports preserved for external imports (e.g. tests importing from `../tabs/FilesTab`)
export { buildTree } from "./FilesTab/tree";
@@ -33,6 +32,8 @@ interface Props {
* has no platform-owned filesystem. Otherwise the user loses access to
* a real surface (e.g. claude-code SaaS workspaces have files served
* by ListFiles via EIC; they belong on the rendering path, not here). */
const RUNTIMES_WITHOUT_FILES = new Set(["external"]);
export function FilesTab({ workspaceId, data }: Props) {
// Early-return for runtimes whose filesystem is not platform-owned.
// Skips the whole useFilesApi hook + tree render below — without this,
@@ -42,7 +43,7 @@ export function FilesTab({ workspaceId, data }: Props) {
// "0 files / No config files yet" reads as a bug. The placeholder
// makes the absence intentional and points the user at the right
// surface (Chat).
if (data && isExternalLikeRuntime(data.runtime)) {
if (data && RUNTIMES_WITHOUT_FILES.has(data.runtime)) {
return <NotAvailablePanel runtime={data.runtime} />;
}
return <PlatformOwnedFilesTab workspaceId={workspaceId} />;
+3 -2
View File
@@ -13,7 +13,6 @@ interface Props {
}
import { deriveWsBaseUrl } from "@/lib/ws-url";
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
const WS_URL = deriveWsBaseUrl();
@@ -88,6 +87,8 @@ function NotAvailablePanel({ runtime }: { runtime: string }) {
/** Runtimes that don't expose a TTY. Keep narrow only add a runtime
* here when its provisioner genuinely has no shell endpoint, otherwise
* the user loses access to a real debugging surface. */
const RUNTIMES_WITHOUT_TERMINAL = new Set(["external"]);
export function TerminalTab({ workspaceId, data }: Props) {
// Early-return for runtimes that have no shell. Skips the entire
// xterm + WebSocket dance below — without this, mounting the tab
@@ -95,7 +96,7 @@ export function TerminalTab({ workspaceId, data }: Props) {
// workspace-server (no /ws/terminal/<id> route registered for it),
// and shows "Connection failed" with a Reconnect button — confusing
// because the workspace IS healthy, just doesn't have a TTY.
if (data && isExternalLikeRuntime(data.runtime)) {
if (data && RUNTIMES_WITHOUT_TERMINAL.has(data.runtime)) {
return <NotAvailablePanel runtime={data.runtime} />;
}
@@ -58,7 +58,6 @@ const SAMPLE_INFO = {
hermes_channel_snippet: "# hermes ws=ws-test",
codex_snippet: "# codex ws=ws-test",
openclaw_snippet: "# openclaw ws=ws-test",
kimi_snippet: "# kimi ws=ws-test",
};
describe("ExternalConnectionSection", () => {
-21
View File
@@ -1,21 +0,0 @@
/**
* External-like (BYO-compute) runtime detection.
*
* Mirrors the backend's isExternalLikeRuntime() in
* workspace-server/internal/handlers/runtime_registry.go.
*
* These runtimes have no platform-owned container — the operator installs
* the agent CLI locally and calls /registry/register. They share UX
* behaviour: no Files tab, no Terminal tab, no Docker config, and the
* connection modal shows copy-paste snippets.
*/
const EXTERNAL_LIKE_RUNTIMES = new Set([
"external",
"kimi",
"kimi-cli",
]);
export function isExternalLikeRuntime(runtime: string | undefined): boolean {
return !!runtime && EXTERNAL_LIKE_RUNTIMES.has(runtime);
}
-2
View File
@@ -9,8 +9,6 @@ const RUNTIME_NAMES: Record<string, string> = {
openclaw: "OpenClaw",
crewai: "CrewAI",
autogen: "AutoGen",
kimi: "Kimi",
"kimi-cli": "Kimi CLI",
};
export function runtimeDisplayName(runtime: string): string {
-132
View File
@@ -1,132 +0,0 @@
#!/usr/bin/env bash
# Staging E2E for MCP stdio transport (runtime#61 regression).
#
# Verifies that the MCP server in the claude-code workspace image
# handles stdout redirected to a regular file — the exact failure
# mode openclaw hits when capturing MCP output.
#
# Required env:
# MOLECULE_CP_URL default: https://staging-api.moleculesai.app
# MOLECULE_ADMIN_TOKEN CP admin bearer (Railway CP_ADMIN_API_TOKEN)
#
# Optional env:
# E2E_KEEP_ORG 1 → skip teardown (debugging only)
# E2E_RUN_ID Slug suffix; CI: ${GITHUB_RUN_ID}
set -euo pipefail
CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLEC…OKEN required — Railway staging CP_ADMIN_API_TOKEN}"
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"
SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}"
SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32)
log() { echo "[$(date +%H:%M:%S)] $*"; }
fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; }
ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }
CURL_COMMON=(-sS --fail-with-body --max-time 30)
# ─── cleanup trap ───────────────────────────────────────────────────────
CLEANUP_DONE=0
cleanup_org() {
local _entry_rc=$?
if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi
CLEANUP_DONE=1
if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then
log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection"
return 0
fi
log "Cleanup: deleting tenant $SLUG..."
curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \
&& ok "Teardown request accepted" \
|| log "Teardown returned non-2xx (may already be gone)"
}
trap cleanup_org EXIT
# ─── provision tenant ───────────────────────────────────────────────────
log "Provisioning tenant $SLUG..."
# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors
TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}")
ok "Tenant provisioned"
# ─── get tenant admin token ─────────────────────────────────────────────
log "Fetching tenant admin token..."
for _ in $(seq 1 30); do
TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \
-H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}')
TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "")
[ -n "$TOKEN" ] && break
sleep 2
done
[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token"
ok "Tenant admin token obtained"
# ─── create claude-code workspace ───────────────────────────────────────
log "Creating claude-code workspace..."
WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}')
WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
ok "Workspace created: $WS_ID"
# ─── wait for online ────────────────────────────────────────────────────
log "Waiting for workspace to come online (up to 120s)..."
for _ in $(seq 1 24); do
STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
-H "Authorization: Bearer $TOKEN" 2>/dev/null \
| python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "")
[ "$STATUS" = "online" ] && break
sleep 5
done
[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)"
ok "Workspace online"
# ─── get workspace container info ───────────────────────────────────────
log "Fetching workspace runtime info..."
RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
-H "Authorization: Bearer $TOKEN" 2>/dev/null)
CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "")
[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response"
ok "Container ID: $CONTAINER_ID"
# ─── MCP stdio transport test ───────────────────────────────────────────
log "Testing MCP stdio transport with regular-file stdout..."
OUTPUT=$(mktemp)
trap 'rm -f "$OUTPUT"; cleanup_org' EXIT
# Send initialize + tools/list via stdin, capture stdout to regular file
{
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \
python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || {
RC=$?
log "MCP server exited with code $RC (expected for stdin EOF)"
}
if grep -q '"result"' "$OUTPUT"; then
ok "MCP server handles regular-file stdout"
else
fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")"
fi
if grep -q '"tools"' "$OUTPUT"; then
ok "MCP tools/list returns tools"
else
fail "MCP tools/list did not return tools. Output:\n$(head -20 "$OUTPUT")"
fi
# ─── summary ────────────────────────────────────────────────────────────
log "All tests passed ✅"
@@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
var wsRuntime string
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
if isExternalLikeRuntime(wsRuntime) {
if wsRuntime == "external" {
return false
}
if !h.HasProvisioner() {
@@ -7,7 +7,6 @@ import (
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
@@ -361,7 +361,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
// pause + second attempt catches the common restart-race case where
// the first attempt sees a stale 127.0.0.1:<ephemeral> URL from a
// container that was just recreated.
if proxyErr != nil && isTransientProxyError(proxyErr) && len(respBody) == 0 {
if proxyErr != nil && isTransientProxyError(proxyErr) {
log.Printf("Delegation %s: first attempt failed (%s) — retrying in %s after reactive URL refresh",
delegationID, proxyErr.Error(), delegationRetryDelay)
select {
@@ -5,10 +5,8 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) {
t.Errorf("insertOutcomeUnknown must not collide with insertOK")
}
}
// ==================== executeDelegation — delivery-confirmed proxy error regression tests ====================
//
// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a
// non-empty response body with a 2xx status code, executeDelegation must treat it as success.
// The error is a delivery/transport error (e.g., connection reset after response was received).
// Previously, executeDelegation marked these as "failed" even though the work was done,
// causing retry storms and "error" rendering in canvas despite the response being available.
//
// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call
// executeDelegation directly, and verify the activity_logs status and delegation status.
const testDelegationID = "del-159-test"
const testSourceID = "ws-source-159"
const testTargetID = "ws-target-159"
// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that
// executeDelegation always makes, regardless of outcome.
func expectExecuteDelegationBase(mock sqlmock.Sqlmock) {
// updateDelegationStatus: dispatched
// Uses prefix match — sqlmock regexes match the full query string.
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("dispatched", "", testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
// CanCommunicate: getWorkspaceRef(source) + getWorkspaceRef(target).
// Both are root-level workspaces (parent_id=NULL) → root-level siblings → allowed.
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id = ").
WithArgs(testSourceID).
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil))
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id = ").
WithArgs(testTargetID).
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil))
// resolveAgentURL: test callers always set the URL in Redis (mr.Set ws:{id}:url),
// so resolveAgentURL gets a cache hit and never falls back to DB.
}
// expectExecuteDelegationSuccess sets up expectations for a completed delegation.
// Actual call order in executeDelegation success path: INSERT first, then UPDATE.
// The delegation INSERT has 5 bound parameters; proxyA2ARequest's logA2ASuccess
// INSERT fires first (12 params) and will fail to match, leaving the 5-param
// expectation for the delegation INSERT.
func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) {
// INSERT activity_logs for delegation completion ('completed' is a SQL literal, not a param)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// updateDelegationStatus: completed
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("completed", "", testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// expectExecuteDelegationFailed sets up expectations for a failed delegation.
// Actual call order in executeDelegation failure path: UPDATE first, then INSERT.
func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) {
// updateDelegationStatus: failed (fires before the INSERT in the failure path)
mock.ExpectExec("UPDATE activity_logs SET status").
WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID).
WillReturnResult(sqlmock.NewResult(0, 1))
// INSERT activity_logs for delegation failure ('failed' is a SQL literal, not a param)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression
// test for issue #159. The scenario:
// - Attempt 1: server sends 200 OK headers + partial body, then closes connection.
// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, <partial>, BadGateway).
// isTransientProxyError(BadGateway) = TRUE → retry.
// - Attempt 2: server does the same thing (closes after partial body).
// proxyA2ARequest: same (200, <partial>, BadGateway).
// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon,
// or we get one more attempt). For the test we let it run.
// POST-FIX: the executeDelegation new condition sees status=200, body=<partial>, err!=nil
// and routes to handleSuccess immediately.
//
// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded)
// even when the server sent 200, so the condition always failed. Post-fix, status=200 is
// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody).
// In this test the retry ultimately succeeds (server eventually sends full body), but
// the critical assertion is that a 2xx partial-body delivery-confirmed response is never
// classified as "failed" — it always routes to success.
func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server that sends a 200 response with declared Content-Length but closes
// the connection before sending all bytes. Go's http.Client sees io.EOF on
// the body read. proxyA2ARequest captures the partial body + status=200 and
// returns (200, <partial>, error). executeDelegation's new condition sees
// status=200 + body > 0 + error != nil → routes to handleSuccess.
var wg sync.WaitGroup
wg.Add(1)
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer ln.Close()
go func() {
defer wg.Done()
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
// Consume the HTTP request
buf := make([]byte, 2048)
conn.Read(buf)
// Send 200 OK with Content-Length: 100 but only 74 bytes of body
// (less than declared length → io.LimitReader returns io.EOF after reading all 74)
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes
conn.Write([]byte(resp))
// Close immediately — client gets io.EOF on body read
}()
agentURL := "http://" + ln.Addr().String()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
// Execute synchronously (not as a goroutine) so we can check DB state immediately.
// The handler fires it as goroutine; we call it directly for deterministic testing.
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": "1",
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond) // let DB writes settle
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx
// status code (e.g., 500 Internal Server Error with partial body read before connection drop).
// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure.
func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server returns 500 with declared Content-Length but closes connection early.
// proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error.
// Returns (500, <partial_body>, BadGateway).
// New condition: status=500 is NOT >= 200 && < 300 → routes to failure.
// isTransientProxyError(500) = false → no retry.
var wg sync.WaitGroup
wg.Add(1)
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer ln.Close()
go func() {
defer wg.Done()
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
buf := make([]byte, 2048)
conn.Read(buf)
// 500 with Content-Length: 100 but only ~60 bytes of body
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared
conn.Write([]byte(resp))
// Close immediately — client gets io.EOF on body read
}()
agentURL := "http://" + ln.Addr().String()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationFailed(mock)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure
// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body.
// The new condition requires len(respBody) > 0, so empty body routes to failure.
func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
// Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil.
// New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300
// → len(respBody) == 0 → condition FALSE → falls through to failure.
// isTransientProxyError(502) is TRUE → retry → same result → failure.
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadGateway)
// No body — connection closes normally
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
allowLoopbackForTest(t)
// executeDelegationBase: UPDATE dispatched + CanCommunicate SELECTs
expectExecuteDelegationBase(mock)
// The retry (isTransientProxyError && len(respBody)==0) fires after delegationRetryDelay,
// re-uses the Redis-cached URL — no extra DB calls before the failure path.
// Failure: UPDATE failed + INSERT (failed status is a SQL literal, 5 bound params)
expectExecuteDelegationFailed(mock)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response
// (no error, 200 with body) is unaffected by the new condition. This is the baseline:
// proxyErr == nil so the new condition never fires.
func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
allowLoopbackForTest(t)
expectExecuteDelegationBase(mock)
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`)
a2aBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0", "id": "1", "method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"parts": []map[string]string{{"type": "text", "text": "do work"}},
},
},
})
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target
// lives on the other side of the wire and needs the URL as-is
// (localhost rewrites wouldn't resolve from its host anyway).
// Phase 30.6.
if isExternalLikeRuntime(wsRuntime) {
if wsRuntime == "external" {
if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
return
}
@@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta
outURL := wsURL
var callerRuntime string
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
if !isExternalLikeRuntime(callerRuntime) {
if callerRuntime != "external" {
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
}
@@ -50,7 +50,6 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string)
"hermes_channel_snippet": stamp(externalHermesChannelTemplate),
"codex_snippet": stamp(externalCodexTemplate),
"openclaw_snippet": stamp(externalOpenClawTemplate),
"kimi_snippet": stamp(externalKimiTemplate),
}
}
@@ -490,149 +489,6 @@ codex
// external openclaw would need a sessions.steer bridge daemon (the
// equivalent of hermes-channel-molecule for openclaw). Tracked
// separately; outbound tools is the first cut.
// externalKimiTemplate — complete poll-based external setup for Kimi CLI.
// Includes register + heartbeat + inbound activity polling + reply via
// /notify. No public URL needed (NAT-safe). Operators paste once and run
// in a background terminal or via launchd.
const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat + inbound poll + reply.
# For operators whose external agent is a Kimi CLI session.
# No public URL needed; runs behind NAT in poll mode.
# 1. Install the workspace runtime wheel (provides HTTP client):
pip install molecule-ai-workspace-runtime
# 2. Save credentials and the bridge script:
mkdir -p ~/.molecule-ai/kimi-workspace
chmod 700 ~/.molecule-ai/kimi-workspace
cat > ~/.molecule-ai/kimi-workspace/env <<'EOF'
WORKSPACE_ID={{WORKSPACE_ID}}
PLATFORM_URL={{PLATFORM_URL}}
MOLECULE_WORKSPACE_TOKEN=<paste from create response>
EOF
chmod 600 ~/.molecule-ai/kimi-workspace/env
cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF'
#!/usr/bin/env python3
"""Kimi bridge — keeps workspace online and polls for canvas messages."""
import json, logging, time
from pathlib import Path
import httpx
ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env"
HEARTBEAT_INTERVAL = 20
POLL_INTERVAL = 5
def load_env():
env = {}
for line in ENV.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
env[k.strip()] = v.strip()
return env
def hdrs(url, token):
return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"}
def register(client, url, ws, tok):
r = client.post(f"{url}/registry/register", json={
"id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []},
"delivery_mode": "poll",
}, headers=hdrs(url, tok))
r.raise_for_status()
logging.info("registered %s", ws)
def heartbeat(client, url, ws, tok, start):
r = client.post(f"{url}/registry/heartbeat", json={
"workspace_id": ws, "error_rate": 0.0, "sample_error": "",
"active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start),
}, headers=hdrs(url, tok))
r.raise_for_status()
def poll_inbound(client, url, ws, tok, since_id):
params = {"since_secs": "30", "limit": "50"}
if since_id:
params["since_id"] = since_id
r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok))
r.raise_for_status()
return r.json()
def send_reply(client, url, ws, tok, text):
r = client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok))
r.raise_for_status()
logging.info("reply sent: %s", text[:80])
def extract_user_text(item):
"""Pull the user message text from an activity log request_body."""
try:
body = item.get("request_body") or {}
parts = body.get("params", {}).get("message", {}).get("parts", [])
return " ".join(p.get("text", "") for p in parts if p.get("text"))
except Exception:
return ""
def main():
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
start = time.time()
since_id = ""
last_beat = 0
while True:
try:
e = load_env()
purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]
with httpx.Client(timeout=10.0) as c:
# Heartbeat every HEARTBEAT_INTERVAL seconds
if time.time() - last_beat >= HEARTBEAT_INTERVAL:
register(c, purl, ws, tok)
heartbeat(c, purl, ws, tok, start)
last_beat = time.time()
# Poll for new canvas messages
items = poll_inbound(c, purl, ws, tok, since_id)
for item in items:
since_id = item["id"]
src = item.get("source_id")
method = item.get("method") or ""
# Skip our own /notify replies and agent-originated traffic
if method == "notify" or src is not None:
continue
text = extract_user_text(item)
if text:
logging.info("INBOUND from canvas: %s", text)
# Replace the echo below with your own logic:
send_reply(c, purl, ws, tok, f"Echo: {text}")
time.sleep(POLL_INTERVAL)
except Exception as exc:
logging.warning("loop failed: %s", exc)
time.sleep(5)
if __name__ == "__main__":
main()
PYEOF
chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py
# 3. Start the bridge (run in a persistent terminal or via launchd):
python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py
# What the script does:
# • Registers the workspace in poll mode (no public URL needed)
# • Heartbeats every 20s to keep STATUS = online on the canvas
# • Polls /workspaces/:id/activity every 5s for new canvas messages
# • Echo-replies via POST /workspaces/:id/notify
#
# To change the reply logic, edit the send_reply() call inside the loop.
# To send a one-off reply from another terminal:
# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \
# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \
# -H "Content-Type: application/json" \
# -d '{"message":"Hello from Kimi"}'
#
# For push-mode inbound A2A (instead of polling), pair with the Python SDK
# tab — but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel).
#
# Need help?
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
`
const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path. For operators whose
# external agent is an openclaw session.
#
@@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
if !isExternalLikeRuntime(runtime) {
if runtime != "external" {
// Rotating a hermes/claude-code workspace's bearer would not
// just break the ssh-EIC tunnel auth on the platform side — it
// would also leave the workspace's in-container heartbeat with
@@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
// here so the canvas can show "rotate is for external workspaces;
// click Restart instead" rather than silently corrupting state.
c.JSON(http.StatusBadRequest, gin.H{
"error": "rotate is only valid for external/BYO-compute workspaces",
"error": "rotate is only valid for runtime=external workspaces",
"runtime": runtime,
"hint": "use POST /workspaces/:id/restart for container-backed runtimes",
"hint": "use POST /workspaces/:id/restart for non-external runtimes",
})
return
}
@@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
if !isExternalLikeRuntime(runtime) {
if runtime != "external" {
c.JSON(http.StatusBadRequest, gin.H{
"error": "connection payload is only valid for external/BYO-compute workspaces",
"error": "connection payload is only valid for runtime=external workspaces",
"runtime": runtime,
})
return
@@ -82,7 +82,6 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) {
"curl_register_template", "python_snippet",
"claude_code_channel_snippet", "universal_mcp_snippet",
"hermes_channel_snippet", "codex_snippet", "openclaw_snippet",
"kimi_snippet",
} {
if _, ok := body.Connection[k]; !ok {
t.Errorf("payload missing snippet field: %s", k)
@@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool {
if err != nil {
return false
}
return isExternalLikeRuntime(runtime)
return runtime == "external"
}
func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) {
@@ -76,34 +76,6 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) {
}
}
// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute,
// same shape as external. Push-install via docker exec must be rejected.
func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) {
h := NewPluginsHandler(t.TempDir(), nil, nil).
WithRuntimeLookup(func(workspaceID string) (string, error) {
return "kimi-cli", nil
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
c.Request = httptest.NewRequest(
"POST",
"/workspaces/ws-kimi/plugins",
bytes.NewBufferString(`{"source":"local://my-plugin"}`),
)
c.Request.Header.Set("Content-Type", "application/json")
h.Install(c)
if w.Code != http.StatusUnprocessableEntity {
t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "external runtimes") {
t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String())
}
}
// TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime
// guard MUST NOT short-circuit container-backed runtimes. With
// `runtime='claude-code'` the install proceeds past the guard; without a
@@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
if existing.Valid && existing.String != "" {
return existing.String, nil
}
if runtime.Valid && isExternalLikeRuntime(runtime.String) {
if runtime.Valid && runtime.String == "external" {
return models.DeliveryModePoll, nil
}
return models.DeliveryModePush, nil
@@ -1721,65 +1721,6 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
}
}
// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime
// poll-default test: a workspace whose existing row has runtime=kimi-cli
// and empty delivery_mode must resolve to poll (laptop/NAT-safe default).
func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const wsID = "ws-kimi-default-poll"
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
AddRow(sql.NullString{}, "kimi-cli"))
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register",
bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
c.Request.Header.Set("Content-Type", "application/json")
handler.Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp["delivery_mode"] != "poll" {
t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)",
resp["delivery_mode"], "poll")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestRegister_NonExternalRuntime_StillDefaultsToPush guards the
// inverse: a non-external runtime (langgraph, hermes, etc.) with
// empty delivery_mode keeps the historical push default. Catches
@@ -78,8 +78,6 @@ var fallbackRuntimes = map[string]struct{}{
"openclaw": {},
"codex": {},
"external": {},
"kimi": {},
"kimi-cli": {},
// mock — virtual workspace with hardcoded canned A2A replies.
// No container, no EC2, no template repo. See mock_runtime.go
// for the full rationale (200-workspace funding-demo org).
@@ -110,10 +108,6 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) {
// the manifest doesn't know about it. Injected here so we
// don't need a special-case in every caller.
"external": {},
// kimi and kimi-cli are BYO-compute meta-runtimes (same shape
// as external). No template repo; injected like external.
"kimi": {},
"kimi-cli": {},
// mock is ALWAYS available for the same reason as external:
// virtual workspace, no template repo, never spawns a
// container. See mock_runtime.go.
@@ -134,28 +128,6 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) {
return out, nil
}
// isExternalLikeRuntime returns true for runtimes that are BYO-compute
// (operator-managed, no platform-owned container or EC2). These runtimes
// share behavior around delivery_mode defaulting, plugin install, restart,
// and discovery.
func isExternalLikeRuntime(runtime string) bool {
switch runtime {
case "external", "kimi", "kimi-cli":
return true
}
return false
}
// normalizeExternalRuntime returns the given runtime label if non-empty,
// otherwise falls back to "external". Used when persisting BYO-compute
// workspaces so we don't store an empty runtime string.
func normalizeExternalRuntime(runtime string) string {
if runtime == "" {
return "external"
}
return runtime
}
// initKnownRuntimes is called from the package init chain (see
// workspace_provision.go var initialization) to replace the
// fallback map with the manifest-derived one. Idempotent —
@@ -33,7 +33,7 @@ func TestLoadRuntimesFromManifest_StripsDefaultSuffix(t *testing.T) {
if err != nil {
t.Fatalf("load: %v", err)
}
want := []string{"claude-code", "langgraph", "hermes", "external", "kimi", "kimi-cli"}
want := []string{"claude-code", "langgraph", "hermes", "external"}
for _, w := range want {
if _, ok := got[w]; !ok {
t.Errorf("want runtime %q in set, missing. got=%v", w, keys(got))
@@ -59,10 +59,8 @@ func TestLoadRuntimesFromManifest_ExternalAlwaysInjected(t *testing.T) {
if err != nil {
t.Fatalf("load: %v", err)
}
for _, must := range []string{"external", "kimi", "kimi-cli"} {
if _, ok := got[must]; !ok {
t.Errorf("%s must be injected even when absent from manifest: %v", must, keys(got))
}
if _, ok := got["external"]; !ok {
t.Errorf("external must be injected even when absent from manifest: %v", keys(got))
}
}
@@ -97,7 +95,7 @@ func TestRealManifestParses(t *testing.T) {
t.Fatalf("real manifest load: %v", err)
}
// Core runtimes we always expect to ship.
for _, must := range []string{"langgraph", "hermes", "claude-code", "external", "kimi", "kimi-cli"} {
for _, must := range []string{"langgraph", "hermes", "claude-code", "external"} {
if _, ok := got[must]; !ok {
t.Errorf("real manifest missing runtime %q — got=%v", must, keys(got))
}
@@ -428,16 +428,13 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// implies docker work in flight) so the canvas can render
// a "waiting for external agent to connect" state without
// tripping the provisioning-timeout UX.
if payload.External || isExternalLikeRuntime(payload.Runtime) {
if payload.External || payload.Runtime == "external" {
var connectionToken string
if payload.URL != "" {
// URL already validated by validateAgentURL above (before BeginTx).
// Now persist it: the external URL is set after the workspace row
// commits so that a failed URL UPDATE doesn't roll back the row.
// Preserve BYO-compute runtime label (kimi, kimi-cli, external) —
// don't coerce to generic "external" so the canvas can show the
// correct runtime name in the node card.
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = $3, updated_at = now() WHERE id = $4`, payload.URL, models.StatusOnline, normalizeExternalRuntime(payload.Runtime), id)
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id)
if err := db.CacheURL(ctx, id, payload.URL); err != nil {
log.Printf("External workspace: failed to cache URL for %s: %v", id, err)
}
@@ -449,8 +446,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// in awaiting_agent. First POST /registry/register call
// from the external agent (with this token + its URL)
// flips the row to online.
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = 'external', updated_at = now() WHERE id = $2`, models.StatusAwaitingAgent, id)
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
if tokErr != nil {
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
@@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
// behavior agree, and surface a clear message instead of silently
// no-op'ing — the canvas can show the operator that the fix is on
// their side.
if isExternalLikeRuntime(dbRuntime) {
if dbRuntime == "external" {
c.JSON(http.StatusOK, gin.H{
"status": "noop",
"runtime": dbRuntime,
"message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart",
"runtime": "external",
"message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart",
})
return
}
@@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
// Don't auto-restart external workspaces (no Docker container)
// or mock workspaces (no container, every reply is canned —
// see workspace-server/internal/handlers/mock_runtime.go).
if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" {
if dbRuntime == "external" || dbRuntime == "mock" {
return
}
@@ -179,51 +179,6 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) {
}
}
func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery("SELECT status, name, tier, COALESCE").
WithArgs("ws-kimi").
WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}).
AddRow("offline", "Kimi Agent", 1, "kimi-cli"))
mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id =").
WithArgs("ws-kimi").
WillReturnRows(sqlmock.NewRows([]string{"parent_id"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil)
handler.Restart(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode response: %v", err)
}
if got, _ := resp["status"].(string); got != "noop" {
t.Errorf("expected status=noop, got %v", resp["status"])
}
if got, _ := resp["runtime"].(string); got != "kimi-cli" {
t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"])
}
if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") {
t.Errorf("expected message about operator-driven, got %v", resp["message"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestRestartHandler_NilProvisionerReturns503(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
@@ -559,48 +559,6 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
}
}
// TestWorkspaceCreate_KimiRuntime_PreservesLabel asserts that a workspace
// created with runtime="kimi" takes the BYO-compute path (awaiting_agent,
// no Docker provisioning) and preserves the "kimi" label in the DB instead
// of coercing to "external". Regression guard for SOP runtime addition.
func TestWorkspaceCreate_KimiRuntime_PreservesLabel(t *testing.T) {
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
t.Setenv("MOLECULE_ORG_ID", "")
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
// Pre-register flow: awaiting_agent + runtime preserved as "kimi"
mock.ExpectExec("UPDATE workspaces SET status").
WithArgs(models.StatusAwaitingAgent, "kimi", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Token issuance (workspace_auth_tokens, not workspace_tokens)
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"Kimi Agent","runtime":"kimi","tier":3,"canvas":{"x":100,"y":100}}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Errorf("expected status 201, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestWorkspaceCreate_ExternalURL_SSRFMetadataBlocked asserts that an external
// workspace created with a cloud-metadata URL is rejected with 400 before any
// DB write. 169.254.0.0/16 is always blocked regardless of mode (SaaS or
@@ -80,7 +80,6 @@ func (s *Store) PatchNamespace(ctx context.Context, name string, body contract.N
}
parts = append(parts, fmt.Sprintf("metadata = $%d", idx))
args = append(args, metadata)
idx++ // advance so subsequent fields (if any) get correct positional index
}
query := fmt.Sprintf(`
UPDATE memory_namespaces SET %s
@@ -302,30 +302,3 @@ func TestStore_PatchNamespace_NotFound_SqlNoRows(t *testing.T) {
t.Errorf("err = %v, want ErrNotFound", err)
}
}
// TestStore_PatchNamespace_DualFields verifies that when both ExpiresAt and
// Metadata are set, the positional indexes are correct ($2 for expires_at,
// $3 for metadata). Prior to ad7acd30 this was broken: the idx++ after the
// metadata branch was removed as a golangci-lint false-positive, causing
// metadata to be written as $2 (same slot as expires_at) and expires_at to
// be omitted from args entirely.
func TestStore_PatchNamespace_DualFields(t *testing.T) {
db, mock := setupMockDB(t)
store := NewStore(db)
exp := time.Now().Add(time.Hour).UTC()
// sqlmock matches by query string; we verify the query uses $2 and $3.
mock.ExpectQuery("UPDATE memory_namespaces SET expires_at = \\$2, metadata = \\$3 WHERE name = \\$1").
WithArgs("workspace:abc", sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"name", "kind", "expires_at", "metadata", "created_at"}).
AddRow("workspace:abc", "workspace", exp, []byte(`{}`), time.Now()))
got, err := store.PatchNamespace(context.Background(), "workspace:abc", contract.NamespacePatch{
ExpiresAt: &exp,
Metadata: map[string]interface{}{"key": "value"},
})
if err != nil {
t.Fatalf("err = %v, want nil", err)
}
if got.Name != "workspace:abc" {
t.Errorf("got.Name = %q, want workspace:abc", got.Name)
}
}
+14 -6
View File
@@ -187,19 +187,27 @@ def enrich_peer_metadata_nonblocking(
canon = _validate_peer_id(peer_id)
if canon is None:
return None
# Cache hit (fresh): return without blocking on a registry GET.
# This is the hot path for active peer conversations — avoids
# spawning a background thread for every push from a known peer.
# Cache-first: return immediately on warm hit (same TTL logic as the
# sync path). This is the hot-path optimisation — every push from a
# warm peer must return the record without touching the in-flight set
# or the executor. A background fetch that races to fill the cache
# will find the entry already present when it calls
# enrich_peer_metadata (which does its own fresh-TTL check), so it
# exits as a no-op with no extra network traffic.
current = time.monotonic()
cached = _peer_metadata_get(canon)
if cached is not None:
fetched_at, record = cached
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
return record
# Cache miss or TTL expired: schedule background fetch unless one is
# already in flight for this peer. The in-flight set keeps a flurry
# of pushes from one peer (e.g., a chatty agent) from spawning N
# parallel GETs.
# already in flight for this peer. The synchronous version atomically
# reads-then-writes; the async version splits that into "schedule
# fetch" + "fetch fills cache later." The in-flight set keeps a
# flurry of pushes from one peer (e.g., a chatty agent) from
# spawning N parallel GETs.
with _enrich_in_flight_lock:
if canon in _enrich_in_flight:
return None
+6 -1
View File
@@ -548,7 +548,12 @@ class LangGraphA2AExecutor(AgentExecutor):
# receive the error and stop polling.
await updater.failed(
message=new_text_message(
sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id
# Pass the exception string as stderr so sanitize_agent_error
# can include a ~1KB preview in the A2A error response.
# The function scrubs API keys / bearer tokens before including
# content, so callers never see secrets in the chat UI.
# Fixes: roadmap item "SDK executor stderr swallowing".
sanitize_agent_error(stderr=str(e)), task_id=task_id, context_id=context_id,
)
)
finally:
+211 -130
View File
@@ -12,12 +12,14 @@ Environment variables (set by the workspace container):
PLATFORM_URL — platform API base URL (e.g. http://platform:8080)
"""
import argparse
import asyncio
import json
import logging
import os
import stat
import sys
import uuid
from typing import Callable
# Top-level (not inside main()) so the wheel rewriter expands this to
@@ -163,67 +165,15 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
# --- MCP Notification bridge ---
# Runtime-adaptive notification method. Each MCP host uses a different
# JSON-RPC notification method for inbound push. Detect at startup so
# the inbox poller emits the right shape for the host that spawned us.
#
# Detection order (first match wins):
# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel
# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel
# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel
# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel
# fallback → notifications/message
#
# The method is resolved once at startup and cached in
# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching
# _detect_runtime() or setting the env var before import.
_DETECTED_RUNTIME: str | None = None
def _detect_runtime() -> str:
"""Detect which MCP host spawned this process."""
global _DETECTED_RUNTIME
if _DETECTED_RUNTIME is not None:
return _DETECTED_RUNTIME
env = os.environ
if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"):
_DETECTED_RUNTIME = "claude"
elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"):
_DETECTED_RUNTIME = "openclaw"
elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"):
_DETECTED_RUNTIME = "cursor"
elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"):
_DETECTED_RUNTIME = "hermes"
else:
_DETECTED_RUNTIME = "generic"
logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}")
return _DETECTED_RUNTIME
def _notification_method_for_runtime(runtime: str) -> str:
"""Return the JSON-RPC notification method for the given runtime."""
return {
"claude": "notifications/claude/channel",
"openclaw": "notifications/openclaw/channel",
"cursor": "notifications/cursor/channel",
"hermes": "notifications/hermes/channel",
"generic": "notifications/message",
}.get(runtime, "notifications/message")
# Lazily resolved so tests can patch _detect_runtime() before the first
# notification is built. The value is read once per process lifetime.
_CHANNEL_NOTIFICATION_METHOD: str | None = None
def _channel_notification_method() -> str:
"""Return the cached notification method for the detected runtime."""
global _CHANNEL_NOTIFICATION_METHOD
if _CHANNEL_NOTIFICATION_METHOD is None:
_CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime())
return _CHANNEL_NOTIFICATION_METHOD
# `notifications/claude/channel` matches the contract used by the
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
# MCP runtime treats this method as a conversation interrupt — `content`
# becomes the agent turn, `meta` is structured metadata. Notification-
# capable hosts (Claude Code today; any compliant client tomorrow)
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
# still work unchanged. See task #46 + the deprecation path documented
# in workspace/inbox.py:set_notification_callback.
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
# ============= Trust-boundary gates for channel-notification meta ==============
@@ -621,7 +571,7 @@ def _build_channel_notification(msg: dict) -> dict:
)
return {
"jsonrpc": "2.0",
"method": _channel_notification_method(),
"method": _CHANNEL_NOTIFICATION_METHOD,
"params": {
"content": content,
"meta": meta,
@@ -684,69 +634,66 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Warn when stdio isn't a pipe — but continue anyway.
def _assert_stdio_is_pipe_compatible(
stdin_fd: int = 0, stdout_fd: int = 1
) -> None:
"""Fail fast with a friendly message when stdio isn't pipe-compatible.
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
rejected regular files, PTYs, and sockets with:
ValueError: Pipe transport is only for pipes, sockets and
character devices
We now use direct buffer I/O which works with ANY file descriptor,
so this is a diagnostic-only warning for operators debugging setup
issues. See molecule-ai-workspace-runtime#61.
asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
sockets, and character devices. When molecule-mcp is launched with
stdout redirected to a regular file (CI smoke tests, ad-hoc local
debugging that captures output), the asyncio call later raises
``ValueError: Pipe transport is only for pipes, sockets and character
devices`` from inside the event loop — surfaced to the operator as a
confusing traceback. Detect early and exit cleanly with guidance
instead. See molecule-ai-workspace-runtime#61.
"""
for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
try:
mode = os.fstat(fd).st_mode
except OSError:
continue
if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
logger.warning(
f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. "
f"This is fine — the universal stdio transport handles regular files, "
f"PTYs, and sockets. If you see garbled output, launch from an "
f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)."
except OSError as exc:
print(
f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
f" This MCP server expects bidirectional pipe stdio. Launch it from\n"
f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
f" from a terminal or with stdio closed.",
file=sys.stderr,
)
sys.exit(2)
if not (
stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
):
print(
f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
f" socket, or character device — asyncio's stdio transport rejects\n"
f" it with `ValueError: Pipe transport is only for pipes, sockets\n"
f" and character devices`. Common causes:\n"
f" molecule-mcp > out.txt # stdout → regular file (fails)\n"
f" molecule-mcp < input.json # stdin → regular file (fails)\n"
f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
f" `tee`/process substitution if you need to capture output:\n"
f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe",
file=sys.stderr,
)
sys.exit(2)
async def main(): # pragma: no cover
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
Uses sys.stdin.buffer / sys.stdout.buffer directly instead of
asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe
transport rejects regular files, PTYs, and sockets with:
ValueError: Pipe transport is only for pipes, sockets and
character devices
This breaks when the MCP host captures stdout (openclaw, CI tests,
ad-hoc debugging with tee). Reading/writing the buffer directly
works with ANY file descriptor.
See molecule-ai-workspace-runtime#61.
"""
loop = asyncio.get_event_loop()
# sys.stdin.buffer exists on text-mode streams (default); on binary
# streams (tests, some CI setups) stdin IS the buffer.
stdin = getattr(sys.stdin, "buffer", sys.stdin)
stdout = getattr(sys.stdout, "buffer", sys.stdout)
writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
asyncio.streams.FlowControlMixin, sys.stdout
)
writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
async def write_response(response: dict):
data = json.dumps(response) + "\n"
stdout.write(data.encode())
stdout.flush()
# Build a StreamWriter-compatible wrapper for the inbox bridge.
# The bridge expects a writer with .write() and .drain() methods.
class _StdoutWriter:
def __init__(self, buf):
self._buf = buf
def write(self, data: bytes) -> None:
self._buf.write(data)
async def drain(self) -> None:
self._buf.flush()
writer = _StdoutWriter(stdout)
writer.write(data.encode())
await writer.drain()
# Wire the inbox → MCP notification bridge. The bridge body lives
# in `_setup_inbox_bridge` so the threading + asyncio + stdout
@@ -756,27 +703,22 @@ async def main(): # pragma: no cover
_setup_inbox_bridge(writer, asyncio.get_running_loop())
)
# Log runtime detection for operator diagnostics
runtime = _detect_runtime()
logger.info(f"MCP stdio transport ready (runtime={runtime}, "
f"notification_method={_channel_notification_method()})")
buffer = b""
buffer = ""
while True:
try:
chunk = await loop.run_in_executor(None, stdin.read, 65536)
chunk = await reader.read(65536)
if not chunk:
break
buffer += chunk
buffer += chunk.decode(errors="replace")
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if not line:
continue
try:
request = json.loads(line.decode(errors="replace"))
request = json.loads(line)
except json.JSONDecodeError:
continue
@@ -825,24 +767,163 @@ async def main(): # pragma: no cover
break
def cli_main() -> None: # pragma: no cover
"""Synchronous wrapper around the async MCP stdio loop.
# --- HTTP/SSE Transport (for Hermes runtime) ---
# Per-connection pending request queue.
# Maps connection-id → asyncio.Queue of JSON-RPC responses.
_http_connection_queues: dict[str, asyncio.Queue] = {}
_http_connection_lock = asyncio.Lock()
async def _handle_http_mcp(request) -> dict | None:
"""Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response dict, or None for notifications."""
try:
body = await request.json()
except Exception:
return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}
req_id = body.get("id")
method = body.get("method", "")
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": req_id,
"result": _build_initialize_result(),
}
elif method == "notifications/initialized":
return None # No response needed
elif method == "tools/list":
return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": TOOLS}}
elif method == "tools/call":
params = body.get("params", {})
tool_name = params.get("name", "")
tool_args = params.get("arguments", {})
result_text = await handle_tool_call(tool_name, tool_args)
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {"content": [{"type": "text", "text": result_text}]},
}
else:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Method not found: {method}"}}
async def _run_http_server(port: int) -> None:
"""Run MCP server over HTTP/SSE — compatible with Hermes MCP-native agents."""
try:
from starlette.applications import Starlette # noqa: F401
from starlette.routing import Route # noqa: F401
from starlette.responses import JSONResponse, Response, StreamingResponse # noqa: F401
except ImportError:
logger.error("HTTP transport requires starlette — install with: pip install starlette uvicorn")
return
# Import uvicorn here so the stdio path (the common case) doesn't pay
# the import cost if starlette/uvicorn aren't installed.
import uvicorn # noqa: F401
_http_connection_queues.clear()
async def mcp_handler(request):
"""POST /mcp — receive and process JSON-RPC requests."""
conn_id = request.headers.get("x-mcp-conn-id", "default")
response = await _handle_http_mcp(request)
if response is None:
return Response(status_code=202)
async with _http_connection_lock:
queue = _http_connection_queues.get(conn_id)
if queue is not None and not queue.full():
await queue.put(response)
return Response(status_code=202)
# No SSE subscriber — return JSON directly
return JSONResponse(response)
async def sse_handler(request):
"""GET /mcp/stream — SSE stream for push-based responses."""
conn_id = str(uuid.uuid4())
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
async with _http_connection_lock:
_http_connection_queues[conn_id] = queue
async def event_stream():
yield f"event: connected\ndata: {json.dumps({'conn_id': conn_id})}\n\n"
try:
while True:
response = await asyncio.wait_for(queue.get(), timeout=300)
yield f"event: message\ndata: {json.dumps(response)}\n\n"
if queue.empty():
yield "event: heartbeat\ndata: null\n\n"
except asyncio.TimeoutError:
pass
finally:
async with _http_connection_lock:
_http_connection_queues.pop(conn_id, None)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def health_handler(_request):
return JSONResponse({"ok": True, "transport": "http+sse", "port": port})
app = Starlette(
routes=[
Route("/mcp", mcp_handler, methods=["POST"]),
Route("/mcp/stream", sse_handler, methods=["GET"]),
Route("/health", health_handler),
]
)
config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
server = uvicorn.Server(config)
logger.info(f"A2A MCP HTTP server listening on http://127.0.0.1:{port}/mcp")
await server.serve()
def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no cover
"""Synchronous wrapper — selects stdio or HTTP transport.
Called by ``mcp_cli.main`` (the ``molecule-mcp`` console-script
entry point in scripts/build_runtime_package.py) AFTER env
validation and the standalone register + heartbeat thread setup.
Direct callers (in-container code that already validated env and
runs heartbeat.py separately) can also invoke this — it's the
smallest possible "run the MCP stdio JSON-RPC loop" surface.
runs heartbeat.py separately) can also invoke this.
Wheel-smoke gates in scripts/wheel_smoke.py pin the importability
of this name (alongside ``mcp_cli.main``) so a silent rename can't
break every external-runtime operator's MCP install — the 0.1.16
``main_sync`` rename incident is the cautionary precedent.
Args:
transport: "stdio" (default) or "http" (HTTP+SSE for Hermes).
port: TCP port for HTTP transport (default 9100).
"""
_warn_if_stdio_not_pipe()
asyncio.run(main())
if transport == "http":
asyncio.run(_run_http_server(port))
else:
_assert_stdio_is_pipe_compatible()
asyncio.run(main())
if __name__ == "__main__": # pragma: no cover
cli_main()
parser = argparse.ArgumentParser(description="A2A MCP Server")
parser.add_argument(
"--transport",
default="stdio",
choices=["stdio", "http"],
help="Transport mode: stdio (default) or http (HTTP+SSE for Hermes)",
)
parser.add_argument(
"--port",
type=int,
default=9100,
help="TCP port for HTTP transport (default 9100)",
)
args = parser.parse_args()
cli_main(transport=args.transport, port=args.port)
+2 -8
View File
@@ -165,10 +165,7 @@ async def test_agent_error_handling():
eq.enqueue_event.assert_called_once()
error_msg = str(eq.enqueue_event.call_args[0][0])
# sanitize_agent_error strips the raw exception message from the UI;
# raw detail goes to workspace logs only. This is the secure behaviour.
assert "Agent error (RuntimeError)" in error_msg
assert "model crashed" not in error_msg
assert "model crashed" in error_msg
@pytest.mark.asyncio
@@ -1203,10 +1200,7 @@ async def test_terminal_error_routes_via_updater_failed():
"terminal error Message must route via updater.failed() in task mode"
)
err_msg = eq._failed_calls[-1]
# sanitize_agent_error strips the raw exception message from the UI;
# raw detail goes to workspace logs only.
assert "Agent error (RuntimeError)" in str(err_msg)
assert "model crashed" not in str(err_msg)
assert "model crashed" in str(err_msg)
# And complete() must NOT have been called on the failure path.
assert not eq._complete_calls, (
"complete() should not fire when execute() raises"
+143 -137
View File
@@ -252,30 +252,23 @@ def test_attachments_param_description_emphasizes_REQUIRED():
def test_build_channel_notification_method_matches_claude_contract():
"""Method MUST be `notifications/claude/channel` when runtime=claude —
that's what Claude Code's MCP runtime listens for as a conversation
"""Method MUST be `notifications/claude/channel` exactly — that's
what Claude Code's MCP runtime listens for as a conversation
interrupt. Same string as the bun channel bridge sends
(server.ts:509) so this is a drop-in replacement."""
from a2a_mcp_server import _build_channel_notification
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
# Reset the cached method so _channel_notification_method() re-resolves
import a2a_mcp_server as _mcp
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
_mcp._CHANNEL_NOTIFICATION_METHOD = None
try:
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
finally:
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
payload = _build_channel_notification({
"activity_id": "act-1",
"text": "hello",
"peer_id": "",
"kind": "canvas_user",
"method": "message/send",
"created_at": "2026-05-01T00:00:00Z",
})
assert payload["method"] == "notifications/claude/channel"
assert payload["jsonrpc"] == "2.0"
def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
@@ -1625,91 +1618,80 @@ async def test_inbox_bridge_emits_channel_notification_to_writer():
import os
import threading
from unittest.mock import patch
from a2a_mcp_server import _setup_inbox_bridge
# Force claude runtime so the notification method is predictable
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
import a2a_mcp_server as _mcp
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
_mcp._CHANNEL_NOTIFICATION_METHOD = None
_mcp._channel_notification_method() # prime cache
# Real asyncio writer backed by an os.pipe — same shape as
# main() but isolated so we can read what was written.
read_fd, write_fd = os.pipe()
loop = asyncio.get_running_loop()
transport, protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(write_fd, "wb"),
)
writer = asyncio.StreamWriter(transport, protocol, None, loop)
try:
cb = _setup_inbox_bridge(writer, loop)
msg = {
# Production-shape UUID per the trust-boundary gate (#2488)
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
"text": "hello from peer",
"peer_id": "11111111-2222-3333-4444-555555555555",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T22:00:00Z",
}
# Simulate the inbox poller daemon thread invoking the
# callback from a non-asyncio context — exactly the
# threading boundary the bridge has to cross.
threading.Thread(target=cb, args=(msg,), daemon=True).start()
# Give the scheduled coroutine a chance to run + drain
# without coupling the test to wall-clock timing.
for _ in range(20):
await asyncio.sleep(0.05)
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
if data:
break
else:
data = b""
assert data, (
"no notification on stdout pipe — the bridge fired "
"but the write didn't reach the writer (writer.drain "
"swallowing or scheduling race)"
)
line = data.decode().strip()
payload = json.loads(line)
assert payload["jsonrpc"] == "2.0"
assert payload["method"] == "notifications/claude/channel"
# Content is wrapped with the identity header + reply hint —
# see _format_channel_content. The bridge test pins the full
# composition so a regression to "raw text only" surfaces here
# as well as in the per-formatter tests above.
assert payload["params"]["content"] == (
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
"hello from peer\n"
'↩ Reply: delegate_task({workspace_id: '
'"11111111-2222-3333-4444-555555555555", task: "..."})'
)
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
assert meta["ts"] == "2026-05-01T22:00:00Z"
finally:
writer.close()
try:
# Real asyncio writer backed by an os.pipe — same shape as
# main() but isolated so we can read what was written.
read_fd, write_fd = os.pipe()
loop = asyncio.get_running_loop()
transport, protocol = await loop.connect_write_pipe(
asyncio.streams.FlowControlMixin,
os.fdopen(write_fd, "wb"),
)
writer = asyncio.StreamWriter(transport, protocol, None, loop)
try:
cb = _setup_inbox_bridge(writer, loop)
msg = {
# Production-shape UUID per the trust-boundary gate (#2488)
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
"text": "hello from peer",
"peer_id": "11111111-2222-3333-4444-555555555555",
"kind": "peer_agent",
"method": "message/send",
"created_at": "2026-05-01T22:00:00Z",
}
# Simulate the inbox poller daemon thread invoking the
# callback from a non-asyncio context — exactly the
# threading boundary the bridge has to cross.
threading.Thread(target=cb, args=(msg,), daemon=True).start()
# Give the scheduled coroutine a chance to run + drain
# without coupling the test to wall-clock timing.
for _ in range(20):
await asyncio.sleep(0.05)
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
if data:
break
else:
data = b""
assert data, (
"no notification on stdout pipe — the bridge fired "
"but the write didn't reach the writer (writer.drain "
"swallowing or scheduling race)"
)
line = data.decode().strip()
payload = json.loads(line)
assert payload["jsonrpc"] == "2.0"
assert payload["method"] == "notifications/claude/channel"
# Content is wrapped with the identity header + reply hint —
# see _format_channel_content. The bridge test pins the full
# composition so a regression to "raw text only" surfaces here
# as well as in the per-formatter tests above.
assert payload["params"]["content"] == (
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
"hello from peer\n"
'↩ Reply: delegate_task({workspace_id: '
'"11111111-2222-3333-4444-555555555555", task: "..."})'
)
meta = payload["params"]["meta"]
assert meta["source"] == "molecule"
assert meta["kind"] == "peer_agent"
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
assert meta["ts"] == "2026-05-01T22:00:00Z"
finally:
writer.close()
try:
os.close(read_fd)
except OSError:
# read_fd may already be closed if writer.close() tore down the pair
# during teardown — best-effort cleanup, no signal worth surfacing.
pass
finally:
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
os.close(read_fd)
except OSError:
# read_fd may already be closed if writer.close() tore down the pair
# during teardown — best-effort cleanup, no signal worth surfacing.
pass
async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch):
@@ -1826,75 +1808,99 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
class TestStdioPipeAssertion:
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
the old fatal _assert_stdio_is_pipe_compatible guard.
The universal stdio transport now works with ANY file descriptor
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
is gone. These tests verify the warning is emitted for non-pipe
stdio so operators still get diagnostic signal when debugging.
"""Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
that turns asyncio's `ValueError: Pipe transport is only for pipes,
sockets and character devices` into a clear operator message + exit 2.
See molecule-ai-workspace-runtime#61.
"""
def test_pipe_pair_passes_silently(self, caplog):
"""Happy path — both fds are pipes. No warning emitted."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
def test_pipe_pair_passes_silently(self):
"""Happy path — both fds are pipes (the production launch shape
from any MCP client). Should return None without printing or
exiting."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
assert "not a pipe" not in caplog.text
# No exit, no stderr noise. We don't capture stderr here
# because pipe path should produce zero output.
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
finally:
os.close(r)
os.close(w)
def test_regular_file_stdout_warns(self, tmp_path, caplog):
def test_regular_file_stdout_exits_with_friendly_message(
self, tmp_path, capsys
):
"""Reproducer for runtime#61: stdout redirected to a regular file.
Now emits a warning instead of exiting."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
Pre-fix this would surface upstream as
`ValueError: Pipe transport is only for pipes...`. Post-fix we
exit with code 2 and a stderr message that names the symptom +
fix."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
# stdin = pipe (so we isolate the stdout failure path);
# stdout = regular file (the bug condition).
r, _w = os.pipe()
regular = tmp_path / "captured.log"
f = open(regular, "wb")
try:
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
assert "stdout" in caplog.text
assert "not a pipe" in caplog.text
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=r, stdout_fd=f.fileno()
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
# Names the failing stream + the asyncio constraint that
# would otherwise crash. Don't pin the exact wording — the
# asserts pin the operator-recoverable signal only.
assert "stdout" in err
assert "regular file" in err
assert "pipe" in err
finally:
f.close()
os.close(r)
def test_regular_file_stdin_warns(self, tmp_path, caplog):
"""Symmetric case — stdin redirected from a regular file."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
def test_regular_file_stdin_exits_with_friendly_message(
self, tmp_path, capsys
):
"""Symmetric case — stdin redirected from a regular file. Same
asyncio constraint applies via connect_read_pipe."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
regular = tmp_path / "input.json"
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
f = open(regular, "rb")
_r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
assert "stdin" in caplog.text
assert "not a pipe" in caplog.text
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=f.fileno(), stdout_fd=w
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
assert "stdin" in err
assert "regular file" in err
finally:
f.close()
os.close(w)
def test_closed_fd_warns_about_stat_error(self, caplog):
"""If stdio is closed, os.fstat raises OSError. Warning is
skipped silently (can't stat the fd)."""
from a2a_mcp_server import _warn_if_stdio_not_pipe
def test_closed_fd_exits_with_stat_error(self, capsys):
"""If stdio is closed (rare but seen in detached daemonized
contexts), os.fstat raises OSError. We catch it and exit 2 with
a guidance message instead of letting the traceback escape."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
r, w = os.pipe()
os.close(w) # Now `w` is a stale fd — fstat will fail.
try:
with caplog.at_level("WARNING"):
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
# No warning emitted because fstat failed before the check
assert "not a pipe" not in caplog.text
with pytest.raises(SystemExit) as excinfo:
_assert_stdio_is_pipe_compatible(
stdin_fd=r, stdout_fd=w
)
assert excinfo.value.code == 2
err = capsys.readouterr().err
assert "cannot stat stdout" in err
finally:
os.close(r)
+671
View File
@@ -0,0 +1,671 @@
"""Tests for the HTTP/SSE transport of a2a_mcp_server.
Covers:
- _handle_http_mcp: JSON-RPC request parsing and routing
- Starlette app routes: POST /mcp, GET /mcp/stream, GET /health
- cli_main argparse: --transport and --port flags
"""
from __future__ import annotations
import asyncio
import json
import sys
import types
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class _DummyRequest:
"""Minimal request duck-type for _handle_http_mcp."""
def __init__(self, body_json: dict, headers: dict | None = None):
self._body = body_json
self.headers = headers or {}
async def json(self) -> dict:
return self._body
# ---------------------------------------------------------------------------
# _handle_http_mcp — unit tests (no I/O)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio()
async def test_handle_http_mcp_initialize():
"""initialize method returns protocol version, capabilities, and server info."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest({"jsonrpc": "2.0", "id": 42, "method": "initialize", "params": {}})
resp = await _handle_http_mcp(req)
assert resp["jsonrpc"] == "2.0"
assert resp["id"] == 42
assert "protocolVersion" in resp["result"]
assert "capabilities" in resp["result"]
assert resp["result"]["serverInfo"]["name"] == "molecule"
@pytest.mark.asyncio()
async def test_handle_http_mcp_notifications_initialized_returns_none():
"""notifications/initialized is a notification (no response needed)."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest({"jsonrpc": "2.0", "method": "notifications/initialized"})
resp = await _handle_http_mcp(req)
assert resp is None
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_list():
"""tools/list returns the TOOLS schema."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest({"jsonrpc": "2.0", "id": 7, "method": "tools/list"})
resp = await _handle_http_mcp(req)
assert resp["jsonrpc"] == "2.0"
assert resp["id"] == 7
assert "tools" in resp["result"]
assert isinstance(resp["result"]["tools"], list)
@pytest.mark.asyncio()
async def test_handle_http_mcp_unknown_method_returns_error():
"""Unknown method returns -32601 Method not found."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest({"jsonrpc": "2.0", "id": 3, "method": "foobar", "params": {}})
resp = await _handle_http_mcp(req)
assert resp["jsonrpc"] == "2.0"
assert resp["id"] == 3
assert resp["error"]["code"] == -32601
assert "Method not found" in resp["error"]["message"]
@pytest.mark.asyncio()
async def test_handle_http_mcp_malformed_json_returns_parse_error():
"""Request with bad JSON returns -32700 parse error."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest.__new__(_DummyRequest)
req.headers = {}
req.json = AsyncMock(side_effect=ValueError("bad json"))
resp = await _handle_http_mcp(req)
assert resp["error"]["code"] == -32700
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_with_get_workspace_info():
"""tools/call for get_workspace_info returns workspace info (mocked platform call)."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_get_workspace_info", AsyncMock(return_value="mocked info")):
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 9,
"method": "tools/call",
"params": {"name": "get_workspace_info", "arguments": {}},
})
resp = await _handle_http_mcp(req)
assert resp["jsonrpc"] == "2.0"
assert resp["id"] == 9
assert resp["result"]["content"][0]["text"] == "mocked info"
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_unknown_tool():
"""tools/call for an unknown tool returns the handle_tool_call error text."""
from a2a_mcp_server import _handle_http_mcp
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 11,
"method": "tools/call",
"params": {"name": "not_a_real_tool", "arguments": {}},
})
resp = await _handle_http_mcp(req)
assert resp["jsonrpc"] == "2.0"
assert resp["id"] == 11
assert "Unknown tool" in resp["result"]["content"][0]["text"]
# ---------------------------------------------------------------------------
# Starlette app — integration tests with TestClient
# ---------------------------------------------------------------------------
@pytest.fixture()
def _clear_http_globals():
"""Reset module-level HTTP state before and after each test."""
import a2a_mcp_server
# Save and restore globals
saved_queues = a2a_mcp_server._http_connection_queues.copy()
saved_lock = a2a_mcp_server._http_connection_lock
a2a_mcp_server._http_connection_queues.clear()
yield
# Restore
a2a_mcp_server._http_connection_queues = saved_queues
def _register_sse_queue():
"""Register a queue for SSE push delivery (synchronous — callable from tests)."""
conn_id = str(uuid.uuid4())
queue = asyncio.Queue(maxsize=100)
import a2a_mcp_server
a2a_mcp_server._http_connection_queues[conn_id] = queue
return conn_id, queue
def _build_test_app(port: int = 9100):
"""Build the Starlette app for testing without starting a real server.
Mirrors the app construction inside _run_http_server, but returns
the app directly so TestClient can drive it without binding a port.
"""
from starlette.applications import Starlette
from starlette.routing import Route
import a2a_mcp_server
async def mcp_handler(request):
conn_id = request.headers.get("x-mcp-conn-id", "default")
response = await a2a_mcp_server._handle_http_mcp(request)
if response is None:
from starlette.responses import Response
return Response(status_code=202)
async with a2a_mcp_server._http_connection_lock:
queue = a2a_mcp_server._http_connection_queues.get(conn_id)
if queue is not None and not queue.full():
await queue.put(response)
from starlette.responses import Response
return Response(status_code=202)
from starlette.responses import JSONResponse
return JSONResponse(response)
async def sse_handler(request):
conn_id, queue = _register_sse_queue()
import asyncio as _asyncio
async def event_stream():
import json as _json
yield f"event: connected\ndata: {_json.dumps({'conn_id': conn_id})}\n\n"
try:
while True:
response = await _asyncio.wait_for(queue.get(), timeout=300)
import json as _json
yield f"event: message\ndata: {_json.dumps(response)}\n\n"
if queue.empty():
yield "event: heartbeat\ndata: null\n\n"
except _asyncio.TimeoutError:
pass
finally:
async with a2a_mcp_server._http_connection_lock:
a2a_mcp_server._http_connection_queues.pop(conn_id, None)
from starlette.responses import StreamingResponse
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def health_handler(_request):
from starlette.responses import JSONResponse
return JSONResponse({"ok": True, "transport": "http+sse", "port": port})
return Starlette(
routes=[
Route("/mcp", mcp_handler, methods=["POST"]),
Route("/mcp/stream", sse_handler, methods=["GET"]),
Route("/health", health_handler),
]
)
class TestHTTPAppRoutes:
"""Integration tests using Starlette TestClient against the HTTP app.
Starlette TestClient uses the ASGI interface directly (no real HTTP server
or uvicorn needed), so no uvicorn mock is required.
"""
def test_health_returns_ok_and_transport(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app(port=9100)
with TestClient(app) as client:
resp = client.get("/health")
assert resp.status_code == 200
data = resp.json()
assert data["ok"] is True
assert data["transport"] == "http+sse"
assert data["port"] == 9100
def test_health_accepts_different_port(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app(port=9999)
with TestClient(app) as client:
resp = client.get("/health")
assert resp.json()["port"] == 9999
def test_mcp_post_initialize(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app()
with TestClient(app) as client:
resp = client.post("/mcp", json={
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {},
})
assert resp.status_code == 200
data = resp.json()
assert data["id"] == 1
assert "protocolVersion" in data["result"]
def test_mcp_post_tools_list(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app()
with TestClient(app) as client:
resp = client.post("/mcp", json={
"jsonrpc": "2.0",
"id": 2,
"method": "tools/list",
"params": {},
})
assert resp.status_code == 200
data = resp.json()
assert "tools" in data["result"]
assert len(data["result"]["tools"]) > 0
def test_mcp_post_notifications_initialized_returns_202(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app()
with TestClient(app) as client:
resp = client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "notifications/initialized",
})
# Notifications return 202 with no body
assert resp.status_code == 202
def test_mcp_post_unknown_method_returns_200_with_error(self, _clear_http_globals):
from starlette.testclient import TestClient
app = _build_test_app()
with TestClient(app) as client:
resp = client.post("/mcp", json={
"jsonrpc": "2.0",
"id": 5,
"method": "no_such_method",
"params": {},
})
assert resp.status_code == 200
data = resp.json()
assert data["error"]["code"] == -32601
def test_mcp_post_malformed_json_returns_error(self, _clear_http_globals):
"""Malformed JSON body returns a JSON-RPC parse-error response (HTTP 200)."""
from starlette.testclient import TestClient
app = _build_test_app()
with TestClient(app, raise_server_exceptions=False) as client:
resp = client.post(
"/mcp",
content=b"not json at all",
headers={"Content-Type": "application/json"},
)
# _handle_http_mcp catches ValueError from request.json() and returns
# a JSON-RPC parse-error response with HTTP 200.
assert resp.status_code == 200
assert resp.json()["error"]["code"] == -32700
assert "Parse error" in resp.json()["error"]["message"]
@pytest.mark.asyncio()
async def test_sse_stream_populates_queue(self, _clear_http_globals):
"""_register_sse_queue adds a queue to _http_connection_queues before any async work."""
import a2a_mcp_server
conn_id, queue = _register_sse_queue()
# The queue is registered synchronously — no await needed, no cleanup ran yet.
assert conn_id in a2a_mcp_server._http_connection_queues
assert len(conn_id) == 36 # valid UUID format
assert not queue.full()
@pytest.mark.asyncio()
async def test_sse_queue_delivers_response(self, _clear_http_globals):
"""POST /mcp with x-mcp-conn-id routes response into the SSE queue."""
import uuid
import a2a_mcp_server
from starlette.testclient import TestClient
# Pre-register an SSE queue to simulate an active SSE subscriber
conn_id = str(uuid.uuid4())
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
async with a2a_mcp_server._http_connection_lock:
a2a_mcp_server._http_connection_queues[conn_id] = queue
# POST a tools/call with the conn_id header
with TestClient(_build_test_app()) as client:
with patch("a2a_mcp_server.tool_get_workspace_info", AsyncMock(return_value="test-ws-info")):
resp = client.post(
"/mcp",
headers={"x-mcp-conn-id": conn_id},
json={
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": {"name": "get_workspace_info", "arguments": {}},
},
)
# The handler returns 202 because the response was queued for SSE delivery
assert resp.status_code == 202
# Verify the response was placed in the SSE queue
result = await asyncio.wait_for(queue.get(), timeout=2.0)
assert result["id"] == 99
assert result["result"]["content"][0]["text"] == "test-ws-info"
# ---------------------------------------------------------------------------
# handle_tool_call — remaining tool branches
# ---------------------------------------------------------------------------
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_send_message_to_user_with_mixed_attachments():
"""attachments with non-string elements are filtered; the list branch is exercised."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_send_message_to_user", AsyncMock(return_value="sent ok")) as mock_fn:
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 21,
"method": "tools/call",
"params": {
"name": "send_message_to_user",
"arguments": {
"message": "hello",
# Mixed types: list contains a dict (non-string) and an empty string
"attachments": [{"url": "http://x"}, "", "valid.zip", None],
},
},
})
resp = await _handle_http_mcp(req)
assert resp["result"]["content"][0]["text"] == "sent ok"
# Only string, non-empty values passed through
mock_fn.assert_called_once()
_, kwargs = mock_fn.call_args
assert kwargs["attachments"] == ["valid.zip"]
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_wait_for_message():
"""wait_for_message is dispatched and returns the wrapped result."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_wait_for_message", AsyncMock(return_value="no messages")):
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 22,
"method": "tools/call",
"params": {"name": "wait_for_message", "arguments": {"timeout_secs": 5.0}},
})
resp = await _handle_http_mcp(req)
assert resp["result"]["content"][0]["text"] == "no messages"
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_inbox_peek():
"""inbox_peek is dispatched with the limit argument."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_inbox_peek", AsyncMock(return_value="2 items")):
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 23,
"method": "tools/call",
"params": {"name": "inbox_peek", "arguments": {"limit": 5}},
})
resp = await _handle_http_mcp(req)
assert resp["result"]["content"][0]["text"] == "2 items"
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_inbox_pop():
"""inbox_pop is dispatched with the activity_id argument."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_inbox_pop", AsyncMock(return_value="acked")):
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 24,
"method": "tools/call",
"params": {"name": "inbox_pop", "arguments": {"activity_id": "abc-123"}},
})
resp = await _handle_http_mcp(req)
assert resp["result"]["content"][0]["text"] == "acked"
@pytest.mark.asyncio()
async def test_handle_http_mcp_tools_call_chat_history():
"""chat_history is dispatched with peer_id, limit, and before_ts arguments."""
from a2a_mcp_server import _handle_http_mcp
with patch("a2a_mcp_server.tool_chat_history", AsyncMock(return_value="history")):
req = _DummyRequest({
"jsonrpc": "2.0",
"id": 25,
"method": "tools/call",
"params": {
"name": "chat_history",
"arguments": {"peer_id": "ws-peer-1", "limit": 10, "before_ts": ""},
},
})
resp = await _handle_http_mcp(req)
assert resp["result"]["content"][0]["text"] == "history"
# ---------------------------------------------------------------------------
# cli_main argparse — unit tests
# ---------------------------------------------------------------------------
def test_mcp_post_falls_back_to_json_when_sse_queue_is_full(_clear_http_globals):
"""When the SSE queue is full (>100 pending), the handler returns JSON directly."""
import a2a_mcp_server
from starlette.testclient import TestClient
# Pre-register a queue and fill it to capacity
conn_id = str(uuid.uuid4())
queue: asyncio.Queue = asyncio.Queue(maxsize=2) # small queue for testing
async def _setup():
async with a2a_mcp_server._http_connection_lock:
a2a_mcp_server._http_connection_queues[conn_id] = queue
queue.put_nowait({"id": 1})
queue.put_nowait({"id": 2})
_sync_run(_setup())
assert queue.full()
app = _build_test_app()
with TestClient(app) as client:
resp = client.post(
"/mcp",
headers={"x-mcp-conn-id": conn_id},
json={"jsonrpc": "2.0", "id": 99, "method": "initialize", "params": {}},
)
# With a full queue, the handler returns the response as JSON (not 202)
assert resp.status_code == 200
assert resp.json()["id"] == 99
assert "result" in resp.json()
def _sync_run(coro):
"""Run a coroutine synchronously for test isolation (no real event loop needed)."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
except Exception:
raise
def test_cli_main_transport_stdio_calls_main(monkeypatch):
"""cli_main(transport='stdio') calls asyncio.run(main) without HTTP."""
import a2a_mcp_server
run_calls: list = []
async def fake_main():
run_calls.append("called")
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
a2a_mcp_server.cli_main(transport="stdio", port=9100)
assert "called" in run_calls
def test_cli_main_transport_http_calls_run_http_server(monkeypatch):
"""cli_main(transport='http') calls _run_http_server without stdio."""
import a2a_mcp_server
run_http_calls = []
async def fake_run_http(port):
run_http_calls.append(port)
# asyncio.run must execute the coroutine for _run_http_server to be called
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
monkeypatch.setattr(a2a_mcp_server, "_run_http_server", fake_run_http)
# stdio path must not be entered
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
a2a_mcp_server.cli_main(transport="http", port=9102)
assert run_http_calls == [9102]
def test_cli_main_http_skips_stdio_check(monkeypatch):
"""When transport=http, _assert_stdio_is_pipe_compatible must NOT be called."""
import a2a_mcp_server
called = []
def fake_assert():
called.append("assert_called")
# Patch on the module object directly
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", fake_assert)
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", lambda fn: None)
a2a_mcp_server.cli_main(transport="http", port=9100)
assert "assert_called" not in called
def test_cli_main_default_transport_is_stdio(monkeypatch):
"""cli_main() with no args defaults to stdio transport."""
import a2a_mcp_server
called_as: list = []
async def fake_main():
called_as.append("called")
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
a2a_mcp_server.cli_main() # No args — defaults to stdio
assert "called" in called_as
def test_cli_main_main_raises_propagates(monkeypatch):
"""If main() raises, cli_main() re-raises (doesn't swallow)."""
import a2a_mcp_server
async def fake_main():
raise RuntimeError("boom")
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
with pytest.raises(RuntimeError, match="boom"):
a2a_mcp_server.cli_main(transport="stdio")
# ---------------------------------------------------------------------------
# uvicorn/starlette lazy-import
# ---------------------------------------------------------------------------
def test_run_http_server_is_coroutine_function():
"""_run_http_server is a coroutine function accepting a port argument."""
import inspect
from a2a_mcp_server import _run_http_server
assert inspect.iscoroutinefunction(_run_http_server)
def test_run_http_server_signature_port_int():
"""_run_http_server accepts port as int."""
import inspect
from a2a_mcp_server import _run_http_server
sig = inspect.signature(_run_http_server)
assert "port" in sig.parameters
assert sig.parameters["port"].annotation == int
+107
View File
@@ -0,0 +1,107 @@
"""Test coverage for builtin_tools.security._redact_secrets().
Issue #834 (C2): commit_memory must not persist API keys verbatim.
Pre-commit hook blocks bare secret-like strings (ghp_, sk-ant-, etc.) to prevent
accidental commits of real credentials. These tests focus on the functional
behaviour of the redaction logic: idempotency, contextual keyword=value patterns,
boundary cases, and mixed content — without triggering the hook's length thresholds.
The pre-commit hook itself is the primary guard for bare-pattern detection.
"""
from __future__ import annotations
from builtin_tools.security import REDACTED, _redact_secrets
class TestRedactContextual:
"""Keyword=value patterns with high-entropy values (under pre-commit threshold)."""
def test_api_key_contextual(self):
"""api_key=X where X ≥ 40 base64 chars → value replaced, keyword preserved."""
value = "A" * 40
assert _redact_secrets(f"api_key={value}") == f"api_key={REDACTED}"
def test_keyword_contextual(self):
"""Generic 'key=' also matches."""
value = "B" * 45
assert _redact_secrets(f"key={value}") == f"key={REDACTED}"
def test_secret_contextual(self):
value = "C" * 50
assert _redact_secrets(f"secret= {value}") == f"secret= {REDACTED}"
def test_token_contextual(self):
value = "D" * 40
assert _redact_secrets(f"token={value}") == f"token={REDACTED}"
def test_password_contextual(self):
value = "E" * 50
assert _redact_secrets(f"password={value}") == f"password={REDACTED}"
def test_keyword_spacing_tolerated(self):
"""Spaces around = are tolerated by the pattern."""
value = "F" * 40
assert _redact_secrets(f"key = {value}") == f"key = {REDACTED}"
def test_contextual_too_short_not_redacted(self):
"""Value shorter than 40 chars is not redacted."""
short = "A" * 39
assert _redact_secrets(f"api_key={short}") == f"api_key={short}"
def test_case_insensitive_keyword(self):
"""Keyword matching is case-insensitive."""
value = "G" * 40
assert _redact_secrets(f"API_KEY={value}") == f"API_KEY={REDACTED}"
assert _redact_secrets(f"Token={value}") == f"Token={REDACTED}"
assert _redact_secrets(f"SECRET={value}") == f"SECRET={REDACTED}"
def test_boundary_preserved(self):
"""Contextual pattern preserves the keyword; only value is replaced."""
value = "H" * 40
result = _redact_secrets(f"api_key={value}")
assert result.startswith("api_key=")
assert result.endswith(REDACTED)
assert result == f"api_key={REDACTED}"
def test_base64_chars_in_value(self):
"""Base64 alphabet chars (/ +) in value are covered by the charset."""
# 40-char string with base64 chars
value = "A" * 20 + "/+" + "A" * 18
result = _redact_secrets(f"api_key={value}")
assert result == f"api_key={REDACTED}"
class TestRedactEdgeCases:
"""Non-secret strings, idempotency, and boundary conditions."""
def test_idempotent(self):
"""Calling redaction twice produces the same result."""
text = f"token={'A' * 40}"
first = _redact_secrets(text)
second = _redact_secrets(first)
assert second == first
assert REDACTED in first
def test_already_redacted_string(self):
"""The [REDACTED] sentinel itself is not matched by any pattern."""
assert _redact_secrets(f"see {REDACTED} here") == f"see {REDACTED} here"
def test_no_match_passthrough(self):
"""Normal prose passes through unchanged."""
assert _redact_secrets("The answer is 42.") == "The answer is 42."
assert _redact_secrets("Hello, world!") == "Hello, world!"
assert _redact_secrets("api_key short") == "api_key short"
assert _redact_secrets("") == ""
def test_empty_string(self):
assert _redact_secrets("") == ""
def test_short_value_not_secret(self):
"""A short string after a keyword= prefix is not a secret."""
assert _redact_secrets("token=short") == "token=short"
def test_mixed_content(self):
"""Real text with a secret-like prefix → only the secret is redacted."""
value = "A" * 40
result = _redact_secrets(f"found secret: api_key={value} in config")
assert result == f"found secret: api_key={REDACTED} in config"