Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 679ed9a697 | |||
| 785112955f | |||
| af90c80e52 | |||
| e51f7004b3 | |||
| 8868cbe1a4 | |||
| 0cf2fa6297 | |||
| 39f2dd99aa | |||
| 51f5aa82ee | |||
| 15c058071a | |||
| c7ffa43166 | |||
| 9b445366f6 | |||
| 3fadf89e43 |
@@ -0,0 +1 @@
|
||||
refire:1778784369
|
||||
@@ -69,6 +69,13 @@ name: E2E API Smoke Test
|
||||
# 2318) shows Postgres ready in 3s, Redis in 1s, Platform in 1s when
|
||||
# they DO come up. Timeouts are not the bottleneck; not bumped.
|
||||
#
|
||||
# Item #1046 (fixed 2026-05-14): Stale platform-server from cancelled runs
|
||||
# lingers on :8080 after "Stop platform" step is skipped (workflow cancelled
|
||||
# before reaching line 335). Added a pre-start "Kill stale platform-server"
|
||||
# step (line 286) that scans /proc for zombie platform-server processes
|
||||
# and kills them before the port probe or bind. Makes the ephemeral port
|
||||
# probe + start sequence deterministic.
|
||||
#
|
||||
# Item explicitly NOT fixed here: failing test `Status back online`
|
||||
# fails because the platform's langgraph workspace template image
|
||||
# (ghcr.io/molecule-ai/workspace-template-langgraph:latest) returns
|
||||
@@ -283,6 +290,35 @@ jobs:
|
||||
echo "PORT=${PLATFORM_PORT}" >> "$GITHUB_ENV"
|
||||
echo "BASE=http://127.0.0.1:${PLATFORM_PORT}" >> "$GITHUB_ENV"
|
||||
echo "Platform host port: ${PLATFORM_PORT}"
|
||||
- name: Kill stale platform-server before start (issue #1046)
|
||||
if: needs.detect-changes.outputs.api == 'true'
|
||||
run: |
|
||||
# Concurrent runs on the same host-network act_runner can leave a
|
||||
# zombie platform-server from a cancelled/timeout run. Cancelled
|
||||
# runs never reach the "Stop platform" step (line 335), so the
|
||||
# old process lingers. Kill it before the ephemeral port probe
|
||||
# or start so the port is definitively free.
|
||||
#
|
||||
# /proc scan — works on any Linux without pkill/lsof/ss.
|
||||
# comm field is truncated to 15 chars: "platform-serve" matches
|
||||
# "platform-server". Verify with cmdline to avoid false positives.
|
||||
killed=0
|
||||
for pid in $(grep -l "platform-serve" /proc/[0-9]*/comm 2>/dev/null); do
|
||||
kpid="${pid%/comm}"
|
||||
kpid="${kpid##*/}"
|
||||
cmdline=$(cat "/proc/${kpid}/cmdline" 2>/dev/null | tr '\0' ' ')
|
||||
if echo "$cmdline" | grep -q "platform-server"; then
|
||||
echo "Killing stale platform-server pid ${kpid}: ${cmdline}"
|
||||
kill "$kpid" 2>/dev/null || true
|
||||
killed=$((killed + 1))
|
||||
fi
|
||||
done
|
||||
if [ "$killed" -gt 0 ]; then
|
||||
sleep 2
|
||||
echo "Killed $killed stale process(es); port(s) released."
|
||||
else
|
||||
echo "No stale platform-server found."
|
||||
fi
|
||||
- name: Start platform (background)
|
||||
if: needs.detect-changes.outputs.api == 'true'
|
||||
working-directory: workspace-server
|
||||
@@ -346,3 +382,4 @@ jobs:
|
||||
run: |
|
||||
docker rm -f "$PG_CONTAINER" 2>/dev/null || true
|
||||
docker rm -f "$REDIS_CONTAINER" 2>/dev/null || true
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
// that the desktop ChatTab uses, but with a slimmer surface: no
|
||||
// attachments, no A2A topology overlay, no conversation tracing.
|
||||
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
|
||||
import { api } from "@/lib/api";
|
||||
import { useCanvasStore } from "@/store/canvas";
|
||||
@@ -50,26 +50,13 @@ export function MobileChat({
|
||||
}) {
|
||||
const p = usePalette(dark);
|
||||
const node = useCanvasStore((s) => s.nodes.find((n) => n.id === agentId));
|
||||
// Bootstrap from the canvas store's per-workspace message buffer so the
|
||||
// user sees their prior thread on entry. The store is updated by the
|
||||
// socket → ChatTab flows the desktop runs; on mobile we read from the
|
||||
// same buffer to keep state coherent across viewports.
|
||||
// NOTE: selector returns undefined (stable) — do NOT use ?? [] here,
|
||||
// that creates a new [] reference on every store update when the key is
|
||||
// absent, causing infinite re-render (React error #185).
|
||||
const storedMessages = useCanvasStore((s) => s.agentMessages[agentId]);
|
||||
const [messages, setMessages] = useState<ChatMessage[]>(() =>
|
||||
(storedMessages ?? []).map((m) => ({
|
||||
id: m.id,
|
||||
role: "agent",
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
})),
|
||||
);
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([]);
|
||||
const [draft, setDraft] = useState("");
|
||||
const [tab, setTab] = useState<SubTab>("my");
|
||||
const [sending, setSending] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [historyLoading, setHistoryLoading] = useState(true);
|
||||
const [historyError, setHistoryError] = useState<string | null>(null);
|
||||
const scrollRef = useRef<HTMLDivElement>(null);
|
||||
// Synchronous re-entry guard. `setSending(true)` schedules a state
|
||||
// update but doesn't flush before a second tap can fire send() — a ref
|
||||
@@ -95,6 +82,74 @@ export function MobileChat({
|
||||
}
|
||||
}, [messages]);
|
||||
|
||||
// Load chat history on mount / agent switch.
|
||||
const loadHistory = useCallback(async () => {
|
||||
setHistoryLoading(true);
|
||||
setHistoryError(null);
|
||||
try {
|
||||
const resp = await api.get<{
|
||||
messages: Array<{
|
||||
id: string;
|
||||
role: string;
|
||||
content: string;
|
||||
timestamp: string;
|
||||
}>;
|
||||
}>(`/workspaces/${agentId}/chat-history?limit=50`);
|
||||
const loaded = (resp.messages ?? []).map((m) => ({
|
||||
id: m.id,
|
||||
role: m.role as "user" | "agent" | "system",
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
}));
|
||||
setMessages(loaded);
|
||||
} catch (e) {
|
||||
setHistoryError(e instanceof Error ? e.message : "Failed to load history");
|
||||
} finally {
|
||||
setHistoryLoading(false);
|
||||
}
|
||||
}, [agentId]);
|
||||
|
||||
useEffect(() => {
|
||||
let cancelled = false;
|
||||
loadHistory().then(() => {
|
||||
if (cancelled) return;
|
||||
// Consume any agent messages that arrived while history was loading.
|
||||
const consume = useCanvasStore.getState().consumeAgentMessages;
|
||||
const msgs = consume(agentId);
|
||||
if (msgs.length > 0) {
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
...msgs.map((m) => ({
|
||||
id: m.id,
|
||||
role: "agent" as const,
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
})),
|
||||
]);
|
||||
}
|
||||
});
|
||||
return () => { cancelled = true; };
|
||||
}, [agentId, loadHistory]);
|
||||
|
||||
// Consume live agent pushes while the panel is mounted.
|
||||
const pendingAgentMsgs = useCanvasStore((s) => s.agentMessages[agentId]);
|
||||
useEffect(() => {
|
||||
if (!pendingAgentMsgs || pendingAgentMsgs.length === 0) return;
|
||||
const consume = useCanvasStore.getState().consumeAgentMessages;
|
||||
const msgs = consume(agentId);
|
||||
if (msgs.length > 0) {
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
...msgs.map((m) => ({
|
||||
id: m.id,
|
||||
role: "agent" as const,
|
||||
text: m.content,
|
||||
ts: formatStoredTimestamp(m.timestamp),
|
||||
})),
|
||||
]);
|
||||
}
|
||||
}, [pendingAgentMsgs, agentId]);
|
||||
|
||||
if (!node) {
|
||||
return (
|
||||
<div
|
||||
@@ -308,7 +363,17 @@ export function MobileChat({
|
||||
Agent Comms — peer-to-peer A2A traffic surfaces in the Comms tab.
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && messages.length === 0 && (
|
||||
{tab === "my" && historyLoading && (
|
||||
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
|
||||
Loading chat history…
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && !historyLoading && historyError && messages.length === 0 && (
|
||||
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
|
||||
{historyError}
|
||||
</div>
|
||||
)}
|
||||
{tab === "my" && !historyLoading && !historyError && messages.length === 0 && (
|
||||
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
|
||||
Send a message to start chatting.
|
||||
</div>
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
* NOTE: No @testing-library/jest-dom — use DOM APIs.
|
||||
*/
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { cleanup, render } from "@testing-library/react";
|
||||
import { cleanup, render, waitFor } from "@testing-library/react";
|
||||
import React from "react";
|
||||
|
||||
import { MobileChat } from "../MobileChat";
|
||||
@@ -33,7 +33,12 @@ const mockStoreState = {
|
||||
vi.mock("@/store/canvas", () => ({
|
||||
useCanvasStore: Object.assign(
|
||||
vi.fn((sel) => sel(mockStoreState)),
|
||||
{ getState: () => mockStoreState },
|
||||
{
|
||||
getState: () => ({
|
||||
...mockStoreState,
|
||||
consumeAgentMessages: vi.fn(() => []),
|
||||
}),
|
||||
},
|
||||
),
|
||||
summarizeWorkspaceCapabilities: vi.fn((data: Record<string, unknown>) => {
|
||||
const agentCard = data.agentCard as Record<string, unknown> | null;
|
||||
@@ -60,8 +65,12 @@ const { mockApiPost } = vi.hoisted(() => ({
|
||||
mockApiPost: vi.fn().mockResolvedValue({ result: { parts: [] } }),
|
||||
}));
|
||||
|
||||
const { mockApiGet } = vi.hoisted(() => ({
|
||||
mockApiGet: vi.fn().mockResolvedValue({ messages: [] }),
|
||||
}));
|
||||
|
||||
vi.mock("@/lib/api", () => ({
|
||||
api: { post: mockApiPost },
|
||||
api: { get: mockApiGet, post: mockApiPost },
|
||||
}));
|
||||
|
||||
// ─── Fixtures ────────────────────────────────────────────────────────────────
|
||||
@@ -148,6 +157,7 @@ function renderChat(agentId: string, dark = false) {
|
||||
|
||||
beforeEach(() => {
|
||||
mockOnBack.mockClear();
|
||||
mockApiGet.mockClear();
|
||||
mockStoreState.nodes = [];
|
||||
mockStoreState.agentMessages = {};
|
||||
mockApiPost.mockClear();
|
||||
@@ -266,16 +276,19 @@ describe("MobileChat — empty state", () => {
|
||||
mockStoreState.nodes = [onlineNode];
|
||||
});
|
||||
|
||||
it('shows "Send a message to start chatting." when no messages', () => {
|
||||
it('shows "Send a message to start chatting." when no messages', async () => {
|
||||
const { container } = renderChat(mockAgentId);
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
await waitFor(() =>
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting."),
|
||||
);
|
||||
});
|
||||
|
||||
it("shows no messages when agentMessages[agentId] is absent (undefined)", () => {
|
||||
// Explicitly set to empty to simulate no stored messages
|
||||
it("shows no messages when agentMessages[agentId] is absent (undefined)", async () => {
|
||||
mockStoreState.agentMessages = {};
|
||||
const { container } = renderChat(mockAgentId);
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
|
||||
await waitFor(() =>
|
||||
expect(container.textContent ?? "").toContain("Send a message to start chatting."),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -686,8 +686,8 @@ def _format_channel_content(
|
||||
# --- MCP Server (JSON-RPC over stdio) ---
|
||||
|
||||
|
||||
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
"""Warn when stdio isn't a pipe — but continue anyway.
|
||||
def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
"""Assert that stdio fds are pipe/socket/char-device compatible.
|
||||
|
||||
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
|
||||
rejected regular files, PTYs, and sockets with:
|
||||
@@ -711,6 +711,10 @@ def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
|
||||
)
|
||||
|
||||
|
||||
# Deprecated alias — the canonical name is _assert_stdio_is_pipe_compatible.
|
||||
_warn_if_stdio_not_pipe = _assert_stdio_is_pipe_compatible
|
||||
|
||||
|
||||
async def main(): # pragma: no cover
|
||||
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
|
||||
|
||||
@@ -967,7 +971,7 @@ def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no
|
||||
if transport == "http":
|
||||
asyncio.run(_run_http_server(port))
|
||||
else:
|
||||
_warn_if_stdio_not_pipe()
|
||||
_assert_stdio_is_pipe_compatible()
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
@@ -1826,8 +1826,8 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
|
||||
|
||||
|
||||
class TestStdioPipeAssertion:
|
||||
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
|
||||
the old fatal _assert_stdio_is_pipe_compatible guard.
|
||||
"""Pin _assert_stdio_is_pipe_compatible — the canonical function name.
|
||||
_warn_if_stdio_not_pipe is a deprecated alias.
|
||||
|
||||
The universal stdio transport now works with ANY file descriptor
|
||||
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
|
||||
@@ -1838,12 +1838,12 @@ class TestStdioPipeAssertion:
|
||||
|
||||
def test_pipe_pair_passes_silently(self, caplog):
|
||||
"""Happy path — both fds are pipes. No warning emitted."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
assert "not a pipe" not in caplog.text
|
||||
finally:
|
||||
os.close(r)
|
||||
@@ -1852,14 +1852,14 @@ class TestStdioPipeAssertion:
|
||||
def test_regular_file_stdout_warns(self, tmp_path, caplog):
|
||||
"""Reproducer for runtime#61: stdout redirected to a regular file.
|
||||
Now emits a warning instead of exiting."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, _w = os.pipe()
|
||||
regular = tmp_path / "captured.log"
|
||||
f = open(regular, "wb")
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=f.fileno())
|
||||
assert "stdout" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
finally:
|
||||
@@ -1868,7 +1868,7 @@ class TestStdioPipeAssertion:
|
||||
|
||||
def test_regular_file_stdin_warns(self, tmp_path, caplog):
|
||||
"""Symmetric case — stdin redirected from a regular file."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
regular = tmp_path / "input.json"
|
||||
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
|
||||
@@ -1876,7 +1876,7 @@ class TestStdioPipeAssertion:
|
||||
_r, w = os.pipe()
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=f.fileno(), stdout_fd=w)
|
||||
assert "stdin" in caplog.text
|
||||
assert "not a pipe" in caplog.text
|
||||
finally:
|
||||
@@ -1886,13 +1886,13 @@ class TestStdioPipeAssertion:
|
||||
def test_closed_fd_warns_about_stat_error(self, caplog):
|
||||
"""If stdio is closed, os.fstat raises OSError. Warning is
|
||||
skipped silently (can't stat the fd)."""
|
||||
from a2a_mcp_server import _warn_if_stdio_not_pipe
|
||||
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
|
||||
|
||||
r, w = os.pipe()
|
||||
os.close(w) # Now `w` is a stale fd — fstat will fail.
|
||||
try:
|
||||
with caplog.at_level("WARNING"):
|
||||
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
|
||||
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
|
||||
# No warning emitted because fstat failed before the check
|
||||
assert "not a pipe" not in caplog.text
|
||||
finally:
|
||||
|
||||
Reference in New Issue
Block a user