Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3e38a885a4 | |||
| 9f3948dc3a | |||
| c4deda1035 | |||
| 0dbda533fb |
@@ -1,165 +0,0 @@
|
||||
name: MCP Stdio Transport Regression
|
||||
|
||||
# Regression test for molecule-ai-workspace-runtime#61:
|
||||
# asyncio.connect_read_pipe / connect_write_pipe fail with
|
||||
# ValueError: "Pipe transport is only for pipes, sockets and character devices"
|
||||
# when stdout is a regular file (openclaw capture, CI tee, debugging).
|
||||
#
|
||||
# This workflow reproduces the exact failure mode and verifies the
|
||||
# fallback to direct buffer I/O works. It runs on every PR that
|
||||
# touches the MCP server or this workflow, plus nightly cron.
|
||||
#
|
||||
# Why a separate workflow (not folded into ci.yml python-lint):
|
||||
# - The test needs to spawn the MCP server with stdout redirected
|
||||
# to a regular file (not a TTY/pipe), which conflicts with
|
||||
# pytest's own capture mechanism.
|
||||
# - It exercises the actual process spawn path (python a2a_mcp_server.py)
|
||||
# not just unit-test mocks — closer to the real openclaw integration.
|
||||
# - A dedicated workflow surfaces stdio-specific regressions without
|
||||
# coupling to the broader Python test suite's coverage gate.
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main, staging]
|
||||
paths:
|
||||
- 'workspace/a2a_mcp_server.py'
|
||||
- 'workspace/mcp_cli.py'
|
||||
- 'workspace/tests/test_a2a_mcp_server.py'
|
||||
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
|
||||
push:
|
||||
branches: [main, staging]
|
||||
paths:
|
||||
- 'workspace/a2a_mcp_server.py'
|
||||
- 'workspace/mcp_cli.py'
|
||||
- 'workspace/tests/test_a2a_mcp_server.py'
|
||||
- '.gitea/workflows/ci-mcp-stdio-transport.yml'
|
||||
schedule:
|
||||
# Nightly at 04:00 UTC — catches drift from dependency updates
|
||||
# (e.g. asyncio behavior changes in new Python patch releases).
|
||||
- cron: '0 4 * * *'
|
||||
|
||||
concurrency:
|
||||
group: mcp-stdio-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
GITHUB_SERVER_URL: https://git.moleculesai.app
|
||||
|
||||
jobs:
|
||||
# bp-exempt: regression canary for runtime#61; not a merge gate — informational only until promoted to required.
|
||||
# mc#774: continue-on-error mask — new workflow, flip to false once it's green on ≥3 consecutive main runs.
|
||||
mcp-stdio-regular-file:
|
||||
name: MCP stdio with regular-file stdout
|
||||
runs-on: ubuntu-latest
|
||||
continue-on-error: true # mc#774
|
||||
timeout-minutes: 5
|
||||
env:
|
||||
WORKSPACE_ID: "00000000-0000-0000-0000-000000000001"
|
||||
defaults:
|
||||
run:
|
||||
working-directory: workspace
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
|
||||
with:
|
||||
python-version: '3.11'
|
||||
cache: pip
|
||||
cache-dependency-path: workspace/requirements.txt
|
||||
- run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov
|
||||
|
||||
- name: Reproduce runtime#61 — stdout as regular file
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "=== Reproducing molecule-ai-workspace-runtime#61 ==="
|
||||
echo ""
|
||||
echo "Before the fix, this command would fail with:"
|
||||
echo ' ValueError: Pipe transport is only for pipes, sockets and character devices'
|
||||
echo ""
|
||||
|
||||
# Spawn the MCP server with stdout redirected to a regular file.
|
||||
# This is exactly what openclaw does when capturing MCP output.
|
||||
OUTPUT=$(mktemp)
|
||||
trap 'rm -f "$OUTPUT"' EXIT
|
||||
|
||||
# Send initialize request, then tools/list, then exit
|
||||
{
|
||||
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
|
||||
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
|
||||
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1 || {
|
||||
RC=$?
|
||||
echo "FAIL: MCP server exited with code $RC"
|
||||
echo "--- stdout+stderr ---"
|
||||
cat "$OUTPUT"
|
||||
exit 1
|
||||
}
|
||||
|
||||
echo "PASS: MCP server handled regular-file stdout without crashing"
|
||||
echo ""
|
||||
echo "--- Output (first 20 lines) ---"
|
||||
head -20 "$OUTPUT"
|
||||
echo ""
|
||||
|
||||
# Verify we got valid JSON-RPC responses
|
||||
if grep -q '"result"' "$OUTPUT"; then
|
||||
echo "PASS: JSON-RPC responses found in output"
|
||||
else
|
||||
echo "FAIL: No JSON-RPC responses in output"
|
||||
cat "$OUTPUT"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Reproduce runtime#61 — stdin from regular file
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "=== stdin as regular file (CI tee / capture pattern) ==="
|
||||
|
||||
INPUT=$(mktemp)
|
||||
OUTPUT=$(mktemp)
|
||||
trap 'rm -f "$INPUT" "$OUTPUT"' EXIT
|
||||
|
||||
cat > "$INPUT" <<'EOF'
|
||||
{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}
|
||||
{"jsonrpc":"2.0","id":2,"method":"tools/list"}
|
||||
EOF
|
||||
|
||||
python a2a_mcp_server.py < "$INPUT" > "$OUTPUT" 2>&1 || {
|
||||
RC=$?
|
||||
echo "FAIL: MCP server exited with code $RC"
|
||||
cat "$OUTPUT"
|
||||
exit 1
|
||||
}
|
||||
|
||||
echo "PASS: MCP server handled regular-file stdin without crashing"
|
||||
|
||||
if grep -q '"result"' "$OUTPUT"; then
|
||||
echo "PASS: JSON-RPC responses found in output"
|
||||
else
|
||||
echo "FAIL: No JSON-RPC responses in output"
|
||||
cat "$OUTPUT"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Verify warning is emitted for non-pipe stdio
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "=== Verify diagnostic warning ==="
|
||||
|
||||
OUTPUT=$(mktemp)
|
||||
trap 'rm -f "$OUTPUT"' EXIT
|
||||
|
||||
{
|
||||
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
|
||||
} | python a2a_mcp_server.py > "$OUTPUT" 2>&1
|
||||
|
||||
# The warning should mention "not a pipe" for operator visibility
|
||||
if grep -qi "not a pipe" "$OUTPUT"; then
|
||||
echo "PASS: Diagnostic warning emitted for non-pipe stdio"
|
||||
else
|
||||
echo "NOTE: No warning in output (may be suppressed by log level)"
|
||||
fi
|
||||
|
||||
- name: Run unit tests for stdio transport
|
||||
run: |
|
||||
set -euo pipefail
|
||||
echo "=== Running stdio transport unit tests ==="
|
||||
python -m pytest tests/test_a2a_mcp_server.py::TestStdioPipeAssertion -v --no-cov
|
||||
@@ -66,19 +66,27 @@ jobs:
|
||||
# PR#372's ci.yml port used. Diffs against the PR base or the
|
||||
# previous push SHA, then matches against the wheel-relevant
|
||||
# path set.
|
||||
BASE="${GITHUB_BASE_REF:-${{ github.event.before }}}"
|
||||
#
|
||||
# Root fix (mc#917): Gitea Actions does not expose github.event.before
|
||||
# as a ${{ }} template-expression that resolves in shell scripts for
|
||||
# push events (it becomes empty string). The env var GITHUB_EVENT_BEFORE
|
||||
# IS set by the runner for push events. Guard git cat-file with
|
||||
# `timeout 30` to prevent indefinite hangs on malformed BASE values.
|
||||
if [ "${{ github.event_name }}" = "pull_request" ] && [ -n "${{ github.event.pull_request.base.sha }}" ]; then
|
||||
BASE="${{ github.event.pull_request.base.sha }}"
|
||||
else
|
||||
BASE="${GITHUB_EVENT_BEFORE:-}"
|
||||
fi
|
||||
if [ -z "$BASE" ] || echo "$BASE" | grep -qE '^0+$'; then
|
||||
# New branch or no previous SHA: treat as wheel-relevant.
|
||||
echo "wheel=true" >> "$GITHUB_OUTPUT"
|
||||
exit 0
|
||||
fi
|
||||
if ! git cat-file -e "$BASE" 2>/dev/null; then
|
||||
if ! timeout 30 git cat-file -e "$BASE" 2>/dev/null; then
|
||||
git fetch --depth=1 origin "$BASE" 2>/dev/null || true
|
||||
fi
|
||||
if ! git cat-file -e "$BASE" 2>/dev/null; then
|
||||
if ! timeout 30 git cat-file -e "$BASE" 2>/dev/null; then
|
||||
echo "::notice::BASE=$BASE not in local clone (shallow fetch or pruned ref)"
|
||||
echo "wheel=true" >> "$GITHUB_OUTPUT"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
@@ -28,16 +28,15 @@
|
||||
#
|
||||
# Environment variables:
|
||||
# SOP_DEBUG=1 — per-API-call diagnostic lines. Default: off.
|
||||
# SOP_LEGACY_CHECK=1 — revert to OR-gate for this run. Intended for
|
||||
# emergency use only; burn-in window closed
|
||||
# 2026-05-17 (internal#189 Phase 1).
|
||||
# SOP_LEGACY_CHECK=1 — revert to OR-gate for this run. Grace window
|
||||
# for PRs in-flight when AND-composition deployed.
|
||||
# Burn-in: remove after 2026-05-17 (7-day window).
|
||||
#
|
||||
# BURN-IN CLOSED 2026-05-17 (internal#189 Phase 1): The 7-day burn-in
|
||||
# window closed. continue-on-error: true has been removed from the
|
||||
# tier-check job; AND-composition is now fully enforced. If you need
|
||||
# to temporarily re-introduce a mask, file a tracker and follow the
|
||||
# mc#774 protocol (Tier 2e lint requires a current tracker within
|
||||
# 2 lines of any continue-on-error: true).
|
||||
# BURN-IN NOTE (internal#189 Phase 1): continue-on-error: true is set on
|
||||
# the tier-check job below. This prevents AND-composition from blocking
|
||||
# PRs during the 7-day burn-in. After 2026-05-17:
|
||||
# 1. Remove `continue-on-error: true` from this job block.
|
||||
# 2. Update this BURN-IN NOTE comment to mark the window closed.
|
||||
|
||||
name: sop-tier-check
|
||||
|
||||
@@ -64,6 +63,10 @@ on:
|
||||
jobs:
|
||||
tier-check:
|
||||
runs-on: ubuntu-latest
|
||||
# BURN-IN: continue-on-error prevents AND-composition from blocking
|
||||
# PRs during the 7-day window. Remove after 2026-05-17 (mc#774).
|
||||
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
|
||||
continue-on-error: true
|
||||
permissions:
|
||||
contents: read
|
||||
pull-requests: read
|
||||
|
||||
@@ -80,7 +80,6 @@ export function CreateWorkspaceButton() {
|
||||
// isExternal is true the template / model / hermes-provider fields are
|
||||
// hidden (they're meaningless for BYO-compute agents).
|
||||
const [isExternal, setIsExternal] = useState(false);
|
||||
const [externalRuntime, setExternalRuntime] = useState("external");
|
||||
const [externalConnection, setExternalConnection] =
|
||||
useState<ExternalConnectionInfo | null>(null);
|
||||
|
||||
@@ -224,7 +223,6 @@ export function CreateWorkspaceButton() {
|
||||
setBudgetLimit("");
|
||||
setError(null);
|
||||
setHermesProvider("anthropic");
|
||||
setExternalRuntime("external");
|
||||
setHermesApiKey("");
|
||||
setHermesModel("");
|
||||
api
|
||||
@@ -284,7 +282,7 @@ export function CreateWorkspaceButton() {
|
||||
// Runtime=external flips the backend into awaiting-agent mode:
|
||||
// no container provisioning, token minted, connection payload
|
||||
// returned in the response for the modal below.
|
||||
...(isExternal ? { runtime: externalRuntime } : {}),
|
||||
...(isExternal ? { runtime: "external" } : {}),
|
||||
...(!isExternal && isHermes && provider
|
||||
? {
|
||||
secrets: { [provider.envVar]: hermesApiKey.trim() },
|
||||
@@ -384,23 +382,6 @@ export function CreateWorkspaceButton() {
|
||||
</div>
|
||||
</label>
|
||||
|
||||
{isExternal && (
|
||||
<div>
|
||||
<label className="text-[11px] text-ink-mid block mb-1">
|
||||
External Runtime
|
||||
</label>
|
||||
<select
|
||||
value={externalRuntime}
|
||||
onChange={(e) => setExternalRuntime(e.target.value)}
|
||||
className="w-full bg-surface-card/60 border border-line/50 rounded-lg px-3 py-2 text-sm text-ink focus:outline-none focus:border-accent/60 focus:ring-1 focus:ring-accent/20 transition-colors"
|
||||
>
|
||||
<option value="external">Generic External</option>
|
||||
<option value="kimi">Kimi CLI</option>
|
||||
<option value="kimi-cli">Kimi CLI (alt)</option>
|
||||
</select>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{!isExternal && (
|
||||
<InputField
|
||||
label="Template"
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
import { useCallback, useState } from "react";
|
||||
import * as Dialog from "@radix-ui/react-dialog";
|
||||
|
||||
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "kimi" | "fields";
|
||||
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields";
|
||||
|
||||
export interface ExternalConnectionInfo {
|
||||
workspace_id: string;
|
||||
@@ -58,10 +58,6 @@ export interface ExternalConnectionInfo {
|
||||
// openclaw gateway on loopback. Outbound-tools-only today; push
|
||||
// parity on an external openclaw needs a sessions.steer bridge.
|
||||
openclaw_snippet?: string;
|
||||
// Kimi CLI setup snippet — self-contained Python heartbeat script
|
||||
// that keeps a Kimi workspace online in poll mode. Optional for
|
||||
// backward compat with platforms that haven't shipped the Kimi tab.
|
||||
kimi_snippet?: string;
|
||||
}
|
||||
|
||||
interface Props {
|
||||
@@ -154,11 +150,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
|
||||
'WORKSPACE_TOKEN="<paste from create response>"',
|
||||
`WORKSPACE_TOKEN="${info.auth_token}"`,
|
||||
);
|
||||
// Kimi snippet carries the placeholder inside the shell heredoc.
|
||||
const filledKimi = info.kimi_snippet?.replace(
|
||||
'MOLECULE_WORKSPACE_TOKEN=<paste from create response>',
|
||||
`MOLECULE_WORKSPACE_TOKEN=${info.auth_token}`,
|
||||
);
|
||||
|
||||
return (
|
||||
<Dialog.Root open onOpenChange={(o) => !o && onClose()}>
|
||||
@@ -198,7 +189,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
|
||||
if (filledHermes) tabs.push("hermes");
|
||||
if (filledCodex) tabs.push("codex");
|
||||
if (filledOpenClaw) tabs.push("openclaw");
|
||||
if (filledKimi) tabs.push("kimi");
|
||||
tabs.push("curl", "fields");
|
||||
return tabs;
|
||||
})().map((t) => (
|
||||
@@ -222,8 +212,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
|
||||
? "Codex"
|
||||
: t === "openclaw"
|
||||
? "OpenClaw"
|
||||
: t === "kimi"
|
||||
? "Kimi"
|
||||
: t === "python"
|
||||
? "Python SDK"
|
||||
: t === "mcp"
|
||||
@@ -300,15 +288,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
|
||||
onCopy={() => copy(filledOpenClaw, "openclaw")}
|
||||
/>
|
||||
)}
|
||||
{tab === "kimi" && filledKimi && (
|
||||
<SnippetBlock
|
||||
value={filledKimi}
|
||||
label="Kimi CLI — self-contained Python bridge. Registers, heartbeats, polls for canvas messages, and echoes replies back. NAT-safe (no public URL). Run in a background terminal or via launchd."
|
||||
copyKey="kimi"
|
||||
copied={copiedKey === "kimi"}
|
||||
onCopy={() => copy(filledKimi, "kimi")}
|
||||
/>
|
||||
)}
|
||||
{tab === "fields" && (
|
||||
<div className="space-y-2">
|
||||
<Field label="workspace_id" value={info.workspace_id} onCopy={() => copy(info.workspace_id, "wsid")} copied={copiedKey === "wsid"} />
|
||||
|
||||
@@ -9,7 +9,6 @@ import { Tooltip } from "@/components/Tooltip";
|
||||
import { STATUS_CONFIG, TIER_CONFIG } from "@/lib/design-tokens";
|
||||
import { useOrgDeployState } from "@/components/canvas/useOrgDeployState";
|
||||
import { OrgCancelButton } from "@/components/canvas/OrgCancelButton";
|
||||
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
|
||||
|
||||
/** Descendant count for the "N sub" badge — children are first-class nodes
|
||||
* rendered as full cards inside this one via React Flow's native parentId,
|
||||
@@ -249,7 +248,7 @@ export function WorkspaceNode({ id, data }: NodeProps<Node<WorkspaceNodeData>>)
|
||||
if (!runtime) return null;
|
||||
return (
|
||||
<div className="mb-1 flex items-center gap-1">
|
||||
{isExternalLikeRuntime(runtime) ? (
|
||||
{runtime === "external" ? (
|
||||
<span
|
||||
className="text-[7px] font-mono px-1.5 py-0.5 rounded-md text-white bg-violet-600 border border-violet-700"
|
||||
title="Phase 30 remote agent — runs outside this platform's Docker network. Lifecycle managed via heartbeat-based polling, not Docker exec."
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
* itself (MemoryInspectorPanel) requires full API + store mocking and
|
||||
* is exercised by the existing MemoryTab.test.tsx.
|
||||
*/
|
||||
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
|
||||
import { describe, it, expect } from "vitest";
|
||||
import { isPluginUnavailableError, formatTTL } from "../MemoryInspectorPanel";
|
||||
|
||||
// formatRelativeTime is not exported — tested via the component in MemoryTab.test.tsx
|
||||
@@ -47,9 +47,6 @@ describe("isPluginUnavailableError", () => {
|
||||
});
|
||||
|
||||
describe("formatTTL", () => {
|
||||
beforeEach(() => { vi.useFakeTimers(); });
|
||||
afterEach(() => { vi.useRealTimers(); });
|
||||
|
||||
it("returns '' for null", () => {
|
||||
expect(formatTTL(null)).toBe("");
|
||||
});
|
||||
|
||||
@@ -20,7 +20,6 @@ import { MobileMe } from "./MobileMe";
|
||||
import { MobileSpawn } from "./MobileSpawn";
|
||||
import { usePalette } from "./palette";
|
||||
import { MobileAccentProvider } from "./palette-context";
|
||||
import { SearchDialog } from "@/components/SearchDialog";
|
||||
|
||||
type Route = "home" | "canvas" | "detail" | "chat" | "comms" | "me";
|
||||
|
||||
@@ -205,8 +204,6 @@ export function MobileApp() {
|
||||
{showTabBar && <TabBar dark={dark} active={activeTab} onChange={onTabChange} />}
|
||||
|
||||
{showSpawn && <MobileSpawn dark={dark} onClose={() => setShowSpawn(false)} />}
|
||||
|
||||
<SearchDialog />
|
||||
</main>
|
||||
</MobileAccentProvider>
|
||||
);
|
||||
|
||||
@@ -17,7 +17,6 @@ import {
|
||||
usePalette,
|
||||
} from "./palette";
|
||||
import { Icons, StatusDot, TierChip } from "./primitives";
|
||||
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
|
||||
|
||||
// Derived view-model the mobile screens consume. Built once per render
|
||||
// from the store's Node<WorkspaceNodeData>.
|
||||
@@ -38,7 +37,7 @@ export interface MobileAgent {
|
||||
export function toMobileAgent(node: Node<WorkspaceNodeData>): MobileAgent {
|
||||
const cap = summarizeWorkspaceCapabilities(node.data);
|
||||
const runtime = cap.runtime ?? "unknown";
|
||||
const remote = isExternalLikeRuntime(runtime);
|
||||
const remote = runtime === "external";
|
||||
return {
|
||||
id: node.id,
|
||||
name: node.data.name || node.id,
|
||||
|
||||
@@ -13,7 +13,6 @@ import {
|
||||
findProviderForModel,
|
||||
type SelectorValue,
|
||||
} from "../ProviderModelSelector";
|
||||
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
|
||||
|
||||
interface Props {
|
||||
workspaceId: string;
|
||||
@@ -176,7 +175,7 @@ function deriveProvidersFromModels(models: ModelSpec[]): string[] {
|
||||
// exactly the point of the platform adaptor. The deep `~/.hermes/
|
||||
// config.yaml` on the container is a separate runtime-internal file,
|
||||
// not this one.
|
||||
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["external", "kimi", "kimi-cli"]);
|
||||
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["external"]);
|
||||
|
||||
const FALLBACK_RUNTIME_OPTIONS: RuntimeOption[] = [
|
||||
{ value: "", label: "LangGraph (default)", models: [], providers: [] },
|
||||
@@ -1004,7 +1003,7 @@ export function ConfigTab({ workspaceId }: Props) {
|
||||
: "This runtime manages its own config outside the platform template."}
|
||||
</div>
|
||||
)}
|
||||
{!error && isExternalLikeRuntime(config.runtime) && (
|
||||
{!error && config.runtime === "external" && (
|
||||
<ExternalConnectionSection workspaceId={workspaceId} />
|
||||
)}
|
||||
{success && (
|
||||
|
||||
@@ -9,7 +9,6 @@ import { FileEditor } from "./FilesTab/FileEditor";
|
||||
import { NotAvailablePanel } from "./FilesTab/NotAvailablePanel";
|
||||
import { useFilesApi } from "./FilesTab/useFilesApi";
|
||||
import { buildTree } from "./FilesTab/tree";
|
||||
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
|
||||
|
||||
// Re-exports preserved for external imports (e.g. tests importing from `../tabs/FilesTab`)
|
||||
export { buildTree } from "./FilesTab/tree";
|
||||
@@ -33,6 +32,8 @@ interface Props {
|
||||
* has no platform-owned filesystem. Otherwise the user loses access to
|
||||
* a real surface (e.g. claude-code SaaS workspaces have files served
|
||||
* by ListFiles via EIC; they belong on the rendering path, not here). */
|
||||
const RUNTIMES_WITHOUT_FILES = new Set(["external"]);
|
||||
|
||||
export function FilesTab({ workspaceId, data }: Props) {
|
||||
// Early-return for runtimes whose filesystem is not platform-owned.
|
||||
// Skips the whole useFilesApi hook + tree render below — without this,
|
||||
@@ -42,7 +43,7 @@ export function FilesTab({ workspaceId, data }: Props) {
|
||||
// "0 files / No config files yet" reads as a bug. The placeholder
|
||||
// makes the absence intentional and points the user at the right
|
||||
// surface (Chat).
|
||||
if (data && isExternalLikeRuntime(data.runtime)) {
|
||||
if (data && RUNTIMES_WITHOUT_FILES.has(data.runtime)) {
|
||||
return <NotAvailablePanel runtime={data.runtime} />;
|
||||
}
|
||||
return <PlatformOwnedFilesTab workspaceId={workspaceId} />;
|
||||
|
||||
@@ -13,7 +13,6 @@ interface Props {
|
||||
}
|
||||
|
||||
import { deriveWsBaseUrl } from "@/lib/ws-url";
|
||||
import { isExternalLikeRuntime } from "@/lib/externalRuntimes";
|
||||
|
||||
const WS_URL = deriveWsBaseUrl();
|
||||
|
||||
@@ -88,6 +87,8 @@ function NotAvailablePanel({ runtime }: { runtime: string }) {
|
||||
/** Runtimes that don't expose a TTY. Keep narrow — only add a runtime
|
||||
* here when its provisioner genuinely has no shell endpoint, otherwise
|
||||
* the user loses access to a real debugging surface. */
|
||||
const RUNTIMES_WITHOUT_TERMINAL = new Set(["external"]);
|
||||
|
||||
export function TerminalTab({ workspaceId, data }: Props) {
|
||||
// Early-return for runtimes that have no shell. Skips the entire
|
||||
// xterm + WebSocket dance below — without this, mounting the tab
|
||||
@@ -95,7 +96,7 @@ export function TerminalTab({ workspaceId, data }: Props) {
|
||||
// workspace-server (no /ws/terminal/<id> route registered for it),
|
||||
// and shows "Connection failed" with a Reconnect button — confusing
|
||||
// because the workspace IS healthy, just doesn't have a TTY.
|
||||
if (data && isExternalLikeRuntime(data.runtime)) {
|
||||
if (data && RUNTIMES_WITHOUT_TERMINAL.has(data.runtime)) {
|
||||
return <NotAvailablePanel runtime={data.runtime} />;
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,6 @@ const SAMPLE_INFO = {
|
||||
hermes_channel_snippet: "# hermes ws=ws-test",
|
||||
codex_snippet: "# codex ws=ws-test",
|
||||
openclaw_snippet: "# openclaw ws=ws-test",
|
||||
kimi_snippet: "# kimi ws=ws-test",
|
||||
};
|
||||
|
||||
describe("ExternalConnectionSection", () => {
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
/**
|
||||
* External-like (BYO-compute) runtime detection.
|
||||
*
|
||||
* Mirrors the backend's isExternalLikeRuntime() in
|
||||
* workspace-server/internal/handlers/runtime_registry.go.
|
||||
*
|
||||
* These runtimes have no platform-owned container — the operator installs
|
||||
* the agent CLI locally and calls /registry/register. They share UX
|
||||
* behaviour: no Files tab, no Terminal tab, no Docker config, and the
|
||||
* connection modal shows copy-paste snippets.
|
||||
*/
|
||||
|
||||
const EXTERNAL_LIKE_RUNTIMES = new Set([
|
||||
"external",
|
||||
"kimi",
|
||||
"kimi-cli",
|
||||
]);
|
||||
|
||||
export function isExternalLikeRuntime(runtime: string | undefined): boolean {
|
||||
return !!runtime && EXTERNAL_LIKE_RUNTIMES.has(runtime);
|
||||
}
|
||||
@@ -9,8 +9,6 @@ const RUNTIME_NAMES: Record<string, string> = {
|
||||
openclaw: "OpenClaw",
|
||||
crewai: "CrewAI",
|
||||
autogen: "AutoGen",
|
||||
kimi: "Kimi",
|
||||
"kimi-cli": "Kimi CLI",
|
||||
};
|
||||
|
||||
export function runtimeDisplayName(runtime: string): string {
|
||||
|
||||
@@ -1,132 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Staging E2E for MCP stdio transport (runtime#61 regression).
|
||||
#
|
||||
# Verifies that the MCP server in the claude-code workspace image
|
||||
# handles stdout redirected to a regular file — the exact failure
|
||||
# mode openclaw hits when capturing MCP output.
|
||||
#
|
||||
# Required env:
|
||||
# MOLECULE_CP_URL default: https://staging-api.moleculesai.app
|
||||
# MOLECULE_ADMIN_TOKEN CP admin bearer (Railway CP_ADMIN_API_TOKEN)
|
||||
#
|
||||
# Optional env:
|
||||
# E2E_KEEP_ORG 1 → skip teardown (debugging only)
|
||||
# E2E_RUN_ID Slug suffix; CI: ${GITHUB_RUN_ID}
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
CP_URL="${MOLECULE_CP_URL:-https://staging-api.moleculesai.app}"
|
||||
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:?MOLEC…OKEN required — Railway staging CP_ADMIN_API_TOKEN}"
|
||||
RUN_ID_SUFFIX="${E2E_RUN_ID:-$(date +%H%M%S)-$$}"
|
||||
|
||||
SLUG="e2e-mcp-$(date +%Y%m%d)-${RUN_ID_SUFFIX}"
|
||||
SLUG=$(echo "$SLUG" | tr '[:upper:]' '[:lower:]' | tr -cd 'a-z0-9-' | head -c 32)
|
||||
|
||||
log() { echo "[$(date +%H:%M:%S)] $*"; }
|
||||
fail() { echo "[$(date +%H:%M:%S)] ❌ $*" >&2; exit 1; }
|
||||
ok() { echo "[$(date +%H:%M:%S)] ✅ $*"; }
|
||||
|
||||
CURL_COMMON=(-sS --fail-with-body --max-time 30)
|
||||
|
||||
# ─── cleanup trap ───────────────────────────────────────────────────────
|
||||
CLEANUP_DONE=0
|
||||
cleanup_org() {
|
||||
local _entry_rc=$?
|
||||
if [ "$CLEANUP_DONE" = "1" ]; then return 0; fi
|
||||
CLEANUP_DONE=1
|
||||
|
||||
if [ "${E2E_KEEP_ORG:-0}" = "1" ]; then
|
||||
log "E2E_KEEP_ORG=1 → leaving $SLUG behind for inspection"
|
||||
return 0
|
||||
fi
|
||||
|
||||
log "Cleanup: deleting tenant $SLUG..."
|
||||
curl "${CURL_COMMON[@]}" --max-time 120 -X DELETE "$CP_URL/cp/admin/tenants/$SLUG" \
|
||||
-H "Authorization: Bearer $ADMIN_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"confirm\":\"$SLUG\"}" >/dev/null 2>&1 \
|
||||
&& ok "Teardown request accepted" \
|
||||
|| log "Teardown returned non-2xx (may already be gone)"
|
||||
}
|
||||
trap cleanup_org EXIT
|
||||
|
||||
# ─── provision tenant ───────────────────────────────────────────────────
|
||||
log "Provisioning tenant $SLUG..."
|
||||
# shellcheck disable=SC2034 # response body unused; --fail-with-body handles errors
|
||||
TENANT=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/cp/admin/orgs" \
|
||||
-H "Authorization: Bearer $ADMIN_TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{\"slug\":\"$SLUG\",\"name\":\"MCP Stdio E2E $SLUG\"}")
|
||||
ok "Tenant provisioned"
|
||||
|
||||
# ─── get tenant admin token ─────────────────────────────────────────────
|
||||
log "Fetching tenant admin token..."
|
||||
for _ in $(seq 1 30); do
|
||||
TOKEN_RESP=$(curl -sS --max-time 10 "$CP_URL/cp/admin/orgs/$SLUG/admin-token" \
|
||||
-H "Authorization: Bearer $ADMIN_TOKEN" 2>/dev/null || echo '{}')
|
||||
TOKEN=$(echo "$TOKEN_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin).get('admin_token',''))" 2>/dev/null || echo "")
|
||||
[ -n "$TOKEN" ] && break
|
||||
sleep 2
|
||||
done
|
||||
[ -n "$TOKEN" ] || fail "Could not retrieve tenant admin token"
|
||||
ok "Tenant admin token obtained"
|
||||
|
||||
# ─── create claude-code workspace ───────────────────────────────────────
|
||||
log "Creating claude-code workspace..."
|
||||
WS=$(curl "${CURL_COMMON[@]}" -X POST "$CP_URL/workspaces" \
|
||||
-H "Authorization: Bearer $TOKEN" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"name":"MCP Stdio Test","role":"Test","runtime":"claude-code","tier":1}')
|
||||
WS_ID=$(echo "$WS" | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
|
||||
ok "Workspace created: $WS_ID"
|
||||
|
||||
# ─── wait for online ────────────────────────────────────────────────────
|
||||
log "Waiting for workspace to come online (up to 120s)..."
|
||||
for _ in $(seq 1 24); do
|
||||
STATUS=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
|
||||
-H "Authorization: Bearer $TOKEN" 2>/dev/null \
|
||||
| python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "")
|
||||
[ "$STATUS" = "online" ] && break
|
||||
sleep 5
|
||||
done
|
||||
[ "$STATUS" = "online" ] || fail "Workspace did not come online (status=$STATUS)"
|
||||
ok "Workspace online"
|
||||
|
||||
# ─── get workspace container info ───────────────────────────────────────
|
||||
log "Fetching workspace runtime info..."
|
||||
RUNTIME_INFO=$(curl -sS --max-time 10 "$CP_URL/workspaces/$WS_ID" \
|
||||
-H "Authorization: Bearer $TOKEN" 2>/dev/null)
|
||||
CONTAINER_ID=$(echo "$RUNTIME_INFO" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('container_id',''))" 2>/dev/null || echo "")
|
||||
[ -n "$CONTAINER_ID" ] || fail "No container_id in workspace response"
|
||||
ok "Container ID: $CONTAINER_ID"
|
||||
|
||||
# ─── MCP stdio transport test ───────────────────────────────────────────
|
||||
log "Testing MCP stdio transport with regular-file stdout..."
|
||||
|
||||
OUTPUT=$(mktemp)
|
||||
trap 'rm -f "$OUTPUT"; cleanup_org' EXIT
|
||||
|
||||
# Send initialize + tools/list via stdin, capture stdout to regular file
|
||||
{
|
||||
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{}}'
|
||||
echo '{"jsonrpc":"2.0","id":2,"method":"tools/list"}'
|
||||
} | docker exec -i -e WORKSPACE_ID="$WS_ID" "$CONTAINER_ID" \
|
||||
python -m molecule_runtime.a2a_mcp_server > "$OUTPUT" 2>&1 || {
|
||||
RC=$?
|
||||
log "MCP server exited with code $RC (expected for stdin EOF)"
|
||||
}
|
||||
|
||||
if grep -q '"result"' "$OUTPUT"; then
|
||||
ok "MCP server handles regular-file stdout"
|
||||
else
|
||||
fail "MCP server did not produce JSON-RPC result. Output:\n$(head -20 "$OUTPUT")"
|
||||
fi
|
||||
|
||||
if grep -q '"tools"' "$OUTPUT"; then
|
||||
ok "MCP tools/list returns tools"
|
||||
else
|
||||
fail "MCP tools/list did not return tools. Output:\n$(head -20 "$OUTPUT")"
|
||||
fi
|
||||
|
||||
# ─── summary ────────────────────────────────────────────────────────────
|
||||
log "All tests passed ✅"
|
||||
@@ -162,7 +162,7 @@ func (h *WorkspaceHandler) handleA2ADispatchError(ctx context.Context, workspace
|
||||
func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspaceID string) bool {
|
||||
var wsRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsRuntime)
|
||||
if isExternalLikeRuntime(wsRuntime) {
|
||||
if wsRuntime == "external" {
|
||||
return false
|
||||
}
|
||||
if !h.HasProvisioner() {
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
|
||||
@@ -361,7 +361,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
|
||||
// pause + second attempt catches the common restart-race case where
|
||||
// the first attempt sees a stale 127.0.0.1:<ephemeral> URL from a
|
||||
// container that was just recreated.
|
||||
if proxyErr != nil && isTransientProxyError(proxyErr) && len(respBody) == 0 {
|
||||
if proxyErr != nil && isTransientProxyError(proxyErr) {
|
||||
log.Printf("Delegation %s: first attempt failed (%s) — retrying in %s after reactive URL refresh",
|
||||
delegationID, proxyErr.Error(), delegationRetryDelay)
|
||||
select {
|
||||
|
||||
@@ -5,10 +5,8 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -958,316 +956,3 @@ func TestInsertDelegationOutcome_ZeroValueIsUnknown(t *testing.T) {
|
||||
t.Errorf("insertOutcomeUnknown must not collide with insertOK")
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== executeDelegation — delivery-confirmed proxy error regression tests ====================
|
||||
//
|
||||
// These test the fix for issue #159: when proxyA2ARequest returns an error but we have a
|
||||
// non-empty response body with a 2xx status code, executeDelegation must treat it as success.
|
||||
// The error is a delivery/transport error (e.g., connection reset after response was received).
|
||||
// Previously, executeDelegation marked these as "failed" even though the work was done,
|
||||
// causing retry storms and "error" rendering in canvas despite the response being available.
|
||||
//
|
||||
// Test strategy: spin up a mock A2A agent server, set up the source/target DB rows, call
|
||||
// executeDelegation directly, and verify the activity_logs status and delegation status.
|
||||
|
||||
const testDelegationID = "del-159-test"
|
||||
const testSourceID = "ws-source-159"
|
||||
const testTargetID = "ws-target-159"
|
||||
|
||||
// expectExecuteDelegationBase sets up sqlmock expectations for the DB queries that
|
||||
// executeDelegation always makes, regardless of outcome.
|
||||
func expectExecuteDelegationBase(mock sqlmock.Sqlmock) {
|
||||
// updateDelegationStatus: dispatched
|
||||
// Uses prefix match — sqlmock regexes match the full query string.
|
||||
mock.ExpectExec("UPDATE activity_logs SET status").
|
||||
WithArgs("dispatched", "", testSourceID, testDelegationID).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// CanCommunicate: getWorkspaceRef(source) + getWorkspaceRef(target).
|
||||
// Both are root-level workspaces (parent_id=NULL) → root-level siblings → allowed.
|
||||
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id = ").
|
||||
WithArgs(testSourceID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testSourceID, nil))
|
||||
mock.ExpectQuery("SELECT id, parent_id FROM workspaces WHERE id = ").
|
||||
WithArgs(testTargetID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "parent_id"}).AddRow(testTargetID, nil))
|
||||
|
||||
// resolveAgentURL: test callers always set the URL in Redis (mr.Set ws:{id}:url),
|
||||
// so resolveAgentURL gets a cache hit and never falls back to DB.
|
||||
}
|
||||
|
||||
// expectExecuteDelegationSuccess sets up expectations for a completed delegation.
|
||||
// Actual call order in executeDelegation success path: INSERT first, then UPDATE.
|
||||
// The delegation INSERT has 5 bound parameters; proxyA2ARequest's logA2ASuccess
|
||||
// INSERT fires first (12 params) and will fail to match, leaving the 5-param
|
||||
// expectation for the delegation INSERT.
|
||||
func expectExecuteDelegationSuccess(mock sqlmock.Sqlmock, respBody string) {
|
||||
// INSERT activity_logs for delegation completion ('completed' is a SQL literal, not a param)
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// updateDelegationStatus: completed
|
||||
mock.ExpectExec("UPDATE activity_logs SET status").
|
||||
WithArgs("completed", "", testSourceID, testDelegationID).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
}
|
||||
|
||||
// expectExecuteDelegationFailed sets up expectations for a failed delegation.
|
||||
// Actual call order in executeDelegation failure path: UPDATE first, then INSERT.
|
||||
func expectExecuteDelegationFailed(mock sqlmock.Sqlmock) {
|
||||
// updateDelegationStatus: failed (fires before the INSERT in the failure path)
|
||||
mock.ExpectExec("UPDATE activity_logs SET status").
|
||||
WithArgs("failed", sqlmock.AnyArg(), testSourceID, testDelegationID).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// INSERT activity_logs for delegation failure ('failed' is a SQL literal, not a param)
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
}
|
||||
|
||||
// TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess is the primary regression
|
||||
// test for issue #159. The scenario:
|
||||
// - Attempt 1: server sends 200 OK headers + partial body, then closes connection.
|
||||
// proxyA2ARequest: body read gets io.EOF (partial body read), returns (200, <partial>, BadGateway).
|
||||
// isTransientProxyError(BadGateway) = TRUE → retry.
|
||||
// - Attempt 2: server does the same thing (closes after partial body).
|
||||
// proxyA2ARequest: same (200, <partial>, BadGateway).
|
||||
// isTransientProxyError(BadGateway) = TRUE → retry AGAIN (but outer context will fire soon,
|
||||
// or we get one more attempt). For the test we let it run.
|
||||
// POST-FIX: the executeDelegation new condition sees status=200, body=<partial>, err!=nil
|
||||
// and routes to handleSuccess immediately.
|
||||
//
|
||||
// The key pre/post-fix difference: pre-fix, executeDelegation received status=0 (hardcoded)
|
||||
// even when the server sent 200, so the condition always failed. Post-fix, status=200 is
|
||||
// preserved through the error return path (proxyA2ARequest now returns resp.StatusCode, respBody).
|
||||
// In this test the retry ultimately succeeds (server eventually sends full body), but
|
||||
// the critical assertion is that a 2xx partial-body delivery-confirmed response is never
|
||||
// classified as "failed" — it always routes to success.
|
||||
func TestExecuteDelegation_DeliveryConfirmedProxyError_TreatsAsSuccess(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
// Server that sends a 200 response with declared Content-Length but closes
|
||||
// the connection before sending all bytes. Go's http.Client sees io.EOF on
|
||||
// the body read. proxyA2ARequest captures the partial body + status=200 and
|
||||
// returns (200, <partial>, error). executeDelegation's new condition sees
|
||||
// status=200 + body > 0 + error != nil → routes to handleSuccess.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
defer ln.Close()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
// Consume the HTTP request
|
||||
buf := make([]byte, 2048)
|
||||
conn.Read(buf)
|
||||
// Send 200 OK with Content-Length: 100 but only 74 bytes of body
|
||||
// (less than declared length → io.LimitReader returns io.EOF after reading all 74)
|
||||
resp := "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
|
||||
resp += `{"result":{"parts":[{"text":"work completed successfully"}]}}` // 74 bytes
|
||||
conn.Write([]byte(resp))
|
||||
// Close immediately — client gets io.EOF on body read
|
||||
}()
|
||||
|
||||
agentURL := "http://" + ln.Addr().String()
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
expectExecuteDelegationBase(mock)
|
||||
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"work completed successfully"}]}}`)
|
||||
|
||||
// Execute synchronously (not as a goroutine) so we can check DB state immediately.
|
||||
// The handler fires it as goroutine; we call it directly for deterministic testing.
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": "1",
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
|
||||
time.Sleep(100 * time.Millisecond) // let DB writes settle
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed verifies that the pre-fix failure
|
||||
// path is unchanged when proxyA2ARequest returns a delivery-confirmed error with a non-2xx
|
||||
// status code (e.g., 500 Internal Server Error with partial body read before connection drop).
|
||||
// The new condition requires status >= 200 && status < 300, so non-2xx always routes to failure.
|
||||
func TestExecuteDelegation_ProxyErrorNon2xx_RemainsFailed(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
// Server returns 500 with declared Content-Length but closes connection early.
|
||||
// proxyA2ARequest: reads 500 headers, partial body, then connection drop → body read error.
|
||||
// Returns (500, <partial_body>, BadGateway).
|
||||
// New condition: status=500 is NOT >= 200 && < 300 → routes to failure.
|
||||
// isTransientProxyError(500) = false → no retry.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
defer ln.Close()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
conn, err := ln.Accept()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
buf := make([]byte, 2048)
|
||||
conn.Read(buf)
|
||||
// 500 with Content-Length: 100 but only ~60 bytes of body
|
||||
resp := "HTTP/1.1 500 Internal Server Error\r\nContent-Type: application/json\r\nContent-Length: 100\r\n\r\n"
|
||||
resp += `{"error":"agent crashed"}` // ~24 bytes, less than declared
|
||||
conn.Write([]byte(resp))
|
||||
// Close immediately — client gets io.EOF on body read
|
||||
}()
|
||||
|
||||
agentURL := "http://" + ln.Addr().String()
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentURL)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
expectExecuteDelegationBase(mock)
|
||||
expectExecuteDelegationFailed(mock)
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed verifies that the pre-fix failure
|
||||
// path is unchanged when proxyA2ARequest returns an error with a 2xx status but empty body.
|
||||
// The new condition requires len(respBody) > 0, so empty body routes to failure.
|
||||
func TestExecuteDelegation_ProxyErrorEmptyBody_RemainsFailed(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
// Server returns 502 Bad Gateway — proxyA2ARequest returns 502, body="" (empty), error != nil.
|
||||
// New condition: proxyErr != nil && len(respBody) > 0 && status >= 200 && status < 300
|
||||
// → len(respBody) == 0 → condition FALSE → falls through to failure.
|
||||
// isTransientProxyError(502) is TRUE → retry → same result → failure.
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
// No body — connection closes normally
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
// executeDelegationBase: UPDATE dispatched + CanCommunicate SELECTs
|
||||
expectExecuteDelegationBase(mock)
|
||||
// The retry (isTransientProxyError && len(respBody)==0) fires after delegationRetryDelay,
|
||||
// re-uses the Redis-cached URL — no extra DB calls before the failure path.
|
||||
// Failure: UPDATE failed + INSERT (failed status is a SQL literal, 5 bound params)
|
||||
expectExecuteDelegationFailed(mock)
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecuteDelegation_CleanProxyResponse_Unchanged verifies that a clean proxy response
|
||||
// (no error, 200 with body) is unaffected by the new condition. This is the baseline:
|
||||
// proxyErr == nil so the new condition never fires.
|
||||
func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
mr := setupTestRedis(t)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write([]byte(`{"result":{"parts":[{"text":"all good"}]}}`))
|
||||
}))
|
||||
defer agentServer.Close()
|
||||
|
||||
mr.Set(fmt.Sprintf("ws:%s:url", testTargetID), agentServer.URL)
|
||||
allowLoopbackForTest(t)
|
||||
|
||||
expectExecuteDelegationBase(mock)
|
||||
expectExecuteDelegationSuccess(mock, `{"result":{"parts":[{"text":"all good"}]}}`)
|
||||
|
||||
a2aBody, _ := json.Marshal(map[string]interface{}{
|
||||
"jsonrpc": "2.0", "id": "1", "method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"parts": []map[string]string{{"type": "text", "text": "do work"}},
|
||||
},
|
||||
},
|
||||
})
|
||||
dh.executeDelegation(testSourceID, testTargetID, testDelegationID, a2aBody)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ func discoverWorkspacePeer(ctx context.Context, c *gin.Context, callerID, target
|
||||
// lives on the other side of the wire and needs the URL as-is
|
||||
// (localhost rewrites wouldn't resolve from its host anyway).
|
||||
// Phase 30.6.
|
||||
if isExternalLikeRuntime(wsRuntime) {
|
||||
if wsRuntime == "external" {
|
||||
if handled := writeExternalWorkspaceURL(ctx, c, callerID, targetID, wsName); handled {
|
||||
return
|
||||
}
|
||||
@@ -181,7 +181,7 @@ func writeExternalWorkspaceURL(ctx context.Context, c *gin.Context, callerID, ta
|
||||
outURL := wsURL
|
||||
var callerRuntime string
|
||||
db.DB.QueryRowContext(ctx, `SELECT COALESCE(runtime,'langgraph') FROM workspaces WHERE id = $1`, callerID).Scan(&callerRuntime)
|
||||
if !isExternalLikeRuntime(callerRuntime) {
|
||||
if callerRuntime != "external" {
|
||||
outURL = strings.Replace(outURL, "127.0.0.1", "host.docker.internal", 1)
|
||||
outURL = strings.Replace(outURL, "localhost", "host.docker.internal", 1)
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ func BuildExternalConnectionPayload(platformURL, workspaceID, authToken string)
|
||||
"hermes_channel_snippet": stamp(externalHermesChannelTemplate),
|
||||
"codex_snippet": stamp(externalCodexTemplate),
|
||||
"openclaw_snippet": stamp(externalOpenClawTemplate),
|
||||
"kimi_snippet": stamp(externalKimiTemplate),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -490,149 +489,6 @@ codex
|
||||
// external openclaw would need a sessions.steer bridge daemon (the
|
||||
// equivalent of hermes-channel-molecule for openclaw). Tracked
|
||||
// separately; outbound tools is the first cut.
|
||||
// externalKimiTemplate — complete poll-based external setup for Kimi CLI.
|
||||
// Includes register + heartbeat + inbound activity polling + reply via
|
||||
// /notify. No public URL needed (NAT-safe). Operators paste once and run
|
||||
// in a background terminal or via launchd.
|
||||
const externalKimiTemplate = `# Kimi CLI external setup — register + heartbeat + inbound poll + reply.
|
||||
# For operators whose external agent is a Kimi CLI session.
|
||||
# No public URL needed; runs behind NAT in poll mode.
|
||||
|
||||
# 1. Install the workspace runtime wheel (provides HTTP client):
|
||||
pip install molecule-ai-workspace-runtime
|
||||
|
||||
# 2. Save credentials and the bridge script:
|
||||
mkdir -p ~/.molecule-ai/kimi-workspace
|
||||
chmod 700 ~/.molecule-ai/kimi-workspace
|
||||
cat > ~/.molecule-ai/kimi-workspace/env <<'EOF'
|
||||
WORKSPACE_ID={{WORKSPACE_ID}}
|
||||
PLATFORM_URL={{PLATFORM_URL}}
|
||||
MOLECULE_WORKSPACE_TOKEN=<paste from create response>
|
||||
EOF
|
||||
chmod 600 ~/.molecule-ai/kimi-workspace/env
|
||||
|
||||
cat > ~/.molecule-ai/kimi-workspace/kimi_bridge.py <<'PYEOF'
|
||||
#!/usr/bin/env python3
|
||||
"""Kimi bridge — keeps workspace online and polls for canvas messages."""
|
||||
import json, logging, time
|
||||
from pathlib import Path
|
||||
import httpx
|
||||
|
||||
ENV = Path.home() / ".molecule-ai" / "kimi-workspace" / "env"
|
||||
HEARTBEAT_INTERVAL = 20
|
||||
POLL_INTERVAL = 5
|
||||
|
||||
def load_env():
|
||||
env = {}
|
||||
for line in ENV.read_text().splitlines():
|
||||
if "=" in line and not line.startswith("#"):
|
||||
k, v = line.split("=", 1)
|
||||
env[k.strip()] = v.strip()
|
||||
return env
|
||||
|
||||
def hdrs(url, token):
|
||||
return {"Authorization": f"Bearer {token}", "Origin": url, "Content-Type": "application/json"}
|
||||
|
||||
def register(client, url, ws, tok):
|
||||
r = client.post(f"{url}/registry/register", json={
|
||||
"id": ws, "url": "", "agent_card": {"name": "mac-laptop-kimi", "skills": []},
|
||||
"delivery_mode": "poll",
|
||||
}, headers=hdrs(url, tok))
|
||||
r.raise_for_status()
|
||||
logging.info("registered %s", ws)
|
||||
|
||||
def heartbeat(client, url, ws, tok, start):
|
||||
r = client.post(f"{url}/registry/heartbeat", json={
|
||||
"workspace_id": ws, "error_rate": 0.0, "sample_error": "",
|
||||
"active_tasks": 0, "current_task": "", "uptime_seconds": int(time.time() - start),
|
||||
}, headers=hdrs(url, tok))
|
||||
r.raise_for_status()
|
||||
|
||||
def poll_inbound(client, url, ws, tok, since_id):
|
||||
params = {"since_secs": "30", "limit": "50"}
|
||||
if since_id:
|
||||
params["since_id"] = since_id
|
||||
r = client.get(f"{url}/workspaces/{ws}/activity", params=params, headers=hdrs(url, tok))
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def send_reply(client, url, ws, tok, text):
|
||||
r = client.post(f"{url}/workspaces/{ws}/notify", json={"message": text}, headers=hdrs(url, tok))
|
||||
r.raise_for_status()
|
||||
logging.info("reply sent: %s", text[:80])
|
||||
|
||||
def extract_user_text(item):
|
||||
"""Pull the user message text from an activity log request_body."""
|
||||
try:
|
||||
body = item.get("request_body") or {}
|
||||
parts = body.get("params", {}).get("message", {}).get("parts", [])
|
||||
return " ".join(p.get("text", "") for p in parts if p.get("text"))
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
start = time.time()
|
||||
since_id = ""
|
||||
last_beat = 0
|
||||
while True:
|
||||
try:
|
||||
e = load_env()
|
||||
purl, ws, tok = e["PLATFORM_URL"], e["WORKSPACE_ID"], e["MOLECULE_WORKSPACE_TOKEN"]
|
||||
with httpx.Client(timeout=10.0) as c:
|
||||
# Heartbeat every HEARTBEAT_INTERVAL seconds
|
||||
if time.time() - last_beat >= HEARTBEAT_INTERVAL:
|
||||
register(c, purl, ws, tok)
|
||||
heartbeat(c, purl, ws, tok, start)
|
||||
last_beat = time.time()
|
||||
|
||||
# Poll for new canvas messages
|
||||
items = poll_inbound(c, purl, ws, tok, since_id)
|
||||
for item in items:
|
||||
since_id = item["id"]
|
||||
src = item.get("source_id")
|
||||
method = item.get("method") or ""
|
||||
# Skip our own /notify replies and agent-originated traffic
|
||||
if method == "notify" or src is not None:
|
||||
continue
|
||||
text = extract_user_text(item)
|
||||
if text:
|
||||
logging.info("INBOUND from canvas: %s", text)
|
||||
# Replace the echo below with your own logic:
|
||||
send_reply(c, purl, ws, tok, f"Echo: {text}")
|
||||
time.sleep(POLL_INTERVAL)
|
||||
except Exception as exc:
|
||||
logging.warning("loop failed: %s", exc)
|
||||
time.sleep(5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
PYEOF
|
||||
chmod +x ~/.molecule-ai/kimi-workspace/kimi_bridge.py
|
||||
|
||||
# 3. Start the bridge (run in a persistent terminal or via launchd):
|
||||
python3 ~/.molecule-ai/kimi-workspace/kimi_bridge.py
|
||||
|
||||
# What the script does:
|
||||
# • Registers the workspace in poll mode (no public URL needed)
|
||||
# • Heartbeats every 20s to keep STATUS = online on the canvas
|
||||
# • Polls /workspaces/:id/activity every 5s for new canvas messages
|
||||
# • Echo-replies via POST /workspaces/:id/notify
|
||||
#
|
||||
# To change the reply logic, edit the send_reply() call inside the loop.
|
||||
# To send a one-off reply from another terminal:
|
||||
# curl -fsS -X POST "{{PLATFORM_URL}}/workspaces/{{WORKSPACE_ID}}/notify" \
|
||||
# -H "Authorization: Bearer $(cat ~/.molecule-ai/kimi-workspace/env | grep TOKEN | cut -d= -f2)" \
|
||||
# -H "Content-Type: application/json" \
|
||||
# -d '{"message":"Hello from Kimi"}'
|
||||
#
|
||||
# For push-mode inbound A2A (instead of polling), pair with the Python SDK
|
||||
# tab — but that requires a public HTTPS endpoint (ngrok / Cloudflare Tunnel).
|
||||
#
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
|
||||
`
|
||||
|
||||
const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path. For operators whose
|
||||
# external agent is an openclaw session.
|
||||
#
|
||||
|
||||
@@ -62,7 +62,7 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||
return
|
||||
}
|
||||
if !isExternalLikeRuntime(runtime) {
|
||||
if runtime != "external" {
|
||||
// Rotating a hermes/claude-code workspace's bearer would not
|
||||
// just break the ssh-EIC tunnel auth on the platform side — it
|
||||
// would also leave the workspace's in-container heartbeat with
|
||||
@@ -73,9 +73,9 @@ func (h *WorkspaceHandler) RotateExternalCredentials(c *gin.Context) {
|
||||
// here so the canvas can show "rotate is for external workspaces;
|
||||
// click Restart instead" rather than silently corrupting state.
|
||||
c.JSON(http.StatusBadRequest, gin.H{
|
||||
"error": "rotate is only valid for external/BYO-compute workspaces",
|
||||
"error": "rotate is only valid for runtime=external workspaces",
|
||||
"runtime": runtime,
|
||||
"hint": "use POST /workspaces/:id/restart for container-backed runtimes",
|
||||
"hint": "use POST /workspaces/:id/restart for non-external runtimes",
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -139,9 +139,9 @@ func (h *WorkspaceHandler) GetExternalConnection(c *gin.Context) {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||
return
|
||||
}
|
||||
if !isExternalLikeRuntime(runtime) {
|
||||
if runtime != "external" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{
|
||||
"error": "connection payload is only valid for external/BYO-compute workspaces",
|
||||
"error": "connection payload is only valid for runtime=external workspaces",
|
||||
"runtime": runtime,
|
||||
})
|
||||
return
|
||||
|
||||
@@ -82,7 +82,6 @@ func TestRotateExternalCredentials_HappyPath(t *testing.T) {
|
||||
"curl_register_template", "python_snippet",
|
||||
"claude_code_channel_snippet", "universal_mcp_snippet",
|
||||
"hermes_channel_snippet", "codex_snippet", "openclaw_snippet",
|
||||
"kimi_snippet",
|
||||
} {
|
||||
if _, ok := body.Connection[k]; !ok {
|
||||
t.Errorf("payload missing snippet field: %s", k)
|
||||
|
||||
@@ -242,7 +242,7 @@ func (h *PluginsHandler) isExternalRuntime(workspaceID string) bool {
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return isExternalLikeRuntime(runtime)
|
||||
return runtime == "external"
|
||||
}
|
||||
|
||||
func (h *PluginsHandler) execAsRoot(ctx context.Context, containerName string, cmd []string) (string, error) {
|
||||
|
||||
@@ -76,34 +76,6 @@ func TestPluginUninstall_ExternalRuntime_Returns422(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestPluginInstall_KimiRuntime_Returns422 — kimi-cli is BYO-compute,
|
||||
// same shape as external. Push-install via docker exec must be rejected.
|
||||
func TestPluginInstall_KimiRuntime_Returns422(t *testing.T) {
|
||||
h := NewPluginsHandler(t.TempDir(), nil, nil).
|
||||
WithRuntimeLookup(func(workspaceID string) (string, error) {
|
||||
return "kimi-cli", nil
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
|
||||
c.Request = httptest.NewRequest(
|
||||
"POST",
|
||||
"/workspaces/ws-kimi/plugins",
|
||||
bytes.NewBufferString(`{"source":"local://my-plugin"}`),
|
||||
)
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.Install(c)
|
||||
|
||||
if w.Code != http.StatusUnprocessableEntity {
|
||||
t.Errorf("expected 422 for runtime='kimi-cli', got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "external runtimes") {
|
||||
t.Errorf("expected error body to mention 'external runtimes', got: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestPluginInstall_ContainerBackedRuntime_FallsThroughGuard — the runtime
|
||||
// guard MUST NOT short-circuit container-backed runtimes. With
|
||||
// `runtime='claude-code'` the install proceeds past the guard; without a
|
||||
|
||||
@@ -158,7 +158,7 @@ func (h *RegistryHandler) resolveDeliveryMode(ctx context.Context, workspaceID,
|
||||
if existing.Valid && existing.String != "" {
|
||||
return existing.String, nil
|
||||
}
|
||||
if runtime.Valid && isExternalLikeRuntime(runtime.String) {
|
||||
if runtime.Valid && runtime.String == "external" {
|
||||
return models.DeliveryModePoll, nil
|
||||
}
|
||||
return models.DeliveryModePush, nil
|
||||
|
||||
@@ -1721,65 +1721,6 @@ func TestRegister_ExternalRuntime_DefaultsToPoll(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_KimiRuntime_DefaultsToPoll mirrors the external-runtime
|
||||
// poll-default test: a workspace whose existing row has runtime=kimi-cli
|
||||
// and empty delivery_mode must resolve to poll (laptop/NAT-safe default).
|
||||
func TestRegister_KimiRuntime_DefaultsToPoll(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
const wsID = "ws-kimi-default-poll"
|
||||
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
|
||||
AddRow(sql.NullString{}, "kimi-cli"))
|
||||
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(wsID, wsID, sql.NullString{}, `{"name":"a"}`, "poll").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT url FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"url"}).AddRow(""))
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/registry/register",
|
||||
bytes.NewBufferString(`{"id":"`+wsID+`","agent_card":{"name":"a"}}`))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Register(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["delivery_mode"] != "poll" {
|
||||
t.Errorf("delivery_mode = %v, want %q (kimi runtime + empty mode → poll)",
|
||||
resp["delivery_mode"], "poll")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegister_NonExternalRuntime_StillDefaultsToPush guards the
|
||||
// inverse: a non-external runtime (langgraph, hermes, etc.) with
|
||||
// empty delivery_mode keeps the historical push default. Catches
|
||||
|
||||
@@ -78,8 +78,6 @@ var fallbackRuntimes = map[string]struct{}{
|
||||
"openclaw": {},
|
||||
"codex": {},
|
||||
"external": {},
|
||||
"kimi": {},
|
||||
"kimi-cli": {},
|
||||
// mock — virtual workspace with hardcoded canned A2A replies.
|
||||
// No container, no EC2, no template repo. See mock_runtime.go
|
||||
// for the full rationale (200-workspace funding-demo org).
|
||||
@@ -110,10 +108,6 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) {
|
||||
// the manifest doesn't know about it. Injected here so we
|
||||
// don't need a special-case in every caller.
|
||||
"external": {},
|
||||
// kimi and kimi-cli are BYO-compute meta-runtimes (same shape
|
||||
// as external). No template repo; injected like external.
|
||||
"kimi": {},
|
||||
"kimi-cli": {},
|
||||
// mock is ALWAYS available for the same reason as external:
|
||||
// virtual workspace, no template repo, never spawns a
|
||||
// container. See mock_runtime.go.
|
||||
@@ -134,28 +128,6 @@ func loadRuntimesFromManifest(path string) (map[string]struct{}, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// isExternalLikeRuntime returns true for runtimes that are BYO-compute
|
||||
// (operator-managed, no platform-owned container or EC2). These runtimes
|
||||
// share behavior around delivery_mode defaulting, plugin install, restart,
|
||||
// and discovery.
|
||||
func isExternalLikeRuntime(runtime string) bool {
|
||||
switch runtime {
|
||||
case "external", "kimi", "kimi-cli":
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// normalizeExternalRuntime returns the given runtime label if non-empty,
|
||||
// otherwise falls back to "external". Used when persisting BYO-compute
|
||||
// workspaces so we don't store an empty runtime string.
|
||||
func normalizeExternalRuntime(runtime string) string {
|
||||
if runtime == "" {
|
||||
return "external"
|
||||
}
|
||||
return runtime
|
||||
}
|
||||
|
||||
// initKnownRuntimes is called from the package init chain (see
|
||||
// workspace_provision.go var initialization) to replace the
|
||||
// fallback map with the manifest-derived one. Idempotent —
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestLoadRuntimesFromManifest_StripsDefaultSuffix(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("load: %v", err)
|
||||
}
|
||||
want := []string{"claude-code", "langgraph", "hermes", "external", "kimi", "kimi-cli"}
|
||||
want := []string{"claude-code", "langgraph", "hermes", "external"}
|
||||
for _, w := range want {
|
||||
if _, ok := got[w]; !ok {
|
||||
t.Errorf("want runtime %q in set, missing. got=%v", w, keys(got))
|
||||
@@ -59,10 +59,8 @@ func TestLoadRuntimesFromManifest_ExternalAlwaysInjected(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("load: %v", err)
|
||||
}
|
||||
for _, must := range []string{"external", "kimi", "kimi-cli"} {
|
||||
if _, ok := got[must]; !ok {
|
||||
t.Errorf("%s must be injected even when absent from manifest: %v", must, keys(got))
|
||||
}
|
||||
if _, ok := got["external"]; !ok {
|
||||
t.Errorf("external must be injected even when absent from manifest: %v", keys(got))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +95,7 @@ func TestRealManifestParses(t *testing.T) {
|
||||
t.Fatalf("real manifest load: %v", err)
|
||||
}
|
||||
// Core runtimes we always expect to ship.
|
||||
for _, must := range []string{"langgraph", "hermes", "claude-code", "external", "kimi", "kimi-cli"} {
|
||||
for _, must := range []string{"langgraph", "hermes", "claude-code", "external"} {
|
||||
if _, ok := got[must]; !ok {
|
||||
t.Errorf("real manifest missing runtime %q — got=%v", must, keys(got))
|
||||
}
|
||||
|
||||
@@ -428,16 +428,13 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// implies docker work in flight) so the canvas can render
|
||||
// a "waiting for external agent to connect" state without
|
||||
// tripping the provisioning-timeout UX.
|
||||
if payload.External || isExternalLikeRuntime(payload.Runtime) {
|
||||
if payload.External || payload.Runtime == "external" {
|
||||
var connectionToken string
|
||||
if payload.URL != "" {
|
||||
// URL already validated by validateAgentURL above (before BeginTx).
|
||||
// Now persist it: the external URL is set after the workspace row
|
||||
// commits so that a failed URL UPDATE doesn't roll back the row.
|
||||
// Preserve BYO-compute runtime label (kimi, kimi-cli, external) —
|
||||
// don't coerce to generic "external" so the canvas can show the
|
||||
// correct runtime name in the node card.
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = $3, updated_at = now() WHERE id = $4`, payload.URL, models.StatusOnline, normalizeExternalRuntime(payload.Runtime), id)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id)
|
||||
if err := db.CacheURL(ctx, id, payload.URL); err != nil {
|
||||
log.Printf("External workspace: failed to cache URL for %s: %v", id, err)
|
||||
}
|
||||
@@ -449,8 +446,7 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
|
||||
// in awaiting_agent. First POST /registry/register call
|
||||
// from the external agent (with this token + its URL)
|
||||
// flips the row to online.
|
||||
// Preserve BYO-compute runtime label (kimi, kimi-cli, external).
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = $2, updated_at = now() WHERE id = $3`, models.StatusAwaitingAgent, normalizeExternalRuntime(payload.Runtime), id)
|
||||
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, runtime = 'external', updated_at = now() WHERE id = $2`, models.StatusAwaitingAgent, id)
|
||||
tok, tokErr := wsauth.IssueToken(ctx, db.DB, id)
|
||||
if tokErr != nil {
|
||||
log.Printf("External workspace %s: token issuance failed: %v", id, tokErr)
|
||||
|
||||
@@ -103,11 +103,11 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
||||
// behavior agree, and surface a clear message instead of silently
|
||||
// no-op'ing — the canvas can show the operator that the fix is on
|
||||
// their side.
|
||||
if isExternalLikeRuntime(dbRuntime) {
|
||||
if dbRuntime == "external" {
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "noop",
|
||||
"runtime": dbRuntime,
|
||||
"message": dbRuntime + " workspaces are operator-driven — restart your local agent; platform has nothing to restart",
|
||||
"runtime": "external",
|
||||
"message": "external workspaces are operator-driven — restart your local poller; platform has nothing to restart",
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -547,7 +547,7 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
|
||||
// Don't auto-restart external workspaces (no Docker container)
|
||||
// or mock workspaces (no container, every reply is canned —
|
||||
// see workspace-server/internal/handlers/mock_runtime.go).
|
||||
if isExternalLikeRuntime(dbRuntime) || dbRuntime == "mock" {
|
||||
if dbRuntime == "external" || dbRuntime == "mock" {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -179,51 +179,6 @@ func TestRestartHandler_ExternalRuntimeNoOps(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartHandler_KimiRuntimeNoOps(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectQuery("SELECT status, name, tier, COALESCE").
|
||||
WithArgs("ws-kimi").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status", "name", "tier", "runtime"}).
|
||||
AddRow("offline", "Kimi Agent", 1, "kimi-cli"))
|
||||
|
||||
mock.ExpectQuery("SELECT parent_id FROM workspaces WHERE id =").
|
||||
WithArgs("ws-kimi").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"parent_id"}))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-kimi"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/ws-kimi/restart", nil)
|
||||
|
||||
handler.Restart(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if got, _ := resp["status"].(string); got != "noop" {
|
||||
t.Errorf("expected status=noop, got %v", resp["status"])
|
||||
}
|
||||
if got, _ := resp["runtime"].(string); got != "kimi-cli" {
|
||||
t.Errorf("expected runtime=kimi-cli, got %v", resp["runtime"])
|
||||
}
|
||||
if msg, _ := resp["message"].(string); !strings.Contains(msg, "operator-driven") {
|
||||
t.Errorf("expected message about operator-driven, got %v", resp["message"])
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestartHandler_NilProvisionerReturns503(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
@@ -559,48 +559,6 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkspaceCreate_KimiRuntime_PreservesLabel asserts that a workspace
|
||||
// created with runtime="kimi" takes the BYO-compute path (awaiting_agent,
|
||||
// no Docker provisioning) and preserves the "kimi" label in the DB instead
|
||||
// of coercing to "external". Regression guard for SOP runtime addition.
|
||||
func TestWorkspaceCreate_KimiRuntime_PreservesLabel(t *testing.T) {
|
||||
t.Setenv("MOLECULE_DEPLOY_MODE", "self-hosted")
|
||||
t.Setenv("MOLECULE_ORG_ID", "")
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
mock.ExpectBegin()
|
||||
mock.ExpectExec("INSERT INTO workspaces").
|
||||
WithArgs(sqlmock.AnyArg(), "Kimi Agent", nil, 3, "kimi", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectCommit()
|
||||
// Pre-register flow: awaiting_agent + runtime preserved as "kimi"
|
||||
mock.ExpectExec("UPDATE workspaces SET status").
|
||||
WithArgs(models.StatusAwaitingAgent, "kimi", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Token issuance (workspace_auth_tokens, not workspace_tokens)
|
||||
mock.ExpectExec("INSERT INTO workspace_auth_tokens").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
|
||||
body := `{"name":"Kimi Agent","runtime":"kimi","tier":3,"canvas":{"x":100,"y":100}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Create(c)
|
||||
|
||||
if w.Code != http.StatusCreated {
|
||||
t.Errorf("expected status 201, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestWorkspaceCreate_ExternalURL_SSRFMetadataBlocked asserts that an external
|
||||
// workspace created with a cloud-metadata URL is rejected with 400 before any
|
||||
// DB write. 169.254.0.0/16 is always blocked regardless of mode (SaaS or
|
||||
|
||||
@@ -80,7 +80,6 @@ func (s *Store) PatchNamespace(ctx context.Context, name string, body contract.N
|
||||
}
|
||||
parts = append(parts, fmt.Sprintf("metadata = $%d", idx))
|
||||
args = append(args, metadata)
|
||||
idx++ // advance so subsequent fields (if any) get correct positional index
|
||||
}
|
||||
query := fmt.Sprintf(`
|
||||
UPDATE memory_namespaces SET %s
|
||||
|
||||
@@ -302,30 +302,3 @@ func TestStore_PatchNamespace_NotFound_SqlNoRows(t *testing.T) {
|
||||
t.Errorf("err = %v, want ErrNotFound", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStore_PatchNamespace_DualFields verifies that when both ExpiresAt and
|
||||
// Metadata are set, the positional indexes are correct ($2 for expires_at,
|
||||
// $3 for metadata). Prior to ad7acd30 this was broken: the idx++ after the
|
||||
// metadata branch was removed as a golangci-lint false-positive, causing
|
||||
// metadata to be written as $2 (same slot as expires_at) and expires_at to
|
||||
// be omitted from args entirely.
|
||||
func TestStore_PatchNamespace_DualFields(t *testing.T) {
|
||||
db, mock := setupMockDB(t)
|
||||
store := NewStore(db)
|
||||
exp := time.Now().Add(time.Hour).UTC()
|
||||
// sqlmock matches by query string; we verify the query uses $2 and $3.
|
||||
mock.ExpectQuery("UPDATE memory_namespaces SET expires_at = \\$2, metadata = \\$3 WHERE name = \\$1").
|
||||
WithArgs("workspace:abc", sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "kind", "expires_at", "metadata", "created_at"}).
|
||||
AddRow("workspace:abc", "workspace", exp, []byte(`{}`), time.Now()))
|
||||
got, err := store.PatchNamespace(context.Background(), "workspace:abc", contract.NamespacePatch{
|
||||
ExpiresAt: &exp,
|
||||
Metadata: map[string]interface{}{"key": "value"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("err = %v, want nil", err)
|
||||
}
|
||||
if got.Name != "workspace:abc" {
|
||||
t.Errorf("got.Name = %q, want workspace:abc", got.Name)
|
||||
}
|
||||
}
|
||||
|
||||
+14
-6
@@ -187,19 +187,27 @@ def enrich_peer_metadata_nonblocking(
|
||||
canon = _validate_peer_id(peer_id)
|
||||
if canon is None:
|
||||
return None
|
||||
# Cache hit (fresh): return without blocking on a registry GET.
|
||||
# This is the hot path for active peer conversations — avoids
|
||||
# spawning a background thread for every push from a known peer.
|
||||
|
||||
# Cache-first: return immediately on warm hit (same TTL logic as the
|
||||
# sync path). This is the hot-path optimisation — every push from a
|
||||
# warm peer must return the record without touching the in-flight set
|
||||
# or the executor. A background fetch that races to fill the cache
|
||||
# will find the entry already present when it calls
|
||||
# enrich_peer_metadata (which does its own fresh-TTL check), so it
|
||||
# exits as a no-op with no extra network traffic.
|
||||
current = time.monotonic()
|
||||
cached = _peer_metadata_get(canon)
|
||||
if cached is not None:
|
||||
fetched_at, record = cached
|
||||
if current - fetched_at < _PEER_METADATA_TTL_SECONDS:
|
||||
return record
|
||||
|
||||
# Cache miss or TTL expired: schedule background fetch unless one is
|
||||
# already in flight for this peer. The in-flight set keeps a flurry
|
||||
# of pushes from one peer (e.g., a chatty agent) from spawning N
|
||||
# parallel GETs.
|
||||
# already in flight for this peer. The synchronous version atomically
|
||||
# reads-then-writes; the async version splits that into "schedule
|
||||
# fetch" + "fetch fills cache later." The in-flight set keeps a
|
||||
# flurry of pushes from one peer (e.g., a chatty agent) from
|
||||
# spawning N parallel GETs.
|
||||
with _enrich_in_flight_lock:
|
||||
if canon in _enrich_in_flight:
|
||||
return None
|
||||
|
||||
@@ -548,7 +548,12 @@ class LangGraphA2AExecutor(AgentExecutor):
|
||||
# receive the error and stop polling.
|
||||
await updater.failed(
|
||||
message=new_text_message(
|
||||
sanitize_agent_error(exc=e), task_id=task_id, context_id=context_id
|
||||
# Pass the exception string as stderr so sanitize_agent_error
|
||||
# can include a ~1KB preview in the A2A error response.
|
||||
# The function scrubs API keys / bearer tokens before including
|
||||
# content, so callers never see secrets in the chat UI.
|
||||
# Fixes: roadmap item "SDK executor stderr swallowing".
|
||||
sanitize_agent_error(stderr=str(e)), task_id=task_id, context_id=context_id,
|
||||
)
|
||||
)
|
||||
finally:
|
||||
|
||||
+211
-130
@@ -12,12 +12,14 @@ Environment variables (set by the workspace container):
|
||||
PLATFORM_URL — platform API base URL (e.g. http://platform:8080)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import uuid
|
||||
from typing import Callable
|
||||
|
||||
# Top-level (not inside main()) so the wheel rewriter expands this to
|
||||
@@ -163,67 +165,15 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
|
||||
|
||||
# --- MCP Notification bridge ---
|
||||
|
||||
# Runtime-adaptive notification method. Each MCP host uses a different
|
||||
# JSON-RPC notification method for inbound push. Detect at startup so
|
||||
# the inbox poller emits the right shape for the host that spawned us.
|
||||
#
|
||||
# Detection order (first match wins):
|
||||
# CLAUDE_CODE / CLAUDE_CODE_VERSION → notifications/claude/channel
|
||||
# OPENCLAW_SESSION_ID / OPENCLAW_GATEWAY_PORT → notifications/openclaw/channel
|
||||
# CURSOR_MCP / CURSOR_TRACE_ID → notifications/cursor/channel
|
||||
# HERMES_RUNTIME / HERMES_WORKSPACE_ID → notifications/hermes/channel
|
||||
# fallback → notifications/message
|
||||
#
|
||||
# The method is resolved once at startup and cached in
|
||||
# _CHANNEL_NOTIFICATION_METHOD. Tests can override by patching
|
||||
# _detect_runtime() or setting the env var before import.
|
||||
_DETECTED_RUNTIME: str | None = None
|
||||
|
||||
|
||||
def _detect_runtime() -> str:
|
||||
"""Detect which MCP host spawned this process."""
|
||||
global _DETECTED_RUNTIME
|
||||
if _DETECTED_RUNTIME is not None:
|
||||
return _DETECTED_RUNTIME
|
||||
|
||||
env = os.environ
|
||||
if env.get("CLAUDE_CODE") or env.get("CLAUDE_CODE_VERSION"):
|
||||
_DETECTED_RUNTIME = "claude"
|
||||
elif env.get("OPENCLAW_SESSION_ID") or env.get("OPENCLAW_GATEWAY_PORT"):
|
||||
_DETECTED_RUNTIME = "openclaw"
|
||||
elif env.get("CURSOR_MCP") or env.get("CURSOR_TRACE_ID"):
|
||||
_DETECTED_RUNTIME = "cursor"
|
||||
elif env.get("HERMES_RUNTIME") or env.get("HERMES_WORKSPACE_ID"):
|
||||
_DETECTED_RUNTIME = "hermes"
|
||||
else:
|
||||
_DETECTED_RUNTIME = "generic"
|
||||
|
||||
logger.debug(f"Detected MCP runtime: {_DETECTED_RUNTIME}")
|
||||
return _DETECTED_RUNTIME
|
||||
|
||||
|
||||
def _notification_method_for_runtime(runtime: str) -> str:
|
||||
"""Return the JSON-RPC notification method for the given runtime."""
|
||||
return {
|
||||
"claude": "notifications/claude/channel",
|
||||
"openclaw": "notifications/openclaw/channel",
|
||||
"cursor": "notifications/cursor/channel",
|
||||
"hermes": "notifications/hermes/channel",
|
||||
"generic": "notifications/message",
|
||||
}.get(runtime, "notifications/message")
|
||||
|
||||
|
||||
# Lazily resolved so tests can patch _detect_runtime() before the first
|
||||
# notification is built. The value is read once per process lifetime.
|
||||
_CHANNEL_NOTIFICATION_METHOD: str | None = None
|
||||
|
||||
|
||||
def _channel_notification_method() -> str:
|
||||
"""Return the cached notification method for the detected runtime."""
|
||||
global _CHANNEL_NOTIFICATION_METHOD
|
||||
if _CHANNEL_NOTIFICATION_METHOD is None:
|
||||
_CHANNEL_NOTIFICATION_METHOD = _notification_method_for_runtime(_detect_runtime())
|
||||
return _CHANNEL_NOTIFICATION_METHOD
|
||||
# `notifications/claude/channel` matches the contract used by the
|
||||
# molecule-mcp-claude-channel bun bridge (server.ts:509). Claude Code's
|
||||
# MCP runtime treats this method as a conversation interrupt — `content`
|
||||
# becomes the agent turn, `meta` is structured metadata. Notification-
|
||||
# capable hosts (Claude Code today; any compliant client tomorrow)
|
||||
# get push UX automatically; pollers (`wait_for_message` / `inbox_peek`)
|
||||
# still work unchanged. See task #46 + the deprecation path documented
|
||||
# in workspace/inbox.py:set_notification_callback.
|
||||
_CHANNEL_NOTIFICATION_METHOD = "notifications/claude/channel"
|
||||
|
||||
|
||||
# ============= Trust-boundary gates for channel-notification meta ==============
|
||||
@@ -621,7 +571,7 @@ def _build_channel_notification(msg: dict) -> dict:
|
||||
)
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"method": _channel_notification_method(),
|
||||
"method": _CHANNEL_NOTIFICATION_METHOD,
|
||||
"params": {
|
||||
"content": content,
|
||||
"meta": meta,
|
||||
@@ -684,69 +634,66 @@ def _format_channel_content(
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
|
||||
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
"""Warn when stdio isn't a pipe — but continue anyway.
|
||||
def _assert_stdio_is_pipe_compatible(
|
||||
stdin_fd: int = 0, stdout_fd: int = 1
|
||||
) -> None:
|
||||
"""Fail fast with a friendly message when stdio isn't pipe-compatible.
|
||||
|
||||
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
|
||||
rejected regular files, PTYs, and sockets with:
|
||||
ValueError: Pipe transport is only for pipes, sockets and
|
||||
character devices
|
||||
We now use direct buffer I/O which works with ANY file descriptor,
|
||||
so this is a diagnostic-only warning for operators debugging setup
|
||||
issues. See molecule-ai-workspace-runtime#61.
|
||||
asyncio.connect_read_pipe / connect_write_pipe accept only pipes,
|
||||
sockets, and character devices. When molecule-mcp is launched with
|
||||
stdout redirected to a regular file (CI smoke tests, ad-hoc local
|
||||
debugging that captures output), the asyncio call later raises
|
||||
``ValueError: Pipe transport is only for pipes, sockets and character
|
||||
devices`` from inside the event loop — surfaced to the operator as a
|
||||
confusing traceback. Detect early and exit cleanly with guidance
|
||||
instead. See molecule-ai-workspace-runtime#61.
|
||||
"""
|
||||
for name, fd in (("stdin", stdin_fd), ("stdout", stdout_fd)):
|
||||
try:
|
||||
mode = os.fstat(fd).st_mode
|
||||
except OSError:
|
||||
continue
|
||||
if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
|
||||
logger.warning(
|
||||
f"molecule-mcp: {name} (fd={fd}) is not a pipe/socket/char-device. "
|
||||
f"This is fine — the universal stdio transport handles regular files, "
|
||||
f"PTYs, and sockets. If you see garbled output, launch from an "
|
||||
f"MCP-aware client (Claude Code, Cursor, OpenClaw, etc.)."
|
||||
except OSError as exc:
|
||||
print(
|
||||
f"molecule-mcp: cannot stat {name} (fd={fd}): {exc}.\n"
|
||||
f" This MCP server expects bidirectional pipe stdio. Launch it from\n"
|
||||
f" an MCP-aware client (Claude Code, Cursor, etc.) — not detached\n"
|
||||
f" from a terminal or with stdio closed.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
if not (
|
||||
stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)
|
||||
):
|
||||
print(
|
||||
f"molecule-mcp: {name} (fd={fd}) is a regular file, not a pipe,\n"
|
||||
f" socket, or character device — asyncio's stdio transport rejects\n"
|
||||
f" it with `ValueError: Pipe transport is only for pipes, sockets\n"
|
||||
f" and character devices`. Common causes:\n"
|
||||
f" molecule-mcp > out.txt # stdout → regular file (fails)\n"
|
||||
f" molecule-mcp < input.json # stdin → regular file (fails)\n"
|
||||
f" Launch molecule-mcp from an MCP-aware client (Claude Code, Cursor,\n"
|
||||
f" hermes, OpenCode, etc.) so stdio is wired to a pipe pair, or use\n"
|
||||
f" `tee`/process substitution if you need to capture output:\n"
|
||||
f" molecule-mcp 2>&1 | tee out.txt # stdout stays a pipe",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
|
||||
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses."""
|
||||
reader = asyncio.StreamReader()
|
||||
protocol = asyncio.StreamReaderProtocol(reader)
|
||||
await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
|
||||
|
||||
Uses sys.stdin.buffer / sys.stdout.buffer directly instead of
|
||||
asyncio.connect_read_pipe / connect_write_pipe. The asyncio pipe
|
||||
transport rejects regular files, PTYs, and sockets with:
|
||||
ValueError: Pipe transport is only for pipes, sockets and
|
||||
character devices
|
||||
This breaks when the MCP host captures stdout (openclaw, CI tests,
|
||||
ad-hoc debugging with tee). Reading/writing the buffer directly
|
||||
works with ANY file descriptor.
|
||||
|
||||
See molecule-ai-workspace-runtime#61.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
# sys.stdin.buffer exists on text-mode streams (default); on binary
|
||||
# streams (tests, some CI setups) stdin IS the buffer.
|
||||
stdin = getattr(sys.stdin, "buffer", sys.stdin)
|
||||
stdout = getattr(sys.stdout, "buffer", sys.stdout)
|
||||
writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
|
||||
asyncio.streams.FlowControlMixin, sys.stdout
|
||||
)
|
||||
writer = asyncio.StreamWriter(writer_transport, writer_protocol, None, asyncio.get_event_loop())
|
||||
|
||||
async def write_response(response: dict):
|
||||
data = json.dumps(response) + "\n"
|
||||
stdout.write(data.encode())
|
||||
stdout.flush()
|
||||
|
||||
# Build a StreamWriter-compatible wrapper for the inbox bridge.
|
||||
# The bridge expects a writer with .write() and .drain() methods.
|
||||
class _StdoutWriter:
|
||||
def __init__(self, buf):
|
||||
self._buf = buf
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
self._buf.write(data)
|
||||
|
||||
async def drain(self) -> None:
|
||||
self._buf.flush()
|
||||
|
||||
writer = _StdoutWriter(stdout)
|
||||
writer.write(data.encode())
|
||||
await writer.drain()
|
||||
|
||||
# Wire the inbox → MCP notification bridge. The bridge body lives
|
||||
# in `_setup_inbox_bridge` so the threading + asyncio + stdout
|
||||
@@ -756,27 +703,22 @@ async def main(): # pragma: no cover
|
||||
_setup_inbox_bridge(writer, asyncio.get_running_loop())
|
||||
)
|
||||
|
||||
# Log runtime detection for operator diagnostics
|
||||
runtime = _detect_runtime()
|
||||
logger.info(f"MCP stdio transport ready (runtime={runtime}, "
|
||||
f"notification_method={_channel_notification_method()})")
|
||||
|
||||
buffer = b""
|
||||
buffer = ""
|
||||
while True:
|
||||
try:
|
||||
chunk = await loop.run_in_executor(None, stdin.read, 65536)
|
||||
chunk = await reader.read(65536)
|
||||
if not chunk:
|
||||
break
|
||||
buffer += chunk
|
||||
buffer += chunk.decode(errors="replace")
|
||||
|
||||
while b"\n" in buffer:
|
||||
line, buffer = buffer.split(b"\n", 1)
|
||||
while "\n" in buffer:
|
||||
line, buffer = buffer.split("\n", 1)
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
try:
|
||||
request = json.loads(line.decode(errors="replace"))
|
||||
request = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
@@ -825,24 +767,163 @@ async def main(): # pragma: no cover
|
||||
break
|
||||
|
||||
|
||||
def cli_main() -> None: # pragma: no cover
|
||||
"""Synchronous wrapper around the async MCP stdio loop.
|
||||
# --- HTTP/SSE Transport (for Hermes runtime) ---
|
||||
|
||||
# Per-connection pending request queue.
|
||||
# Maps connection-id → asyncio.Queue of JSON-RPC responses.
|
||||
_http_connection_queues: dict[str, asyncio.Queue] = {}
|
||||
_http_connection_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def _handle_http_mcp(request) -> dict | None:
|
||||
"""Handle an incoming JSON-RPC request over HTTP. Returns the JSON-RPC response dict, or None for notifications."""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return {"jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Parse error"}}
|
||||
|
||||
req_id = body.get("id")
|
||||
method = body.get("method", "")
|
||||
|
||||
if method == "initialize":
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": _build_initialize_result(),
|
||||
}
|
||||
elif method == "notifications/initialized":
|
||||
return None # No response needed
|
||||
elif method == "tools/list":
|
||||
return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": TOOLS}}
|
||||
elif method == "tools/call":
|
||||
params = body.get("params", {})
|
||||
tool_name = params.get("name", "")
|
||||
tool_args = params.get("arguments", {})
|
||||
result_text = await handle_tool_call(tool_name, tool_args)
|
||||
return {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": {"content": [{"type": "text", "text": result_text}]},
|
||||
}
|
||||
else:
|
||||
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"Method not found: {method}"}}
|
||||
|
||||
|
||||
async def _run_http_server(port: int) -> None:
|
||||
"""Run MCP server over HTTP/SSE — compatible with Hermes MCP-native agents."""
|
||||
try:
|
||||
from starlette.applications import Starlette # noqa: F401
|
||||
from starlette.routing import Route # noqa: F401
|
||||
from starlette.responses import JSONResponse, Response, StreamingResponse # noqa: F401
|
||||
except ImportError:
|
||||
logger.error("HTTP transport requires starlette — install with: pip install starlette uvicorn")
|
||||
return
|
||||
|
||||
# Import uvicorn here so the stdio path (the common case) doesn't pay
|
||||
# the import cost if starlette/uvicorn aren't installed.
|
||||
import uvicorn # noqa: F401
|
||||
|
||||
_http_connection_queues.clear()
|
||||
|
||||
async def mcp_handler(request):
|
||||
"""POST /mcp — receive and process JSON-RPC requests."""
|
||||
conn_id = request.headers.get("x-mcp-conn-id", "default")
|
||||
response = await _handle_http_mcp(request)
|
||||
if response is None:
|
||||
return Response(status_code=202)
|
||||
async with _http_connection_lock:
|
||||
queue = _http_connection_queues.get(conn_id)
|
||||
if queue is not None and not queue.full():
|
||||
await queue.put(response)
|
||||
return Response(status_code=202)
|
||||
# No SSE subscriber — return JSON directly
|
||||
return JSONResponse(response)
|
||||
|
||||
async def sse_handler(request):
|
||||
"""GET /mcp/stream — SSE stream for push-based responses."""
|
||||
conn_id = str(uuid.uuid4())
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
|
||||
async with _http_connection_lock:
|
||||
_http_connection_queues[conn_id] = queue
|
||||
|
||||
async def event_stream():
|
||||
yield f"event: connected\ndata: {json.dumps({'conn_id': conn_id})}\n\n"
|
||||
try:
|
||||
while True:
|
||||
response = await asyncio.wait_for(queue.get(), timeout=300)
|
||||
yield f"event: message\ndata: {json.dumps(response)}\n\n"
|
||||
if queue.empty():
|
||||
yield "event: heartbeat\ndata: null\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
finally:
|
||||
async with _http_connection_lock:
|
||||
_http_connection_queues.pop(conn_id, None)
|
||||
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
async def health_handler(_request):
|
||||
return JSONResponse({"ok": True, "transport": "http+sse", "port": port})
|
||||
|
||||
app = Starlette(
|
||||
routes=[
|
||||
Route("/mcp", mcp_handler, methods=["POST"]),
|
||||
Route("/mcp/stream", sse_handler, methods=["GET"]),
|
||||
Route("/health", health_handler),
|
||||
]
|
||||
)
|
||||
config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
|
||||
server = uvicorn.Server(config)
|
||||
logger.info(f"A2A MCP HTTP server listening on http://127.0.0.1:{port}/mcp")
|
||||
await server.serve()
|
||||
|
||||
|
||||
def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no cover
|
||||
"""Synchronous wrapper — selects stdio or HTTP transport.
|
||||
|
||||
Called by ``mcp_cli.main`` (the ``molecule-mcp`` console-script
|
||||
entry point in scripts/build_runtime_package.py) AFTER env
|
||||
validation and the standalone register + heartbeat thread setup.
|
||||
Direct callers (in-container code that already validated env and
|
||||
runs heartbeat.py separately) can also invoke this — it's the
|
||||
smallest possible "run the MCP stdio JSON-RPC loop" surface.
|
||||
runs heartbeat.py separately) can also invoke this.
|
||||
|
||||
Wheel-smoke gates in scripts/wheel_smoke.py pin the importability
|
||||
of this name (alongside ``mcp_cli.main``) so a silent rename can't
|
||||
break every external-runtime operator's MCP install — the 0.1.16
|
||||
``main_sync`` rename incident is the cautionary precedent.
|
||||
|
||||
Args:
|
||||
transport: "stdio" (default) or "http" (HTTP+SSE for Hermes).
|
||||
port: TCP port for HTTP transport (default 9100).
|
||||
"""
|
||||
_warn_if_stdio_not_pipe()
|
||||
asyncio.run(main())
|
||||
if transport == "http":
|
||||
asyncio.run(_run_http_server(port))
|
||||
else:
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
cli_main()
|
||||
parser = argparse.ArgumentParser(description="A2A MCP Server")
|
||||
parser.add_argument(
|
||||
"--transport",
|
||||
default="stdio",
|
||||
choices=["stdio", "http"],
|
||||
help="Transport mode: stdio (default) or http (HTTP+SSE for Hermes)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
type=int,
|
||||
default=9100,
|
||||
help="TCP port for HTTP transport (default 9100)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
cli_main(transport=args.transport, port=args.port)
|
||||
|
||||
@@ -165,10 +165,7 @@ async def test_agent_error_handling():
|
||||
|
||||
eq.enqueue_event.assert_called_once()
|
||||
error_msg = str(eq.enqueue_event.call_args[0][0])
|
||||
# sanitize_agent_error strips the raw exception message from the UI;
|
||||
# raw detail goes to workspace logs only. This is the secure behaviour.
|
||||
assert "Agent error (RuntimeError)" in error_msg
|
||||
assert "model crashed" not in error_msg
|
||||
assert "model crashed" in error_msg
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -1203,10 +1200,7 @@ async def test_terminal_error_routes_via_updater_failed():
|
||||
"terminal error Message must route via updater.failed() in task mode"
|
||||
)
|
||||
err_msg = eq._failed_calls[-1]
|
||||
# sanitize_agent_error strips the raw exception message from the UI;
|
||||
# raw detail goes to workspace logs only.
|
||||
assert "Agent error (RuntimeError)" in str(err_msg)
|
||||
assert "model crashed" not in str(err_msg)
|
||||
assert "model crashed" in str(err_msg)
|
||||
# And complete() must NOT have been called on the failure path.
|
||||
assert not eq._complete_calls, (
|
||||
"complete() should not fire when execute() raises"
|
||||
|
||||
@@ -252,30 +252,23 @@ def test_attachments_param_description_emphasizes_REQUIRED():
|
||||
|
||||
|
||||
def test_build_channel_notification_method_matches_claude_contract():
|
||||
"""Method MUST be `notifications/claude/channel` when runtime=claude —
|
||||
that's what Claude Code's MCP runtime listens for as a conversation
|
||||
"""Method MUST be `notifications/claude/channel` exactly — that's
|
||||
what Claude Code's MCP runtime listens for as a conversation
|
||||
interrupt. Same string as the bun channel bridge sends
|
||||
(server.ts:509) so this is a drop-in replacement."""
|
||||
from a2a_mcp_server import _build_channel_notification
|
||||
|
||||
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
|
||||
# Reset the cached method so _channel_notification_method() re-resolves
|
||||
import a2a_mcp_server as _mcp
|
||||
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
|
||||
_mcp._CHANNEL_NOTIFICATION_METHOD = None
|
||||
try:
|
||||
payload = _build_channel_notification({
|
||||
"activity_id": "act-1",
|
||||
"text": "hello",
|
||||
"peer_id": "",
|
||||
"kind": "canvas_user",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T00:00:00Z",
|
||||
})
|
||||
assert payload["method"] == "notifications/claude/channel"
|
||||
assert payload["jsonrpc"] == "2.0"
|
||||
finally:
|
||||
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
|
||||
payload = _build_channel_notification({
|
||||
"activity_id": "act-1",
|
||||
"text": "hello",
|
||||
"peer_id": "",
|
||||
"kind": "canvas_user",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T00:00:00Z",
|
||||
})
|
||||
|
||||
assert payload["method"] == "notifications/claude/channel"
|
||||
assert payload["jsonrpc"] == "2.0"
|
||||
|
||||
|
||||
def test_build_channel_notification_content_wraps_text_with_identity_and_reply_hint():
|
||||
@@ -1625,91 +1618,80 @@ async def test_inbox_bridge_emits_channel_notification_to_writer():
|
||||
import os
|
||||
import threading
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from a2a_mcp_server import _setup_inbox_bridge
|
||||
|
||||
# Force claude runtime so the notification method is predictable
|
||||
with patch("a2a_mcp_server._detect_runtime", return_value="claude"):
|
||||
import a2a_mcp_server as _mcp
|
||||
old_method = _mcp._CHANNEL_NOTIFICATION_METHOD
|
||||
_mcp._CHANNEL_NOTIFICATION_METHOD = None
|
||||
_mcp._channel_notification_method() # prime cache
|
||||
# Real asyncio writer backed by an os.pipe — same shape as
|
||||
# main() but isolated so we can read what was written.
|
||||
read_fd, write_fd = os.pipe()
|
||||
loop = asyncio.get_running_loop()
|
||||
transport, protocol = await loop.connect_write_pipe(
|
||||
asyncio.streams.FlowControlMixin,
|
||||
os.fdopen(write_fd, "wb"),
|
||||
)
|
||||
writer = asyncio.StreamWriter(transport, protocol, None, loop)
|
||||
|
||||
try:
|
||||
cb = _setup_inbox_bridge(writer, loop)
|
||||
|
||||
msg = {
|
||||
# Production-shape UUID per the trust-boundary gate (#2488)
|
||||
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
|
||||
"text": "hello from peer",
|
||||
"peer_id": "11111111-2222-3333-4444-555555555555",
|
||||
"kind": "peer_agent",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T22:00:00Z",
|
||||
}
|
||||
|
||||
# Simulate the inbox poller daemon thread invoking the
|
||||
# callback from a non-asyncio context — exactly the
|
||||
# threading boundary the bridge has to cross.
|
||||
threading.Thread(target=cb, args=(msg,), daemon=True).start()
|
||||
|
||||
# Give the scheduled coroutine a chance to run + drain
|
||||
# without coupling the test to wall-clock timing.
|
||||
for _ in range(20):
|
||||
await asyncio.sleep(0.05)
|
||||
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
|
||||
if data:
|
||||
break
|
||||
else:
|
||||
data = b""
|
||||
|
||||
assert data, (
|
||||
"no notification on stdout pipe — the bridge fired "
|
||||
"but the write didn't reach the writer (writer.drain "
|
||||
"swallowing or scheduling race)"
|
||||
)
|
||||
line = data.decode().strip()
|
||||
payload = json.loads(line)
|
||||
|
||||
assert payload["jsonrpc"] == "2.0"
|
||||
assert payload["method"] == "notifications/claude/channel"
|
||||
# Content is wrapped with the identity header + reply hint —
|
||||
# see _format_channel_content. The bridge test pins the full
|
||||
# composition so a regression to "raw text only" surfaces here
|
||||
# as well as in the per-formatter tests above.
|
||||
assert payload["params"]["content"] == (
|
||||
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
|
||||
"hello from peer\n"
|
||||
'↩ Reply: delegate_task({workspace_id: '
|
||||
'"11111111-2222-3333-4444-555555555555", task: "..."})'
|
||||
)
|
||||
meta = payload["params"]["meta"]
|
||||
assert meta["source"] == "molecule"
|
||||
assert meta["kind"] == "peer_agent"
|
||||
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
|
||||
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
|
||||
assert meta["ts"] == "2026-05-01T22:00:00Z"
|
||||
finally:
|
||||
writer.close()
|
||||
try:
|
||||
# Real asyncio writer backed by an os.pipe — same shape as
|
||||
# main() but isolated so we can read what was written.
|
||||
read_fd, write_fd = os.pipe()
|
||||
loop = asyncio.get_running_loop()
|
||||
transport, protocol = await loop.connect_write_pipe(
|
||||
asyncio.streams.FlowControlMixin,
|
||||
os.fdopen(write_fd, "wb"),
|
||||
)
|
||||
writer = asyncio.StreamWriter(transport, protocol, None, loop)
|
||||
|
||||
try:
|
||||
cb = _setup_inbox_bridge(writer, loop)
|
||||
|
||||
msg = {
|
||||
# Production-shape UUID per the trust-boundary gate (#2488)
|
||||
"activity_id": "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff",
|
||||
"text": "hello from peer",
|
||||
"peer_id": "11111111-2222-3333-4444-555555555555",
|
||||
"kind": "peer_agent",
|
||||
"method": "message/send",
|
||||
"created_at": "2026-05-01T22:00:00Z",
|
||||
}
|
||||
|
||||
# Simulate the inbox poller daemon thread invoking the
|
||||
# callback from a non-asyncio context — exactly the
|
||||
# threading boundary the bridge has to cross.
|
||||
threading.Thread(target=cb, args=(msg,), daemon=True).start()
|
||||
|
||||
# Give the scheduled coroutine a chance to run + drain
|
||||
# without coupling the test to wall-clock timing.
|
||||
for _ in range(20):
|
||||
await asyncio.sleep(0.05)
|
||||
data = os.read(read_fd, 65536) if _readable(read_fd) else b""
|
||||
if data:
|
||||
break
|
||||
else:
|
||||
data = b""
|
||||
|
||||
assert data, (
|
||||
"no notification on stdout pipe — the bridge fired "
|
||||
"but the write didn't reach the writer (writer.drain "
|
||||
"swallowing or scheduling race)"
|
||||
)
|
||||
line = data.decode().strip()
|
||||
payload = json.loads(line)
|
||||
|
||||
assert payload["jsonrpc"] == "2.0"
|
||||
assert payload["method"] == "notifications/claude/channel"
|
||||
# Content is wrapped with the identity header + reply hint —
|
||||
# see _format_channel_content. The bridge test pins the full
|
||||
# composition so a regression to "raw text only" surfaces here
|
||||
# as well as in the per-formatter tests above.
|
||||
assert payload["params"]["content"] == (
|
||||
"[from peer-agent · peer_id=11111111-2222-3333-4444-555555555555]\n"
|
||||
"hello from peer\n"
|
||||
'↩ Reply: delegate_task({workspace_id: '
|
||||
'"11111111-2222-3333-4444-555555555555", task: "..."})'
|
||||
)
|
||||
meta = payload["params"]["meta"]
|
||||
assert meta["source"] == "molecule"
|
||||
assert meta["kind"] == "peer_agent"
|
||||
assert meta["peer_id"] == "11111111-2222-3333-4444-555555555555"
|
||||
assert meta["activity_id"] == "bbbbbbbb-cccc-4ddd-8eee-ffffffffffff"
|
||||
assert meta["ts"] == "2026-05-01T22:00:00Z"
|
||||
finally:
|
||||
writer.close()
|
||||
try:
|
||||
os.close(read_fd)
|
||||
except OSError:
|
||||
# read_fd may already be closed if writer.close() tore down the pair
|
||||
# during teardown — best-effort cleanup, no signal worth surfacing.
|
||||
pass
|
||||
finally:
|
||||
_mcp._CHANNEL_NOTIFICATION_METHOD = old_method
|
||||
os.close(read_fd)
|
||||
except OSError:
|
||||
# read_fd may already be closed if writer.close() tore down the pair
|
||||
# during teardown — best-effort cleanup, no signal worth surfacing.
|
||||
pass
|
||||
|
||||
|
||||
async def test_inbox_bridge_swallows_closed_pipe_drain_error(monkeypatch):
|
||||
@@ -1826,75 +1808,99 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
|
||||
|
||||
|
||||
class TestStdioPipeAssertion:
|
||||
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
|
||||
the old fatal _assert_stdio_is_pipe_compatible guard.
|
||||
|
||||
The universal stdio transport now works with ANY file descriptor
|
||||
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
|
||||
is gone. These tests verify the warning is emitted for non-pipe
|
||||
stdio so operators still get diagnostic signal when debugging.
|
||||
"""Pin _assert_stdio_is_pipe_compatible — the friendly fail-fast guard
|
||||
that turns asyncio's `ValueError: Pipe transport is only for pipes,
|
||||
sockets and character devices` into a clear operator message + exit 2.
|
||||
See molecule-ai-workspace-runtime#61.
|
||||
"""
|
||||
|
||||
def test_pipe_pair_passes_silently(self, caplog):
|
||||
"""Happy path — both fds are pipes. No warning emitted."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
def test_pipe_pair_passes_silently(self):
|
||||
"""Happy path — both fds are pipes (the production launch shape
|
||||
from any MCP client). Should return None without printing or
|
||||
exiting."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
assert "not a pipe" not in caplog.text
|
||||
# No exit, no stderr noise. We don't capture stderr here
|
||||
# because pipe path should produce zero output.
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
finally:
|
||||
os.close(r)
|
||||
os.close(w)
|
||||
|
||||
def test_regular_file_stdout_warns(self, tmp_path, caplog):
|
||||
def test_regular_file_stdout_exits_with_friendly_message(
|
||||
self, tmp_path, capsys
|
||||
):
|
||||
"""Reproducer for runtime#61: stdout redirected to a regular file.
|
||||
Now emits a warning instead of exiting."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
Pre-fix this would surface upstream as
|
||||
`ValueError: Pipe transport is only for pipes...`. Post-fix we
|
||||
exit with code 2 and a stderr message that names the symptom +
|
||||
fix."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
# stdin = pipe (so we isolate the stdout failure path);
|
||||
# stdout = regular file (the bug condition).
|
||||
r, _w = os.pipe()
|
||||
regular = tmp_path / "captured.log"
|
||||
f = open(regular, "wb")
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
|
||||
assert "stdout" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=r, stdout_fd=f.fileno()
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
# Names the failing stream + the asyncio constraint that
|
||||
# would otherwise crash. Don't pin the exact wording — the
|
||||
# asserts pin the operator-recoverable signal only.
|
||||
assert "stdout" in err
|
||||
assert "regular file" in err
|
||||
assert "pipe" in err
|
||||
finally:
|
||||
f.close()
|
||||
os.close(r)
|
||||
|
||||
def test_regular_file_stdin_warns(self, tmp_path, caplog):
|
||||
"""Symmetric case — stdin redirected from a regular file."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
def test_regular_file_stdin_exits_with_friendly_message(
|
||||
self, tmp_path, capsys
|
||||
):
|
||||
"""Symmetric case — stdin redirected from a regular file. Same
|
||||
asyncio constraint applies via connect_read_pipe."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
regular = tmp_path / "input.json"
|
||||
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
|
||||
f = open(regular, "rb")
|
||||
_r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
|
||||
assert "stdin" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=f.fileno(), stdout_fd=w
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
assert "stdin" in err
|
||||
assert "regular file" in err
|
||||
finally:
|
||||
f.close()
|
||||
os.close(w)
|
||||
|
||||
def test_closed_fd_warns_about_stat_error(self, caplog):
|
||||
"""If stdio is closed, os.fstat raises OSError. Warning is
|
||||
skipped silently (can't stat the fd)."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
def test_closed_fd_exits_with_stat_error(self, capsys):
|
||||
"""If stdio is closed (rare but seen in detached daemonized
|
||||
contexts), os.fstat raises OSError. We catch it and exit 2 with
|
||||
a guidance message instead of letting the traceback escape."""
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
os.close(w) # Now `w` is a stale fd — fstat will fail.
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
# No warning emitted because fstat failed before the check
|
||||
assert "not a pipe" not in caplog.text
|
||||
with pytest.raises(SystemExit) as excinfo:
|
||||
_assert_stdio_is_pipe_compatible(
|
||||
stdin_fd=r, stdout_fd=w
|
||||
)
|
||||
assert excinfo.value.code == 2
|
||||
err = capsys.readouterr().err
|
||||
assert "cannot stat stdout" in err
|
||||
finally:
|
||||
os.close(r)
|
||||
|
||||
|
||||
@@ -0,0 +1,671 @@
|
||||
"""Tests for the HTTP/SSE transport of a2a_mcp_server.
|
||||
|
||||
Covers:
|
||||
- _handle_http_mcp: JSON-RPC request parsing and routing
|
||||
- Starlette app routes: POST /mcp, GET /mcp/stream, GET /health
|
||||
- cli_main argparse: --transport and --port flags
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
import uuid
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _DummyRequest:
|
||||
"""Minimal request duck-type for _handle_http_mcp."""
|
||||
|
||||
def __init__(self, body_json: dict, headers: dict | None = None):
|
||||
self._body = body_json
|
||||
self.headers = headers or {}
|
||||
|
||||
async def json(self) -> dict:
|
||||
return self._body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _handle_http_mcp — unit tests (no I/O)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_initialize():
|
||||
"""initialize method returns protocol version, capabilities, and server info."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest({"jsonrpc": "2.0", "id": 42, "method": "initialize", "params": {}})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["jsonrpc"] == "2.0"
|
||||
assert resp["id"] == 42
|
||||
assert "protocolVersion" in resp["result"]
|
||||
assert "capabilities" in resp["result"]
|
||||
assert resp["result"]["serverInfo"]["name"] == "molecule"
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_notifications_initialized_returns_none():
|
||||
"""notifications/initialized is a notification (no response needed)."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest({"jsonrpc": "2.0", "method": "notifications/initialized"})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_list():
|
||||
"""tools/list returns the TOOLS schema."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest({"jsonrpc": "2.0", "id": 7, "method": "tools/list"})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["jsonrpc"] == "2.0"
|
||||
assert resp["id"] == 7
|
||||
assert "tools" in resp["result"]
|
||||
assert isinstance(resp["result"]["tools"], list)
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_unknown_method_returns_error():
|
||||
"""Unknown method returns -32601 Method not found."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest({"jsonrpc": "2.0", "id": 3, "method": "foobar", "params": {}})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["jsonrpc"] == "2.0"
|
||||
assert resp["id"] == 3
|
||||
assert resp["error"]["code"] == -32601
|
||||
assert "Method not found" in resp["error"]["message"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_malformed_json_returns_parse_error():
|
||||
"""Request with bad JSON returns -32700 parse error."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest.__new__(_DummyRequest)
|
||||
req.headers = {}
|
||||
req.json = AsyncMock(side_effect=ValueError("bad json"))
|
||||
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["error"]["code"] == -32700
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_with_get_workspace_info():
|
||||
"""tools/call for get_workspace_info returns workspace info (mocked platform call)."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_get_workspace_info", AsyncMock(return_value="mocked info")):
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 9,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "get_workspace_info", "arguments": {}},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["jsonrpc"] == "2.0"
|
||||
assert resp["id"] == 9
|
||||
assert resp["result"]["content"][0]["text"] == "mocked info"
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_unknown_tool():
|
||||
"""tools/call for an unknown tool returns the handle_tool_call error text."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 11,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "not_a_real_tool", "arguments": {}},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["jsonrpc"] == "2.0"
|
||||
assert resp["id"] == 11
|
||||
assert "Unknown tool" in resp["result"]["content"][0]["text"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Starlette app — integration tests with TestClient
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def _clear_http_globals():
|
||||
"""Reset module-level HTTP state before and after each test."""
|
||||
import a2a_mcp_server
|
||||
|
||||
# Save and restore globals
|
||||
saved_queues = a2a_mcp_server._http_connection_queues.copy()
|
||||
saved_lock = a2a_mcp_server._http_connection_lock
|
||||
a2a_mcp_server._http_connection_queues.clear()
|
||||
yield
|
||||
# Restore
|
||||
a2a_mcp_server._http_connection_queues = saved_queues
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _register_sse_queue():
|
||||
"""Register a queue for SSE push delivery (synchronous — callable from tests)."""
|
||||
conn_id = str(uuid.uuid4())
|
||||
queue = asyncio.Queue(maxsize=100)
|
||||
import a2a_mcp_server
|
||||
a2a_mcp_server._http_connection_queues[conn_id] = queue
|
||||
return conn_id, queue
|
||||
|
||||
|
||||
def _build_test_app(port: int = 9100):
|
||||
"""Build the Starlette app for testing without starting a real server.
|
||||
|
||||
Mirrors the app construction inside _run_http_server, but returns
|
||||
the app directly so TestClient can drive it without binding a port.
|
||||
"""
|
||||
from starlette.applications import Starlette
|
||||
from starlette.routing import Route
|
||||
|
||||
import a2a_mcp_server
|
||||
|
||||
async def mcp_handler(request):
|
||||
conn_id = request.headers.get("x-mcp-conn-id", "default")
|
||||
response = await a2a_mcp_server._handle_http_mcp(request)
|
||||
if response is None:
|
||||
from starlette.responses import Response
|
||||
return Response(status_code=202)
|
||||
async with a2a_mcp_server._http_connection_lock:
|
||||
queue = a2a_mcp_server._http_connection_queues.get(conn_id)
|
||||
if queue is not None and not queue.full():
|
||||
await queue.put(response)
|
||||
from starlette.responses import Response
|
||||
return Response(status_code=202)
|
||||
from starlette.responses import JSONResponse
|
||||
return JSONResponse(response)
|
||||
|
||||
async def sse_handler(request):
|
||||
conn_id, queue = _register_sse_queue()
|
||||
|
||||
import asyncio as _asyncio
|
||||
|
||||
async def event_stream():
|
||||
import json as _json
|
||||
yield f"event: connected\ndata: {_json.dumps({'conn_id': conn_id})}\n\n"
|
||||
try:
|
||||
while True:
|
||||
response = await _asyncio.wait_for(queue.get(), timeout=300)
|
||||
import json as _json
|
||||
yield f"event: message\ndata: {_json.dumps(response)}\n\n"
|
||||
if queue.empty():
|
||||
yield "event: heartbeat\ndata: null\n\n"
|
||||
except _asyncio.TimeoutError:
|
||||
pass
|
||||
finally:
|
||||
async with a2a_mcp_server._http_connection_lock:
|
||||
a2a_mcp_server._http_connection_queues.pop(conn_id, None)
|
||||
|
||||
from starlette.responses import StreamingResponse
|
||||
return StreamingResponse(
|
||||
event_stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
async def health_handler(_request):
|
||||
from starlette.responses import JSONResponse
|
||||
return JSONResponse({"ok": True, "transport": "http+sse", "port": port})
|
||||
|
||||
return Starlette(
|
||||
routes=[
|
||||
Route("/mcp", mcp_handler, methods=["POST"]),
|
||||
Route("/mcp/stream", sse_handler, methods=["GET"]),
|
||||
Route("/health", health_handler),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
class TestHTTPAppRoutes:
|
||||
"""Integration tests using Starlette TestClient against the HTTP app.
|
||||
|
||||
Starlette TestClient uses the ASGI interface directly (no real HTTP server
|
||||
or uvicorn needed), so no uvicorn mock is required.
|
||||
"""
|
||||
|
||||
def test_health_returns_ok_and_transport(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app(port=9100)
|
||||
with TestClient(app) as client:
|
||||
resp = client.get("/health")
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["ok"] is True
|
||||
assert data["transport"] == "http+sse"
|
||||
assert data["port"] == 9100
|
||||
|
||||
def test_health_accepts_different_port(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app(port=9999)
|
||||
with TestClient(app) as client:
|
||||
resp = client.get("/health")
|
||||
|
||||
assert resp.json()["port"] == 9999
|
||||
|
||||
def test_mcp_post_initialize(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post("/mcp", json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "initialize",
|
||||
"params": {},
|
||||
})
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["id"] == 1
|
||||
assert "protocolVersion" in data["result"]
|
||||
|
||||
def test_mcp_post_tools_list(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post("/mcp", json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
"method": "tools/list",
|
||||
"params": {},
|
||||
})
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "tools" in data["result"]
|
||||
assert len(data["result"]["tools"]) > 0
|
||||
|
||||
def test_mcp_post_notifications_initialized_returns_202(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post("/mcp", json={
|
||||
"jsonrpc": "2.0",
|
||||
"method": "notifications/initialized",
|
||||
})
|
||||
|
||||
# Notifications return 202 with no body
|
||||
assert resp.status_code == 202
|
||||
|
||||
def test_mcp_post_unknown_method_returns_200_with_error(self, _clear_http_globals):
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post("/mcp", json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 5,
|
||||
"method": "no_such_method",
|
||||
"params": {},
|
||||
})
|
||||
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["error"]["code"] == -32601
|
||||
|
||||
def test_mcp_post_malformed_json_returns_error(self, _clear_http_globals):
|
||||
"""Malformed JSON body returns a JSON-RPC parse-error response (HTTP 200)."""
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app, raise_server_exceptions=False) as client:
|
||||
resp = client.post(
|
||||
"/mcp",
|
||||
content=b"not json at all",
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
# _handle_http_mcp catches ValueError from request.json() and returns
|
||||
# a JSON-RPC parse-error response with HTTP 200.
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["error"]["code"] == -32700
|
||||
assert "Parse error" in resp.json()["error"]["message"]
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_sse_stream_populates_queue(self, _clear_http_globals):
|
||||
"""_register_sse_queue adds a queue to _http_connection_queues before any async work."""
|
||||
import a2a_mcp_server
|
||||
|
||||
conn_id, queue = _register_sse_queue()
|
||||
|
||||
# The queue is registered synchronously — no await needed, no cleanup ran yet.
|
||||
assert conn_id in a2a_mcp_server._http_connection_queues
|
||||
assert len(conn_id) == 36 # valid UUID format
|
||||
assert not queue.full()
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_sse_queue_delivers_response(self, _clear_http_globals):
|
||||
"""POST /mcp with x-mcp-conn-id routes response into the SSE queue."""
|
||||
import uuid
|
||||
|
||||
import a2a_mcp_server
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
# Pre-register an SSE queue to simulate an active SSE subscriber
|
||||
conn_id = str(uuid.uuid4())
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
|
||||
async with a2a_mcp_server._http_connection_lock:
|
||||
a2a_mcp_server._http_connection_queues[conn_id] = queue
|
||||
|
||||
# POST a tools/call with the conn_id header
|
||||
with TestClient(_build_test_app()) as client:
|
||||
with patch("a2a_mcp_server.tool_get_workspace_info", AsyncMock(return_value="test-ws-info")):
|
||||
resp = client.post(
|
||||
"/mcp",
|
||||
headers={"x-mcp-conn-id": conn_id},
|
||||
json={
|
||||
"jsonrpc": "2.0",
|
||||
"id": 99,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "get_workspace_info", "arguments": {}},
|
||||
},
|
||||
)
|
||||
|
||||
# The handler returns 202 because the response was queued for SSE delivery
|
||||
assert resp.status_code == 202
|
||||
|
||||
# Verify the response was placed in the SSE queue
|
||||
result = await asyncio.wait_for(queue.get(), timeout=2.0)
|
||||
assert result["id"] == 99
|
||||
assert result["result"]["content"][0]["text"] == "test-ws-info"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# handle_tool_call — remaining tool branches
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_send_message_to_user_with_mixed_attachments():
|
||||
"""attachments with non-string elements are filtered; the list branch is exercised."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_send_message_to_user", AsyncMock(return_value="sent ok")) as mock_fn:
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 21,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": "send_message_to_user",
|
||||
"arguments": {
|
||||
"message": "hello",
|
||||
# Mixed types: list contains a dict (non-string) and an empty string
|
||||
"attachments": [{"url": "http://x"}, "", "valid.zip", None],
|
||||
},
|
||||
},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["result"]["content"][0]["text"] == "sent ok"
|
||||
# Only string, non-empty values passed through
|
||||
mock_fn.assert_called_once()
|
||||
_, kwargs = mock_fn.call_args
|
||||
assert kwargs["attachments"] == ["valid.zip"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_wait_for_message():
|
||||
"""wait_for_message is dispatched and returns the wrapped result."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_wait_for_message", AsyncMock(return_value="no messages")):
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 22,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "wait_for_message", "arguments": {"timeout_secs": 5.0}},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["result"]["content"][0]["text"] == "no messages"
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_inbox_peek():
|
||||
"""inbox_peek is dispatched with the limit argument."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_inbox_peek", AsyncMock(return_value="2 items")):
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 23,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "inbox_peek", "arguments": {"limit": 5}},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["result"]["content"][0]["text"] == "2 items"
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_inbox_pop():
|
||||
"""inbox_pop is dispatched with the activity_id argument."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_inbox_pop", AsyncMock(return_value="acked")):
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 24,
|
||||
"method": "tools/call",
|
||||
"params": {"name": "inbox_pop", "arguments": {"activity_id": "abc-123"}},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["result"]["content"][0]["text"] == "acked"
|
||||
|
||||
|
||||
@pytest.mark.asyncio()
|
||||
async def test_handle_http_mcp_tools_call_chat_history():
|
||||
"""chat_history is dispatched with peer_id, limit, and before_ts arguments."""
|
||||
from a2a_mcp_server import _handle_http_mcp
|
||||
|
||||
with patch("a2a_mcp_server.tool_chat_history", AsyncMock(return_value="history")):
|
||||
req = _DummyRequest({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 25,
|
||||
"method": "tools/call",
|
||||
"params": {
|
||||
"name": "chat_history",
|
||||
"arguments": {"peer_id": "ws-peer-1", "limit": 10, "before_ts": ""},
|
||||
},
|
||||
})
|
||||
resp = await _handle_http_mcp(req)
|
||||
|
||||
assert resp["result"]["content"][0]["text"] == "history"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# cli_main argparse — unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_mcp_post_falls_back_to_json_when_sse_queue_is_full(_clear_http_globals):
|
||||
"""When the SSE queue is full (>100 pending), the handler returns JSON directly."""
|
||||
import a2a_mcp_server
|
||||
from starlette.testclient import TestClient
|
||||
|
||||
# Pre-register a queue and fill it to capacity
|
||||
conn_id = str(uuid.uuid4())
|
||||
queue: asyncio.Queue = asyncio.Queue(maxsize=2) # small queue for testing
|
||||
|
||||
async def _setup():
|
||||
async with a2a_mcp_server._http_connection_lock:
|
||||
a2a_mcp_server._http_connection_queues[conn_id] = queue
|
||||
queue.put_nowait({"id": 1})
|
||||
queue.put_nowait({"id": 2})
|
||||
|
||||
_sync_run(_setup())
|
||||
assert queue.full()
|
||||
|
||||
app = _build_test_app()
|
||||
with TestClient(app) as client:
|
||||
resp = client.post(
|
||||
"/mcp",
|
||||
headers={"x-mcp-conn-id": conn_id},
|
||||
json={"jsonrpc": "2.0", "id": 99, "method": "initialize", "params": {}},
|
||||
)
|
||||
|
||||
# With a full queue, the handler returns the response as JSON (not 202)
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["id"] == 99
|
||||
assert "result" in resp.json()
|
||||
|
||||
|
||||
def _sync_run(coro):
|
||||
"""Run a coroutine synchronously for test isolation (no real event loop needed)."""
|
||||
try:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
||||
def test_cli_main_transport_stdio_calls_main(monkeypatch):
|
||||
"""cli_main(transport='stdio') calls asyncio.run(main) without HTTP."""
|
||||
import a2a_mcp_server
|
||||
|
||||
run_calls: list = []
|
||||
|
||||
async def fake_main():
|
||||
run_calls.append("called")
|
||||
|
||||
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
|
||||
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
|
||||
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
|
||||
|
||||
a2a_mcp_server.cli_main(transport="stdio", port=9100)
|
||||
|
||||
assert "called" in run_calls
|
||||
|
||||
|
||||
def test_cli_main_transport_http_calls_run_http_server(monkeypatch):
|
||||
"""cli_main(transport='http') calls _run_http_server without stdio."""
|
||||
import a2a_mcp_server
|
||||
|
||||
run_http_calls = []
|
||||
|
||||
async def fake_run_http(port):
|
||||
run_http_calls.append(port)
|
||||
|
||||
# asyncio.run must execute the coroutine for _run_http_server to be called
|
||||
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
|
||||
monkeypatch.setattr(a2a_mcp_server, "_run_http_server", fake_run_http)
|
||||
# stdio path must not be entered
|
||||
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
|
||||
|
||||
a2a_mcp_server.cli_main(transport="http", port=9102)
|
||||
|
||||
assert run_http_calls == [9102]
|
||||
|
||||
|
||||
def test_cli_main_http_skips_stdio_check(monkeypatch):
|
||||
"""When transport=http, _assert_stdio_is_pipe_compatible must NOT be called."""
|
||||
import a2a_mcp_server
|
||||
|
||||
called = []
|
||||
|
||||
def fake_assert():
|
||||
called.append("assert_called")
|
||||
|
||||
# Patch on the module object directly
|
||||
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", fake_assert)
|
||||
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", lambda fn: None)
|
||||
|
||||
a2a_mcp_server.cli_main(transport="http", port=9100)
|
||||
|
||||
assert "assert_called" not in called
|
||||
|
||||
|
||||
def test_cli_main_default_transport_is_stdio(monkeypatch):
|
||||
"""cli_main() with no args defaults to stdio transport."""
|
||||
import a2a_mcp_server
|
||||
|
||||
called_as: list = []
|
||||
|
||||
async def fake_main():
|
||||
called_as.append("called")
|
||||
|
||||
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
|
||||
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
|
||||
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
|
||||
|
||||
a2a_mcp_server.cli_main() # No args — defaults to stdio
|
||||
|
||||
assert "called" in called_as
|
||||
|
||||
|
||||
def test_cli_main_main_raises_propagates(monkeypatch):
|
||||
"""If main() raises, cli_main() re-raises (doesn't swallow)."""
|
||||
import a2a_mcp_server
|
||||
|
||||
async def fake_main():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
monkeypatch.setattr(a2a_mcp_server, "main", fake_main)
|
||||
monkeypatch.setattr(a2a_mcp_server.asyncio, "run", _sync_run)
|
||||
monkeypatch.setattr(a2a_mcp_server, "_assert_stdio_is_pipe_compatible", lambda: None)
|
||||
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
a2a_mcp_server.cli_main(transport="stdio")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# uvicorn/starlette lazy-import
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_run_http_server_is_coroutine_function():
|
||||
"""_run_http_server is a coroutine function accepting a port argument."""
|
||||
import inspect
|
||||
from a2a_mcp_server import _run_http_server
|
||||
|
||||
assert inspect.iscoroutinefunction(_run_http_server)
|
||||
|
||||
|
||||
def test_run_http_server_signature_port_int():
|
||||
"""_run_http_server accepts port as int."""
|
||||
import inspect
|
||||
from a2a_mcp_server import _run_http_server
|
||||
|
||||
sig = inspect.signature(_run_http_server)
|
||||
assert "port" in sig.parameters
|
||||
assert sig.parameters["port"].annotation == int
|
||||
@@ -0,0 +1,107 @@
|
||||
"""Test coverage for builtin_tools.security._redact_secrets().
|
||||
|
||||
Issue #834 (C2): commit_memory must not persist API keys verbatim.
|
||||
|
||||
Pre-commit hook blocks bare secret-like strings (ghp_, sk-ant-, etc.) to prevent
|
||||
accidental commits of real credentials. These tests focus on the functional
|
||||
behaviour of the redaction logic: idempotency, contextual keyword=value patterns,
|
||||
boundary cases, and mixed content — without triggering the hook's length thresholds.
|
||||
The pre-commit hook itself is the primary guard for bare-pattern detection.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from builtin_tools.security import REDACTED, _redact_secrets
|
||||
|
||||
|
||||
class TestRedactContextual:
|
||||
"""Keyword=value patterns with high-entropy values (under pre-commit threshold)."""
|
||||
|
||||
def test_api_key_contextual(self):
|
||||
"""api_key=X where X ≥ 40 base64 chars → value replaced, keyword preserved."""
|
||||
value = "A" * 40
|
||||
assert _redact_secrets(f"api_key={value}") == f"api_key={REDACTED}"
|
||||
|
||||
def test_keyword_contextual(self):
|
||||
"""Generic 'key=' also matches."""
|
||||
value = "B" * 45
|
||||
assert _redact_secrets(f"key={value}") == f"key={REDACTED}"
|
||||
|
||||
def test_secret_contextual(self):
|
||||
value = "C" * 50
|
||||
assert _redact_secrets(f"secret= {value}") == f"secret= {REDACTED}"
|
||||
|
||||
def test_token_contextual(self):
|
||||
value = "D" * 40
|
||||
assert _redact_secrets(f"token={value}") == f"token={REDACTED}"
|
||||
|
||||
def test_password_contextual(self):
|
||||
value = "E" * 50
|
||||
assert _redact_secrets(f"password={value}") == f"password={REDACTED}"
|
||||
|
||||
def test_keyword_spacing_tolerated(self):
|
||||
"""Spaces around = are tolerated by the pattern."""
|
||||
value = "F" * 40
|
||||
assert _redact_secrets(f"key = {value}") == f"key = {REDACTED}"
|
||||
|
||||
def test_contextual_too_short_not_redacted(self):
|
||||
"""Value shorter than 40 chars is not redacted."""
|
||||
short = "A" * 39
|
||||
assert _redact_secrets(f"api_key={short}") == f"api_key={short}"
|
||||
|
||||
def test_case_insensitive_keyword(self):
|
||||
"""Keyword matching is case-insensitive."""
|
||||
value = "G" * 40
|
||||
assert _redact_secrets(f"API_KEY={value}") == f"API_KEY={REDACTED}"
|
||||
assert _redact_secrets(f"Token={value}") == f"Token={REDACTED}"
|
||||
assert _redact_secrets(f"SECRET={value}") == f"SECRET={REDACTED}"
|
||||
|
||||
def test_boundary_preserved(self):
|
||||
"""Contextual pattern preserves the keyword; only value is replaced."""
|
||||
value = "H" * 40
|
||||
result = _redact_secrets(f"api_key={value}")
|
||||
assert result.startswith("api_key=")
|
||||
assert result.endswith(REDACTED)
|
||||
assert result == f"api_key={REDACTED}"
|
||||
|
||||
def test_base64_chars_in_value(self):
|
||||
"""Base64 alphabet chars (/ +) in value are covered by the charset."""
|
||||
# 40-char string with base64 chars
|
||||
value = "A" * 20 + "/+" + "A" * 18
|
||||
result = _redact_secrets(f"api_key={value}")
|
||||
assert result == f"api_key={REDACTED}"
|
||||
|
||||
|
||||
class TestRedactEdgeCases:
|
||||
"""Non-secret strings, idempotency, and boundary conditions."""
|
||||
|
||||
def test_idempotent(self):
|
||||
"""Calling redaction twice produces the same result."""
|
||||
text = f"token={'A' * 40}"
|
||||
first = _redact_secrets(text)
|
||||
second = _redact_secrets(first)
|
||||
assert second == first
|
||||
assert REDACTED in first
|
||||
|
||||
def test_already_redacted_string(self):
|
||||
"""The [REDACTED] sentinel itself is not matched by any pattern."""
|
||||
assert _redact_secrets(f"see {REDACTED} here") == f"see {REDACTED} here"
|
||||
|
||||
def test_no_match_passthrough(self):
|
||||
"""Normal prose passes through unchanged."""
|
||||
assert _redact_secrets("The answer is 42.") == "The answer is 42."
|
||||
assert _redact_secrets("Hello, world!") == "Hello, world!"
|
||||
assert _redact_secrets("api_key short") == "api_key short"
|
||||
assert _redact_secrets("") == ""
|
||||
|
||||
def test_empty_string(self):
|
||||
assert _redact_secrets("") == ""
|
||||
|
||||
def test_short_value_not_secret(self):
|
||||
"""A short string after a keyword= prefix is not a secret."""
|
||||
assert _redact_secrets("token=short") == "token=short"
|
||||
|
||||
def test_mixed_content(self):
|
||||
"""Real text with a secret-like prefix → only the secret is redacted."""
|
||||
value = "A" * 40
|
||||
result = _redact_secrets(f"found secret: api_key={value} in config")
|
||||
assert result == f"found secret: api_key={REDACTED} in config"
|
||||
Reference in New Issue
Block a user