forked from molecule-ai/molecule-core
Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f5ea812e9d | |||
| c4807a930d | |||
| d22fbb29b8 | |||
| 899c53550d | |||
| cdfc9f743f | |||
| 7a2664523c | |||
| 632e906640 | |||
| 475da5b64c | |||
| 1ad107cc15 | |||
| e4bd1e4293 | |||
| 01deeb36cf | |||
| b906e1da61 | |||
| abc3affcb6 | |||
| 3322524b0f | |||
| de01ff51b0 | |||
| f3782662bd | |||
| 0d0840d9d9 | |||
| 8e5d193761 | |||
| 3e0d2e650a |
@@ -387,6 +387,7 @@ jobs:
|
||||
"a2a_mcp_server.py"
|
||||
"mcp_cli.py"
|
||||
"a2a_tools.py"
|
||||
"a2a_tools_inbox.py"
|
||||
"inbox.py"
|
||||
"platform_auth.py"
|
||||
)
|
||||
|
||||
@@ -20,160 +20,6 @@ import * as Dialog from "@radix-ui/react-dialog";
|
||||
|
||||
type Tab = "python" | "curl" | "claude" | "mcp" | "hermes" | "codex" | "openclaw" | "fields";
|
||||
|
||||
// Per-tab help metadata: docs link, where-to-install link, common errors.
|
||||
// All URLs verified against repo content (docs/guides/* file paths map to
|
||||
// docs.molecule.ai/docs/guides/*; canonical hostname confirmed by existing
|
||||
// blog post canonical metadata) or against the snippet text the operator
|
||||
// just copied. Never linking to a URL that wasn't already in product —
|
||||
// dead links here defeat the purpose of "more comprehensive instructions."
|
||||
const TAB_HELP: Record<
|
||||
Tab,
|
||||
{
|
||||
docsUrl?: string;
|
||||
docsLabel?: string;
|
||||
downloadUrl?: string;
|
||||
downloadLabel?: string;
|
||||
commonIssues?: { symptom: string; check: string }[];
|
||||
}
|
||||
> = {
|
||||
mcp: {
|
||||
docsUrl: "https://docs.molecule.ai/docs/guides/mcp-server-setup",
|
||||
docsLabel: "MCP server setup guide",
|
||||
downloadUrl: "https://pypi.org/project/molecule-ai-workspace-runtime/",
|
||||
downloadLabel: "molecule-ai-workspace-runtime on PyPI",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "Tools not appearing in your agent",
|
||||
check:
|
||||
"Run `claude mcp list` (or your runtime's equivalent) — the molecule entry should be listed. If missing, re-run the `claude mcp add` line.",
|
||||
},
|
||||
{
|
||||
symptom: "ConnectionRefused / DNS error on first call",
|
||||
check:
|
||||
"PLATFORM_URL must include the scheme (https://) and have no trailing slash. Verify with `curl $PLATFORM_URL/healthz`.",
|
||||
},
|
||||
],
|
||||
},
|
||||
python: {
|
||||
docsUrl:
|
||||
"https://docs.molecule.ai/docs/guides/external-agent-registration",
|
||||
docsLabel: "External agent registration guide",
|
||||
downloadUrl: "https://pypi.org/project/molecule-ai-workspace-runtime/",
|
||||
downloadLabel: "molecule-ai-workspace-runtime on PyPI",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "401 from /heartbeat",
|
||||
check:
|
||||
"AUTH_TOKEN expired or wrong workspace_id. Tokens are shown only once at create time — re-create the workspace to get a fresh token.",
|
||||
},
|
||||
{
|
||||
symptom: "AGENT_URL not reachable from platform",
|
||||
check:
|
||||
"Public HTTPS URL required for inbound A2A. Use ngrok or Cloudflare Tunnel if your agent is behind NAT.",
|
||||
},
|
||||
],
|
||||
},
|
||||
claude: {
|
||||
docsUrl:
|
||||
"https://docs.molecule.ai/docs/guides/external-agent-registration",
|
||||
docsLabel: "External agent registration guide",
|
||||
downloadUrl: "https://claude.com/claude-code",
|
||||
downloadLabel: "Claude Code (claude.com)",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "plugin not installed",
|
||||
check:
|
||||
"Run `/plugin marketplace add Molecule-AI/molecule-mcp-claude-channel` then `/plugin install molecule@molecule-mcp-claude-channel` inside Claude Code, then `/reload-plugins`.",
|
||||
},
|
||||
{
|
||||
symptom: "not on the approved channels allowlist",
|
||||
check:
|
||||
"Custom channels need `--dangerously-load-development-channels` on the launch command. Team/Enterprise orgs need admin to set `channelsEnabled` + `allowedChannelPlugins` in claude.ai admin settings.",
|
||||
},
|
||||
{
|
||||
symptom: "Inbound messages not arriving",
|
||||
check:
|
||||
"Check stderr for `molecule channel: connected — watching N workspace(s)`. Verify ~/.claude/channels/molecule/.env has the right PLATFORM_URL + token.",
|
||||
},
|
||||
],
|
||||
},
|
||||
hermes: {
|
||||
docsUrl:
|
||||
"https://docs.molecule.ai/docs/guides/external-agent-registration",
|
||||
docsLabel: "External agent registration guide",
|
||||
downloadUrl: "https://github.com/NousResearch/hermes-agent",
|
||||
downloadLabel: "hermes-agent (NousResearch)",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "Gateway start failure",
|
||||
check:
|
||||
"Tail ~/.hermes/gateway.log. YAML duplicate-key in config.yaml is the most common cause — `gateway:` block must appear exactly once.",
|
||||
},
|
||||
{
|
||||
symptom: "Plugin not discovered after install",
|
||||
check:
|
||||
"Run `pip show hermes-channel-molecule` to confirm install. Some hermes builds need `hermes plugin reload` before the new platform_plugins entry takes effect.",
|
||||
},
|
||||
],
|
||||
},
|
||||
codex: {
|
||||
docsUrl: "https://docs.molecule.ai/docs/guides/mcp-server-setup",
|
||||
docsLabel: "MCP server setup guide",
|
||||
downloadUrl: "https://github.com/openai/codex",
|
||||
downloadLabel: "openai/codex",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "[mcp_servers.molecule] not loaded",
|
||||
check:
|
||||
"Codex must be ≥ 0.57. Check with `codex --version`; upgrade via `npm install -g @openai/codex@latest`.",
|
||||
},
|
||||
{
|
||||
symptom: "TOML parse error after re-running setup",
|
||||
check:
|
||||
"TOML rejects duplicate `[mcp_servers.molecule]` tables. Open ~/.codex/config.toml and remove the old block before pasting the new one.",
|
||||
},
|
||||
{
|
||||
symptom: "Canvas messages don't wake codex",
|
||||
check:
|
||||
"Step 3 (codex-channel-molecule bridge daemon) is required for inbound push. Check `pgrep -f codex-channel-molecule` and `tail ~/.codex-channel-molecule/daemon.log`.",
|
||||
},
|
||||
],
|
||||
},
|
||||
openclaw: {
|
||||
docsUrl: "https://docs.molecule.ai/docs/guides/mcp-server-setup",
|
||||
docsLabel: "MCP server setup guide",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "Gateway not starting",
|
||||
check:
|
||||
"Tail ~/.openclaw/gateway.log. The loopback bind requires :18789 to be free — check with `lsof -iTCP:18789`.",
|
||||
},
|
||||
{
|
||||
symptom: "openclaw mcp set rejected",
|
||||
check:
|
||||
"The heredoc generates JSON; verify it parsed by running `jq < ~/.openclaw/mcp/molecule.json`. Re-run `openclaw mcp set` if the file is malformed.",
|
||||
},
|
||||
],
|
||||
},
|
||||
curl: {
|
||||
docsUrl:
|
||||
"https://docs.molecule.ai/docs/guides/external-agent-registration",
|
||||
docsLabel: "External agent registration guide",
|
||||
commonIssues: [
|
||||
{
|
||||
symptom: "401 / 403 on register",
|
||||
check:
|
||||
"WORKSPACE_AUTH_TOKEN must be the value shown at workspace create. Tokens are shown only once.",
|
||||
},
|
||||
],
|
||||
},
|
||||
fields: {
|
||||
docsUrl:
|
||||
"https://docs.molecule.ai/docs/guides/external-agent-registration",
|
||||
docsLabel: "External agent registration guide",
|
||||
},
|
||||
};
|
||||
|
||||
export interface ExternalConnectionInfo {
|
||||
workspace_id: string;
|
||||
platform_url: string;
|
||||
@@ -457,7 +303,6 @@ export function ExternalConnectModal({ info, onClose }: Props) {
|
||||
<Field label="heartbeat_endpoint" value={info.heartbeat_endpoint} onCopy={() => copy(info.heartbeat_endpoint, "hb")} copied={copiedKey === "hb"} />
|
||||
</div>
|
||||
)}
|
||||
<HelpBlock help={TAB_HELP[tab]} />
|
||||
</div>
|
||||
|
||||
<div className="mt-5 flex justify-end gap-2">
|
||||
@@ -506,70 +351,6 @@ function SnippetBlock({
|
||||
);
|
||||
}
|
||||
|
||||
// HelpBlock — collapsible "Need help?" section under each tab's snippet.
|
||||
// Renders only the keys present in the per-tab help metadata (no empty
|
||||
// sections). Closed by default so the snippet stays the visual focus;
|
||||
// operators with a working setup never see this. Uses native <details>
|
||||
// for keyboard accessibility (Tab + Enter) without extra ARIA wiring.
|
||||
function HelpBlock({
|
||||
help,
|
||||
}: {
|
||||
help: (typeof TAB_HELP)[Tab] | undefined;
|
||||
}) {
|
||||
if (!help) return null;
|
||||
const { docsUrl, docsLabel, downloadUrl, downloadLabel, commonIssues } = help;
|
||||
if (!docsUrl && !downloadUrl && !commonIssues?.length) return null;
|
||||
|
||||
return (
|
||||
<details className="mt-3 border border-line rounded-lg bg-surface text-xs">
|
||||
<summary className="cursor-pointer select-none px-3 py-2 text-ink-mid hover:text-ink">
|
||||
Need help? — install link, docs, common errors
|
||||
</summary>
|
||||
<div className="px-3 pb-3 pt-1 space-y-2">
|
||||
{downloadUrl && (
|
||||
<div>
|
||||
<span className="text-ink-soft">Where to install: </span>
|
||||
<a
|
||||
href={downloadUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="text-accent underline hover:text-accent-strong"
|
||||
>
|
||||
{downloadLabel || downloadUrl}
|
||||
</a>
|
||||
</div>
|
||||
)}
|
||||
{docsUrl && (
|
||||
<div>
|
||||
<span className="text-ink-soft">Documentation: </span>
|
||||
<a
|
||||
href={docsUrl}
|
||||
target="_blank"
|
||||
rel="noopener noreferrer"
|
||||
className="text-accent underline hover:text-accent-strong"
|
||||
>
|
||||
{docsLabel || docsUrl}
|
||||
</a>
|
||||
</div>
|
||||
)}
|
||||
{commonIssues && commonIssues.length > 0 && (
|
||||
<div>
|
||||
<div className="text-ink-soft mb-1">Common errors:</div>
|
||||
<ul className="space-y-1.5 pl-3">
|
||||
{commonIssues.map((issue, i) => (
|
||||
<li key={i}>
|
||||
<code className="text-warm font-mono">{issue.symptom}</code>
|
||||
<span className="text-ink-mid"> — {issue.check}</span>
|
||||
</li>
|
||||
))}
|
||||
</ul>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</details>
|
||||
);
|
||||
}
|
||||
|
||||
function Field({
|
||||
label,
|
||||
value,
|
||||
|
||||
@@ -56,7 +56,9 @@ TOP_LEVEL_MODULES = {
|
||||
"a2a_mcp_server",
|
||||
"a2a_tools",
|
||||
"a2a_tools_delegation",
|
||||
"a2a_tools_inbox",
|
||||
"a2a_tools_memory",
|
||||
"a2a_tools_messaging",
|
||||
"a2a_tools_rbac",
|
||||
"adapter_base",
|
||||
"agent",
|
||||
@@ -289,10 +291,37 @@ directory** by the `publish-runtime` GitHub Actions workflow on every
|
||||
Operators running an agent outside the platform's container fleet
|
||||
(any runtime that supports MCP stdio — Claude Code, hermes, codex,
|
||||
etc.) can install this wheel and run the universal MCP server
|
||||
locally:
|
||||
locally.
|
||||
|
||||
### Requirements
|
||||
|
||||
* **Python ≥3.11.** The wheel sets `requires-python = ">=3.11"`. On
|
||||
older interpreters `pip install` returns the cryptic
|
||||
`Could not find a version that satisfies the requirement` — that
|
||||
message is pip filtering this wheel out, NOT the package missing
|
||||
from PyPI. Upgrade with `brew install python@3.12` /
|
||||
`apt install python3.12` / `pyenv install 3.12` first.
|
||||
* **`pipx` recommended over `pip`.** `pipx install` puts
|
||||
`molecule-mcp` on PATH automatically and isolates the runtime's
|
||||
deps from your system Python. Plain `pip install --user` works
|
||||
but the binary lands in `~/.local/bin` (Linux) or
|
||||
`~/Library/Python/3.X/bin` (macOS) which is often not on PATH on
|
||||
a fresh shell — `claude mcp add molecule -- molecule-mcp` then
|
||||
fails with "command not found" at first use.
|
||||
|
||||
### Install
|
||||
|
||||
```sh
|
||||
# Recommended:
|
||||
pipx install molecule-ai-workspace-runtime
|
||||
|
||||
# Alternative (manage PATH yourself):
|
||||
pip install --user molecule-ai-workspace-runtime
|
||||
```
|
||||
|
||||
### Run
|
||||
|
||||
```sh
|
||||
pip install molecule-ai-workspace-runtime
|
||||
WORKSPACE_ID=<uuid> \\
|
||||
PLATFORM_URL=https://<tenant>.staging.moleculesai.app \\
|
||||
MOLECULE_WORKSPACE_TOKEN=<bearer> \\
|
||||
@@ -305,10 +334,64 @@ runtimes already get via the workspace's auto-spawned MCP. Register
|
||||
the binary in your agent's MCP config (e.g. Claude Code's
|
||||
`claude mcp add molecule -- molecule-mcp` with the env above).
|
||||
|
||||
### Keeping the token out of shell history
|
||||
|
||||
Inline `MOLECULE_WORKSPACE_TOKEN=<bearer>` ends up in `~/.zsh_history`
|
||||
and (when registered via `claude mcp add`) plaintext in
|
||||
`~/.claude.json`. To avoid that, write the token to a 0600 file and
|
||||
point `MOLECULE_WORKSPACE_TOKEN_FILE` at it:
|
||||
|
||||
```sh
|
||||
umask 077
|
||||
printf '%s' "<bearer>" > ~/.config/molecule/token
|
||||
WORKSPACE_ID=<uuid> \\
|
||||
PLATFORM_URL=https://<tenant>.staging.moleculesai.app \\
|
||||
MOLECULE_WORKSPACE_TOKEN_FILE=$HOME/.config/molecule/token \\
|
||||
molecule-mcp
|
||||
```
|
||||
|
||||
Token resolution order: `MOLECULE_WORKSPACE_TOKEN` (inline env) →
|
||||
`MOLECULE_WORKSPACE_TOKEN_FILE` (path) → `${CONFIGS_DIR}/.auth_token`
|
||||
(in-container default).
|
||||
|
||||
The token comes from the canvas → Tokens tab. Restarting an external
|
||||
workspace from the canvas no longer revokes the token (PR #2412), so
|
||||
operator tokens persist across status nudges.
|
||||
|
||||
### Push vs poll delivery (Claude Code specifics)
|
||||
|
||||
By default the inbox runs in **poll mode** — every turn the agent
|
||||
calls `wait_for_message`, which blocks up to ~60s on
|
||||
`/activity?since_id=…`. Real-time push delivery is also supported,
|
||||
but on Claude Code it requires THREE conditions, ALL of which must
|
||||
hold:
|
||||
|
||||
1. **The MCP server declares `experimental.claude/channel`** — this
|
||||
wheel does (see `_build_initialize_result`). Nothing for you to
|
||||
do.
|
||||
2. **Claude Code installs the server as a marketplace plugin** — a
|
||||
plain `claude mcp add molecule -- molecule-mcp` produces a
|
||||
non-plugin-sourced server, which Claude Code rejects with
|
||||
`channel_enable requires a marketplace plugin`. Until the
|
||||
official `moleculesai/claude-code-plugin` marketplace lands
|
||||
(issue #2934 follow-up), operators who want push must scaffold
|
||||
their own local marketplace under
|
||||
`~/.claude/marketplaces/molecule-local/` containing a
|
||||
`marketplace.json` + `plugin.json` that points at this wheel.
|
||||
3. **Claude Code is launched with the dev-channels flag** — pass
|
||||
`--dangerously-load-development-channels plugin:molecule@<marketplace>`
|
||||
on the `claude` invocation. Without this flag the channel
|
||||
capability is silently ignored.
|
||||
|
||||
Symptom of any condition failing: messages arrive but only via the
|
||||
poll path (every ~1–60s), not real-time. There's currently no
|
||||
diagnostic surfaced — `molecule-mcp doctor` (issue #2934 follow-up)
|
||||
is planned.
|
||||
|
||||
If you don't need real-time push, the default poll path works
|
||||
universally with no extra setup; both modes converge on the same
|
||||
`inbox_pop` ack so messages never duplicate.
|
||||
|
||||
See [`docs/workspace-runtime-package.md`](https://github.com/Molecule-AI/molecule-core/blob/main/docs/workspace-runtime-package.md)
|
||||
for the publish flow and architecture.
|
||||
"""
|
||||
|
||||
@@ -0,0 +1,177 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"go/ast"
|
||||
"go/parser"
|
||||
"go/token"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestAgentMessageBroadcastsArePersisted is a forward-looking AST
|
||||
// gate: every function in this package that broadcasts an
|
||||
// `AGENT_MESSAGE` WebSocket event MUST also call
|
||||
// `INSERT INTO activity_logs` somewhere in its body.
|
||||
//
|
||||
// The reno-stars production data-loss bug (CEO Ryan PC's long-form
|
||||
// onboarding-friction message visible live but missing on reload)
|
||||
// happened because mcp_tools.go:toolSendMessageToUser broadcast WS
|
||||
// without a paired INSERT — while the HTTP /notify sibling DID
|
||||
// persist. The fix added the INSERT; this gate prevents the regression
|
||||
// class from re-emerging in any future chat-bearing tool.
|
||||
//
|
||||
// Why an AST gate vs a code-review checklist (per memory
|
||||
// feedback_behavior_based_ast_gates.md): "pin invariants by what a
|
||||
// function calls, not what it's named". The shape that loses data is:
|
||||
//
|
||||
// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion
|
||||
//
|
||||
// Any new tool that emits AGENT_MESSAGE must persist or the next
|
||||
// canvas refresh drops the message — same shape as reno-stars. A
|
||||
// reviewer can miss this; the AST walk can't.
|
||||
//
|
||||
// Allowlist: empty by intent. If a future use case genuinely needs
|
||||
// fire-and-forget broadcast (e.g., transient typing indicators that
|
||||
// should NOT survive reload), add an entry here AND document why.
|
||||
// "Doesn't need to persist" is rarely the right answer for chat —
|
||||
// the canvas history is the source of truth.
|
||||
func TestAgentMessageBroadcastsArePersisted(t *testing.T) {
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("getwd: %v", err)
|
||||
}
|
||||
entries, err := os.ReadDir(wd)
|
||||
if err != nil {
|
||||
t.Fatalf("readdir %s: %v", wd, err)
|
||||
}
|
||||
|
||||
type violation struct {
|
||||
file string
|
||||
fn string
|
||||
}
|
||||
var violations []violation
|
||||
|
||||
for _, ent := range entries {
|
||||
name := ent.Name()
|
||||
if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
|
||||
continue
|
||||
}
|
||||
path := filepath.Join(wd, name)
|
||||
fset := token.NewFileSet()
|
||||
file, err := parser.ParseFile(fset, path, nil, parser.ParseComments)
|
||||
if err != nil {
|
||||
t.Fatalf("parse %s: %v", path, err)
|
||||
}
|
||||
|
||||
for _, decl := range file.Decls {
|
||||
fn, ok := decl.(*ast.FuncDecl)
|
||||
if !ok || fn.Body == nil {
|
||||
continue
|
||||
}
|
||||
if !funcEmitsAgentMessageBroadcast(fn) {
|
||||
continue
|
||||
}
|
||||
if !funcInsertsIntoActivityLogs(fn) {
|
||||
violations = append(violations, violation{file: name, fn: fn.Name.Name})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(violations) > 0 {
|
||||
sort.Slice(violations, func(i, j int) bool {
|
||||
if violations[i].file != violations[j].file {
|
||||
return violations[i].file < violations[j].file
|
||||
}
|
||||
return violations[i].fn < violations[j].fn
|
||||
})
|
||||
var buf strings.Builder
|
||||
for _, v := range violations {
|
||||
buf.WriteString(" - ")
|
||||
buf.WriteString(v.file)
|
||||
buf.WriteString(":")
|
||||
buf.WriteString(v.fn)
|
||||
buf.WriteString("\n")
|
||||
}
|
||||
t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs:
|
||||
|
||||
%s
|
||||
This is the reno-stars data-loss regression class: live message
|
||||
visible to the user, but missing on reload because activity_log was
|
||||
never written. Every chat-bearing broadcast MUST be paired with:
|
||||
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method,
|
||||
summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
|
||||
See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for
|
||||
the canonical shapes. Don't add an allowlist entry without a
|
||||
documented reason — the canvas chat history is the source of truth
|
||||
and silently dropping messages is a P0 user trust break.`,
|
||||
buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that
|
||||
// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`.
|
||||
func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool {
|
||||
var found bool
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
call, ok := n.(*ast.CallExpr)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
sel, ok := call.Fun.(*ast.SelectorExpr)
|
||||
if !ok || sel.Sel.Name != "BroadcastOnly" {
|
||||
return true
|
||||
}
|
||||
// BroadcastOnly(workspaceID, eventType, payload) — the second
|
||||
// arg is the event name. Match by string-literal value.
|
||||
if len(call.Args) < 2 {
|
||||
return true
|
||||
}
|
||||
lit, ok := call.Args[1].(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
return true
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if raw == "AGENT_MESSAGE" {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit
|
||||
// whose body contains `INSERT INTO activity_logs` (the SQL literal
|
||||
// passed to ExecContext). Matches the substring rather than a strict
|
||||
// regex because we don't care about the exact INSERT shape here —
|
||||
// only that the function persists. Specific shape pinning lives in
|
||||
// the per-handler test (see TestMCPHandler_SendMessageToUser_*).
|
||||
func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool {
|
||||
var found bool
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
lit, ok := n.(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
return true
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if strings.Contains(raw, "INSERT INTO activity_logs") {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return found
|
||||
}
|
||||
@@ -109,6 +109,12 @@ curl -fsS -X POST "{{PLATFORM_URL}}/registry/register" \
|
||||
"version": "0.1.0"
|
||||
}
|
||||
}'
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
|
||||
# Common errors:
|
||||
# • 401 / 403 on register — WORKSPACE_AUTH_TOKEN must be the value
|
||||
# shown at workspace create. Tokens are shown only once.
|
||||
`
|
||||
|
||||
// externalChannelTemplate — Claude Code channel plugin install + .env. For
|
||||
@@ -172,6 +178,18 @@ claude --dangerously-load-development-channels \
|
||||
# Multi-workspace: comma-separate IDs and tokens (same order). See
|
||||
# https://github.com/Molecule-AI/molecule-mcp-claude-channel for
|
||||
# pairing flow, push-mode upgrade, and v0.2 roadmap.
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/claude-code-channel-plugin
|
||||
# Common errors:
|
||||
# • "plugin not installed" — run /plugin marketplace add then
|
||||
# /plugin install lines above; /reload-plugins or restart.
|
||||
# • "not on the approved channels allowlist" — custom channels need
|
||||
# --dangerously-load-development-channels; team/enterprise orgs
|
||||
# need admin to set channelsEnabled + allowedChannelPlugins.
|
||||
# • "Inbound messages not arriving" — stderr should show
|
||||
# "molecule channel: connected — watching N workspace(s)";
|
||||
# verify ~/.claude/channels/molecule/.env has PLATFORM_URL + token.
|
||||
`
|
||||
|
||||
// externalUniversalMcpTemplate — runtime-agnostic standalone path.
|
||||
@@ -224,6 +242,17 @@ claude mcp add molecule -s user -- env \
|
||||
#
|
||||
# Origin/WAF handling is built into the wheel — no manual headers
|
||||
# needed when calling tools through the MCP server.
|
||||
|
||||
# Need help?
|
||||
# Where to install: https://pypi.org/project/molecule-ai-workspace-runtime/
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/mcp-server-setup
|
||||
# Common errors:
|
||||
# • "Tools not appearing in your agent" — run ` + "`claude mcp list`" + ` (or
|
||||
# your runtime's equivalent) and confirm the molecule entry. If
|
||||
# missing, re-run the ` + "`claude mcp add`" + ` line above.
|
||||
# • "ConnectionRefused / DNS error on first call" — PLATFORM_URL must
|
||||
# include the scheme (https://) and have NO trailing slash. Verify
|
||||
# with: curl ${PLATFORM_URL}/healthz
|
||||
`
|
||||
|
||||
// externalPythonTemplate uses molecule-sdk-python's RemoteAgentClient +
|
||||
@@ -262,6 +291,15 @@ async def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
# Need help?
|
||||
# Where to install: https://pypi.org/project/molecule-ai-workspace-runtime/
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
|
||||
# Common errors:
|
||||
# • 401 from /heartbeat — AUTH_TOKEN expired or wrong workspace_id.
|
||||
# Tokens shown only once at create time; re-create to get a fresh one.
|
||||
# • AGENT_URL not reachable from platform — public HTTPS URL required
|
||||
# for inbound A2A. Use ngrok or Cloudflare Tunnel if behind NAT.
|
||||
`
|
||||
|
||||
// externalHermesChannelTemplate — install snippet for operators whose
|
||||
@@ -329,6 +367,16 @@ hermes gateway --replace
|
||||
#
|
||||
# Source + issue tracker:
|
||||
# https://github.com/Molecule-AI/hermes-channel-molecule
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/external-agent-registration
|
||||
# Common errors:
|
||||
# • Gateway start failure — tail ~/.hermes/gateway.log. YAML
|
||||
# duplicate-key in config.yaml is the most common cause; the
|
||||
# gateway: block must appear exactly once.
|
||||
# • Plugin not discovered after install — pip show hermes-channel-molecule
|
||||
# to confirm install. Some hermes builds need ` + "`hermes plugin reload`" + `
|
||||
# before the new platform_plugins entry takes effect.
|
||||
`
|
||||
|
||||
// externalCodexTemplate — for operators whose external agent is a
|
||||
@@ -410,6 +458,18 @@ disown
|
||||
# available to the agent, and the bridge wakes a non-interactive
|
||||
# codex turn for any inbound canvas/peer message:
|
||||
codex
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/mcp-server-setup
|
||||
# Common errors:
|
||||
# • [mcp_servers.molecule] not loaded — codex must be ≥ 0.57.
|
||||
# Check with ` + "`codex --version`" + `; upgrade via npm install -g @openai/codex@latest.
|
||||
# • TOML parse error after re-running setup — TOML rejects duplicate
|
||||
# [mcp_servers.molecule] tables. Open ~/.codex/config.toml and
|
||||
# remove the old block before pasting the new one.
|
||||
# • Canvas messages don't wake codex — step 3 (codex-channel-molecule
|
||||
# bridge daemon) is required for inbound push. Check
|
||||
# pgrep -f codex-channel-molecule and tail ~/.codex-channel-molecule/daemon.log.
|
||||
`
|
||||
|
||||
// externalOpenClawTemplate — for operators whose external agent is an
|
||||
@@ -471,4 +531,13 @@ disown
|
||||
|
||||
# 5. Run an agent turn — molecule tools are now available:
|
||||
openclaw agent --message "list my peers"
|
||||
|
||||
# Need help?
|
||||
# Documentation: https://doc.moleculesai.app/docs/guides/mcp-server-setup
|
||||
# Common errors:
|
||||
# • Gateway not starting — tail ~/.openclaw/gateway.log. The loopback
|
||||
# bind requires :18789 to be free; check with ` + "`lsof -iTCP:18789`" + `.
|
||||
# • ` + "`openclaw mcp set`" + ` rejected — the heredoc generates JSON;
|
||||
# verify with ` + "`jq < ~/.openclaw/mcp/molecule.json`" + ` and re-run
|
||||
# ` + "`openclaw mcp set`" + ` if the file is malformed.
|
||||
`
|
||||
|
||||
@@ -11,18 +11,21 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
|
||||
// sqlmock DB set up by setupTestDB.
|
||||
// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers
|
||||
// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the
|
||||
// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast.
|
||||
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
|
||||
t.Helper()
|
||||
mock := setupTestDB(t)
|
||||
h := NewMCPHandler(db.DB, events.NewBroadcaster(nil))
|
||||
h := NewMCPHandler(db.DB, newTestBroadcaster())
|
||||
return h, mock
|
||||
}
|
||||
|
||||
@@ -628,6 +631,170 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the
|
||||
// "best-effort persistence" contract: when the activity_log INSERT
|
||||
// fails (DB hiccup, constraint violation, transient connection drop),
|
||||
// the tool MUST still return success to the agent because the WS
|
||||
// broadcast already succeeded — the user has seen the message.
|
||||
//
|
||||
// This matches /notify (activity.go) behavior. Returning an error
|
||||
// here would cause the agent to retry and re-broadcast, double-
|
||||
// rendering the message in the user's live chat panel for every
|
||||
// retry until the DB recovers.
|
||||
func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-err").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// INSERT fails — must NOT abort the tool response.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WillReturnError(errors.New("transient db error"))
|
||||
|
||||
w := mcpPost(t, h, "ws-err", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 100,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": "should not be lost from the live chat",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var resp mcpResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response was not valid JSON-RPC: %v", err)
|
||||
}
|
||||
// Tool response is success — INSERT failure logged, broadcast
|
||||
// already succeeded.
|
||||
if resp.Error != nil {
|
||||
t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("expected DB calls in order: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the
|
||||
// response_body JSON shape stored in activity_logs. This shape MUST
|
||||
// match what the canvas hydrater (extractResponseText in
|
||||
// historyHydration.ts) reads — specifically `{"result": "<text>"}`.
|
||||
// Any drift in the JSON shape silently breaks chat history without
|
||||
// failing the INSERT.
|
||||
//
|
||||
// Caught the same drift class flagged in
|
||||
// feedback_assert_exact_not_substring.md: a substring match on
|
||||
// "result" would pass even if the field were renamed; we assert the
|
||||
// exact JSON shape.
|
||||
func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
const userMessage = "Hi there from the agent"
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-shape").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// Capture the response_body argument and assert its exact shape.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
"ws-shape",
|
||||
sqlmock.AnyArg(), // summary
|
||||
// The response_body MUST be JSON `{"result": "<message>"}`.
|
||||
// Any other shape (e.g., wrapping in a Task object) breaks
|
||||
// the canvas hydrater's `body.result` extractor.
|
||||
`{"result":"`+userMessage+`"}`,
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
w := mcpPost(t, h, "ws-shape", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 101,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": userMessage,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix
|
||||
// for the reno-stars / CEO Ryan PC chat-history data-loss bug:
|
||||
// external claude-code agents using molecule-mcp's send_message_to_user
|
||||
// tool route through THIS handler (not the HTTP /notify endpoint),
|
||||
// and the handler used to broadcast WS only — visible live, gone on
|
||||
// reload because nothing wrote to activity_logs.
|
||||
//
|
||||
// Pins:
|
||||
// - INSERT happens on the success path (broadcast + DB write).
|
||||
// - INSERT shape mirrors the HTTP /notify handler exactly:
|
||||
// activity_type='a2a_receive', method='notify', request_body NULL,
|
||||
// response_body={"result": message}, status='ok'. The canvas
|
||||
// hydration query (`type=a2a_receive&source=canvas`) treats
|
||||
// both writers as the same shape — drift here means the bug
|
||||
// re-surfaces silently.
|
||||
func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
// Workspace lookup — the handler verifies the workspace exists
|
||||
// before it does anything else. Returning a name lets the
|
||||
// broadcast payload populate; the test doesn't assert on the
|
||||
// broadcast (no observable WS in this fake), only on the DB.
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-msg").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// The persistence INSERT — pin the exact shape so a future
|
||||
// refactor that switches columns or drops `method='notify'`
|
||||
// breaks the test loud, not silently. Match by regex on the
|
||||
// table + activity_type + method literals.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
"ws-msg",
|
||||
sqlmock.AnyArg(), // summary "Agent message: ..."
|
||||
sqlmock.AnyArg(), // response_body JSON
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
w := mcpPost(t, h, "ws-msg", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 99,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": "Hello, this should persist!",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var resp mcpResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String())
|
||||
}
|
||||
if resp.Error != nil {
|
||||
t.Errorf("unexpected JSON-RPC error: %+v", resp.Error)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Parse error
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
"name": wsName,
|
||||
})
|
||||
|
||||
// Persist to activity_logs so chat history loaders surface this
|
||||
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
|
||||
// the MCP-bridge variant of send_message_to_user broadcast WS
|
||||
// only — visible live, gone on reload — while the HTTP /notify
|
||||
// sibling already had this fix (activity.go:535). External
|
||||
// claude-code agents using molecule-mcp's send_message_to_user
|
||||
// tool route through THIS handler, not /notify, so they were
|
||||
// hitting the unfixed path.
|
||||
//
|
||||
// Shape mirrors activity.go's Notify handler exactly so the
|
||||
// canvas chat-history hydration treats both the same:
|
||||
// - activity_type='a2a_receive' joins the source=canvas filter
|
||||
// - source_id is omitted → defaults to NULL ("canvas-source")
|
||||
// - method='notify' tags it as a push (vs a real A2A receive)
|
||||
// - request_body=NULL so the loader doesn't draw a duplicate
|
||||
// "user" bubble
|
||||
// - response_body={"result": "<text>"} feeds extractResponseText
|
||||
// directly
|
||||
//
|
||||
// Errors are log-only — the broadcast already returned ok to the
|
||||
// caller, the user has seen the message, and the persistence
|
||||
// failure mode (DB hiccup) shouldn't block the tool call. The
|
||||
// downside is the same as pre-fix: message vanishes on reload
|
||||
// after a transient DB error. Log it so operators have a signal.
|
||||
respPayload := map[string]interface{}{"result": message}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
preview := message
|
||||
if len(preview) > 80 {
|
||||
preview = preview[:80] + "…"
|
||||
}
|
||||
if _, err := h.database.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
return "Message sent.", nil
|
||||
}
|
||||
|
||||
|
||||
@@ -425,7 +425,16 @@ def _build_initialize_result() -> dict:
|
||||
"tools": {"listChanged": False},
|
||||
"experimental": {"claude/channel": {}},
|
||||
},
|
||||
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
|
||||
# Identifier convention: this server is what users register with
|
||||
# `claude mcp add molecule -- molecule-mcp` (and similar across
|
||||
# other MCP hosts), so the canonical name is "molecule". Earlier
|
||||
# versions reported "a2a-delegation" — accurate to the original
|
||||
# purpose but a mismatch with how operators actually name it.
|
||||
# Mismatch is harmless on tool routing (all MCP hosts dispatch
|
||||
# by the user-supplied registration name, NOT serverInfo.name)
|
||||
# but matters for any future Claude Code allowlist that gates
|
||||
# channel push by hardcoded server name (issue #2934).
|
||||
"serverInfo": {"name": "molecule", "version": "1.0.0"},
|
||||
# Built per-call (not the module-level constant) so an operator
|
||||
# who sets MOLECULE_MCP_POLL_TIMEOUT_SECS after import — e.g.
|
||||
# via a wrapper script that exports then re-imports — sees
|
||||
|
||||
+24
-410
@@ -129,200 +129,19 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli
|
||||
)
|
||||
|
||||
|
||||
async def _upload_chat_files(
|
||||
client: httpx.AsyncClient,
|
||||
paths: list[str],
|
||||
workspace_id: str | None = None,
|
||||
) -> tuple[list[dict], str | None]:
|
||||
"""Upload local file paths through /workspaces/<self>/chat/uploads.
|
||||
|
||||
The platform stages each upload under /workspace/.molecule/chat-uploads
|
||||
(an "allowed root" the canvas knows how to render via the Download
|
||||
endpoint) and returns metadata the broadcast payload references.
|
||||
|
||||
Why we route through upload instead of just passing the agent's path:
|
||||
the canvas's allowed-root list is /configs, /workspace, /home, /plugins
|
||||
— files at /tmp or /root would be unreachable. Uploading copies the
|
||||
bytes into an allowed root regardless of where the agent wrote them.
|
||||
|
||||
Returns (attachments, error). On any failure the caller should NOT
|
||||
fire the notify — partial-attach would surface a half-rendered chip.
|
||||
"""
|
||||
if not paths:
|
||||
return [], None
|
||||
files_payload: list[tuple[str, tuple[str, bytes, str]]] = []
|
||||
for p in paths:
|
||||
if not isinstance(p, str) or not p:
|
||||
return [], f"Error: invalid attachment path {p!r}"
|
||||
if not os.path.isfile(p):
|
||||
return [], f"Error: attachment not found: {p}"
|
||||
try:
|
||||
with open(p, "rb") as fh:
|
||||
data = fh.read()
|
||||
except OSError as e:
|
||||
return [], f"Error reading {p}: {e}"
|
||||
# Sniff mime from filename so the canvas can pick the right
|
||||
# icon / preview / inline-image renderer. Pre-fix this was
|
||||
# hardcoded application/octet-stream and chat_files.go's
|
||||
# Upload trusts whatever Content-Type the multipart part
|
||||
# carries — `mt := fh.Header.Get("Content-Type")` only falls
|
||||
# back to extension-sniffing when the header is empty. So a
|
||||
# hardcoded octet-stream meant every attachment lost its
|
||||
# real type forever, breaking the canvas chip's icon logic.
|
||||
mime_type, _ = mimetypes.guess_type(p)
|
||||
if not mime_type:
|
||||
mime_type = "application/octet-stream"
|
||||
files_payload.append(("files", (os.path.basename(p), data, mime_type)))
|
||||
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/chat/uploads",
|
||||
files=files_payload,
|
||||
headers=_auth_headers_for_heartbeat(target_workspace_id),
|
||||
)
|
||||
except Exception as e:
|
||||
return [], f"Error uploading attachments: {e}"
|
||||
if resp.status_code != 200:
|
||||
return [], f"Error: chat/uploads returned {resp.status_code}: {resp.text[:200]}"
|
||||
try:
|
||||
body = resp.json()
|
||||
except Exception as e:
|
||||
return [], f"Error parsing upload response: {e}"
|
||||
uploaded = body.get("files") or []
|
||||
if not isinstance(uploaded, list) or len(uploaded) != len(paths):
|
||||
return [], f"Error: upload returned {len(uploaded) if isinstance(uploaded, list) else 'invalid'} entries for {len(paths)} files"
|
||||
return uploaded, None
|
||||
|
||||
|
||||
async def tool_send_message_to_user(
|
||||
message: str,
|
||||
attachments: list[str] | None = None,
|
||||
workspace_id: str | None = None,
|
||||
) -> str:
|
||||
"""Send a message directly to the user's canvas chat via WebSocket.
|
||||
|
||||
Args:
|
||||
message: The text to display in the user's chat. Required even
|
||||
when sending attachments — set to a short caption like
|
||||
"Here's the build output:" or "Done — see attached."
|
||||
attachments: Optional list of absolute file paths inside this
|
||||
container. Each is uploaded to the platform and rendered
|
||||
in the canvas as a clickable download chip. Use this
|
||||
instead of pasting paths in the message text — paths
|
||||
render as plain text and the user can't click them.
|
||||
Examples:
|
||||
attachments=["/tmp/build-output.zip"]
|
||||
attachments=["/workspace/report.pdf", "/workspace/data.csv"]
|
||||
workspace_id: Optional. When the agent is registered in MULTIPLE
|
||||
workspaces (external multi-workspace MCP path), this
|
||||
selects which workspace's chat to deliver the message to —
|
||||
should match the ``arrival_workspace_id`` of the inbound
|
||||
message you're replying to so the user sees the reply in
|
||||
the same canvas they typed in. Single-workspace agents
|
||||
omit this; the message routes to the only registered
|
||||
workspace.
|
||||
"""
|
||||
if not message:
|
||||
return "Error: message is required"
|
||||
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
uploaded, upload_err = await _upload_chat_files(
|
||||
client, attachments or [], workspace_id=target_workspace_id,
|
||||
)
|
||||
if upload_err:
|
||||
return upload_err
|
||||
payload: dict = {"message": message}
|
||||
if uploaded:
|
||||
payload["attachments"] = uploaded
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/notify",
|
||||
json=payload,
|
||||
headers=_auth_headers_for_heartbeat(target_workspace_id),
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
if uploaded:
|
||||
return f"Message sent to user with {len(uploaded)} attachment(s)"
|
||||
return "Message sent to user"
|
||||
return f"Error: platform returned {resp.status_code}"
|
||||
except Exception as e:
|
||||
return f"Error sending message: {e}"
|
||||
|
||||
|
||||
async def tool_list_peers(source_workspace_id: str | None = None) -> str:
|
||||
"""List all workspaces this agent can communicate with.
|
||||
|
||||
Behavior:
|
||||
- ``source_workspace_id`` set → list peers of that one workspace.
|
||||
- Unset, single-workspace mode → list peers of WORKSPACE_ID
|
||||
(the legacy path, unchanged).
|
||||
- Unset, multi-workspace mode (MOLECULE_WORKSPACES populated) →
|
||||
aggregate across every registered workspace, prefixing each
|
||||
peer with its source so the agent / user can see the full peer
|
||||
surface in one call.
|
||||
|
||||
Side-effect: populates ``_peer_to_source`` so subsequent
|
||||
``tool_delegate_task(target)`` auto-routes through the correct
|
||||
sending workspace without the agent needing ``source_workspace_id``.
|
||||
"""
|
||||
sources: list[str]
|
||||
aggregate = False
|
||||
if source_workspace_id:
|
||||
sources = [source_workspace_id]
|
||||
else:
|
||||
registered = list_registered_workspaces()
|
||||
if len(registered) > 1:
|
||||
sources = registered
|
||||
aggregate = True
|
||||
else:
|
||||
sources = [WORKSPACE_ID]
|
||||
|
||||
all_peers: list[tuple[str, dict]] = [] # (source, peer_record)
|
||||
diagnostics: list[tuple[str, str]] = [] # (source, diagnostic)
|
||||
for src in sources:
|
||||
peers, diagnostic = await get_peers_with_diagnostic(source_workspace_id=src)
|
||||
if peers:
|
||||
for p in peers:
|
||||
all_peers.append((src, p))
|
||||
elif diagnostic is not None:
|
||||
diagnostics.append((src, diagnostic))
|
||||
|
||||
if not all_peers:
|
||||
if diagnostics:
|
||||
joined = "; ".join(f"[{src[:8]}] {d}" for src, d in diagnostics)
|
||||
return f"No peers found. {joined}"
|
||||
return (
|
||||
"You have no peers in the platform registry. "
|
||||
"(No parent, no children, no siblings registered.)"
|
||||
)
|
||||
|
||||
lines = []
|
||||
for src, p in all_peers:
|
||||
status = p.get("status", "unknown")
|
||||
role = p.get("role", "")
|
||||
peer_id = p["id"]
|
||||
# Cache name for use in delegate_task
|
||||
_peer_names[peer_id] = p["name"]
|
||||
# Cache the source workspace so tool_delegate_task auto-routes
|
||||
_peer_to_source[peer_id] = src
|
||||
if aggregate:
|
||||
lines.append(
|
||||
f"- {p['name']} (ID: {peer_id}, status: {status}, role: {role}, via: {src[:8]})"
|
||||
)
|
||||
else:
|
||||
lines.append(f"- {p['name']} (ID: {peer_id}, status: {status}, role: {role})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def tool_get_workspace_info(source_workspace_id: str | None = None) -> str:
|
||||
"""Get this workspace's own info.
|
||||
|
||||
``source_workspace_id`` selects which registered workspace to
|
||||
introspect when the agent is registered into multiple workspaces.
|
||||
Unset → falls back to module-level WORKSPACE_ID.
|
||||
"""
|
||||
info = await get_workspace_info(source_workspace_id=source_workspace_id)
|
||||
return json.dumps(info, indent=2)
|
||||
# Messaging tool handlers — extracted to a2a_tools_messaging
|
||||
# (RFC #2873 iter 4d). Re-imported here so call sites + tests that
|
||||
# reference ``a2a_tools.tool_send_message_to_user`` /
|
||||
# ``tool_list_peers`` / ``tool_get_workspace_info`` /
|
||||
# ``tool_chat_history`` / ``_upload_chat_files`` keep resolving
|
||||
# identically.
|
||||
from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports)
|
||||
_upload_chat_files,
|
||||
tool_chat_history,
|
||||
tool_get_workspace_info,
|
||||
tool_list_peers,
|
||||
tool_send_message_to_user,
|
||||
)
|
||||
|
||||
|
||||
# Memory tool handlers — extracted to a2a_tools_memory (RFC #2873 iter 4c).
|
||||
@@ -335,220 +154,15 @@ from a2a_tools_memory import ( # noqa: E402 (import after the top-of-module im
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The InboxState singleton is set by mcp_cli before the MCP server starts
|
||||
# (see workspace/inbox.py for the rationale). In-container runtimes never
|
||||
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
|
||||
# these tools surface an informational error rather than raising.
|
||||
#
|
||||
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
|
||||
# in standalone-runtime mode should call ``wait_for_message`` to block
|
||||
# on the next inbound message after they've emitted a reply, forming
|
||||
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
|
||||
# the queue without consuming; ``inbox_pop`` removes a handled message.
|
||||
|
||||
_INBOX_NOT_ENABLED_MSG = (
|
||||
"Error: inbox polling is not enabled in this runtime. The standalone "
|
||||
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
||||
"messages via push delivery and do not need these tools."
|
||||
# Inbox tool handlers — extracted to a2a_tools_inbox (RFC #2873 iter 4e).
|
||||
# Re-imported here so call sites + tests that reference
|
||||
# ``a2a_tools.tool_inbox_peek`` / ``tool_inbox_pop`` / ``tool_wait_for_message``
|
||||
# / ``_enrich_inbound_for_agent`` / ``_INBOX_NOT_ENABLED_MSG`` keep
|
||||
# resolving identically.
|
||||
from a2a_tools_inbox import ( # noqa: E402 (import after the top-of-module imports)
|
||||
_INBOX_NOT_ENABLED_MSG,
|
||||
_enrich_inbound_for_agent,
|
||||
tool_inbox_peek,
|
||||
tool_inbox_pop,
|
||||
tool_wait_for_message,
|
||||
)
|
||||
|
||||
|
||||
async def tool_chat_history(
|
||||
peer_id: str,
|
||||
limit: int = 20,
|
||||
before_ts: str = "",
|
||||
source_workspace_id: str | None = None,
|
||||
) -> str:
|
||||
"""Fetch the prior conversation with one peer.
|
||||
|
||||
Hits ``/workspaces/<self>/activity?peer_id=<peer>&limit=<N>``
|
||||
against the workspace-server, which returns activity rows where
|
||||
the peer is either the sender (``source_id=peer`` — they sent us
|
||||
the message) or the recipient (``target_id=peer`` — we sent to
|
||||
them) of an A2A turn — both sides of the conversation in
|
||||
chronological order.
|
||||
|
||||
Args:
|
||||
peer_id: The other workspace's UUID. Same value the agent
|
||||
sees as ``peer_id`` on a peer_agent push or ``workspace_id``
|
||||
on a delegate_task call.
|
||||
limit: Maximum rows to return; capped server-side at 500. The
|
||||
default of 20 covers \"most recent context for this peer\"
|
||||
without flooding the agent's context window.
|
||||
before_ts: Optional RFC3339 timestamp; only rows strictly
|
||||
older are returned. Used to page backward through long
|
||||
histories — pass the oldest ``ts`` from the previous
|
||||
response. Empty (default) returns the most recent ``limit``
|
||||
rows.
|
||||
source_workspace_id: Which registered workspace's activity log
|
||||
to query. Auto-routes via ``_peer_to_source`` cache when
|
||||
unset (the workspace this peer was discovered through);
|
||||
falls back to module-level WORKSPACE_ID for single-workspace
|
||||
operators.
|
||||
|
||||
Returns a JSON-encoded list of activity rows (or an error string
|
||||
starting with ``Error:`` so the agent can branch). Each row carries
|
||||
``activity_type``, ``source_id``, ``target_id``, ``method``,
|
||||
``summary``, ``request_body``, ``response_body``, ``status``,
|
||||
``created_at`` — same shape ``inbox_peek`` and the canvas chat
|
||||
loader already see.
|
||||
"""
|
||||
if not peer_id or not isinstance(peer_id, str):
|
||||
return "Error: peer_id is required"
|
||||
if not isinstance(limit, int) or limit <= 0:
|
||||
limit = 20
|
||||
if limit > 500:
|
||||
limit = 500
|
||||
|
||||
src = source_workspace_id or _peer_to_source.get(peer_id) or WORKSPACE_ID
|
||||
|
||||
params: dict[str, str] = {
|
||||
"peer_id": peer_id,
|
||||
"limit": str(limit),
|
||||
}
|
||||
# Forward verbatim — the server route validates as RFC3339 at the
|
||||
# trust boundary and translates into a `created_at < $X` clause.
|
||||
if before_ts:
|
||||
params["before_ts"] = before_ts
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(
|
||||
f"{PLATFORM_URL}/workspaces/{src}/activity",
|
||||
params=params,
|
||||
headers=_auth_headers_for_heartbeat(src),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return f"Error: chat_history request failed: {exc}"
|
||||
|
||||
if resp.status_code == 400:
|
||||
# Trust-boundary rejection (malformed peer_id, etc.) — surface
|
||||
# the server's reason verbatim so the agent can correct itself.
|
||||
try:
|
||||
err = resp.json().get("error", "bad request")
|
||||
except Exception: # noqa: BLE001
|
||||
err = "bad request"
|
||||
return f"Error: {err}"
|
||||
if resp.status_code >= 400:
|
||||
return f"Error: chat_history returned HTTP {resp.status_code}"
|
||||
|
||||
try:
|
||||
rows = resp.json()
|
||||
except Exception: # noqa: BLE001
|
||||
return "Error: chat_history response was not JSON"
|
||||
if not isinstance(rows, list):
|
||||
return "Error: chat_history response was not a list"
|
||||
|
||||
# Server returns DESC (most recent first); reverse to chronological
|
||||
# so the agent reads the conversation top-down like a chat log.
|
||||
rows.reverse()
|
||||
return json.dumps(rows)
|
||||
|
||||
|
||||
def _enrich_inbound_for_agent(d: dict) -> dict:
|
||||
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
|
||||
|
||||
The PUSH path (a2a_mcp_server._build_channel_notification) already
|
||||
enriches the meta dict with these fields, so a Claude Code host
|
||||
with channel-push sees them. The POLL path goes through
|
||||
InboxMessage.to_dict, which is intentionally identity-free (the
|
||||
storage layer doesn't know about the registry cache). Without this
|
||||
helper, every non-Claude-Code MCP client that uses inbox_peek /
|
||||
wait_for_message gets a plain message and the receiving agent
|
||||
can't tell who's writing — breaking the contract documented in
|
||||
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
|
||||
|
||||
Cache-first non-blocking enrichment (same shape as push): on cache
|
||||
miss the helper returns the bare message; the next call within the
|
||||
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
|
||||
the agent still gets text + peer_id + kind + activity_id, just
|
||||
without the friendly identity.
|
||||
"""
|
||||
peer_id = d.get("peer_id") or ""
|
||||
if not peer_id:
|
||||
# canvas_user — no peer to enrich; helper returns the plain
|
||||
# message unchanged so the canvas reply path still works.
|
||||
return d
|
||||
try:
|
||||
from a2a_client import ( # local import — avoid module-load cycle
|
||||
_agent_card_url_for,
|
||||
enrich_peer_metadata_nonblocking,
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
# If a2a_client is unavailable (test harness, partial install),
|
||||
# degrade gracefully — agent still gets the bare envelope.
|
||||
return d
|
||||
record = enrich_peer_metadata_nonblocking(peer_id)
|
||||
if record is not None:
|
||||
if name := record.get("name"):
|
||||
d["peer_name"] = name
|
||||
if role := record.get("role"):
|
||||
d["peer_role"] = role
|
||||
# agent_card_url is constructable from peer_id alone — surface it
|
||||
# even when registry enrichment misses, so the receiving agent has
|
||||
# a single endpoint to hit for the peer's full capability list.
|
||||
d["agent_card_url"] = _agent_card_url_for(peer_id)
|
||||
return d
|
||||
|
||||
|
||||
async def tool_inbox_peek(limit: int = 10) -> str:
|
||||
"""Return up to ``limit`` pending inbound messages without removing them."""
|
||||
import inbox # local import — avoids a circular dep at module load
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
|
||||
return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
|
||||
|
||||
|
||||
async def tool_inbox_pop(activity_id: str) -> str:
|
||||
"""Remove a message from the inbox queue by activity_id."""
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
if not isinstance(activity_id, str) or not activity_id:
|
||||
return "Error: activity_id is required."
|
||||
removed = state.pop(activity_id)
|
||||
if removed is None:
|
||||
return json.dumps({"removed": False, "activity_id": activity_id})
|
||||
return json.dumps({"removed": True, "activity_id": activity_id})
|
||||
|
||||
|
||||
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
|
||||
"""Block until a new message arrives or ``timeout_secs`` elapses.
|
||||
|
||||
Returns the head message non-destructively; the agent decides
|
||||
whether to ``inbox_pop`` it after acting.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
|
||||
try:
|
||||
timeout = float(timeout_secs)
|
||||
except (TypeError, ValueError):
|
||||
timeout = 60.0
|
||||
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
|
||||
# blocking longer than 5min wastes the prompt cache window for
|
||||
# nothing useful. Operators who want longer can call repeatedly.
|
||||
timeout = max(0.0, min(timeout, 300.0))
|
||||
|
||||
# The threading.Event-based wait would block the asyncio loop.
|
||||
# Run it on the default executor so the MCP server can keep
|
||||
# processing other JSON-RPC requests while we sleep.
|
||||
loop = asyncio.get_running_loop()
|
||||
message = await loop.run_in_executor(None, state.wait, timeout)
|
||||
if message is None:
|
||||
return json.dumps({"timeout": True, "timeout_secs": timeout})
|
||||
return json.dumps(_enrich_inbound_for_agent(message.to_dict()))
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
"""Inbox tool handlers — single-concern slice of the a2a_tools surface.
|
||||
|
||||
Standalone-runtime path for inbound-message delivery (push-mode runtimes
|
||||
get messages via the channel-tag synthesis in a2a_mcp_server). The
|
||||
``InboxState`` singleton is set by ``mcp_cli`` before the MCP server
|
||||
starts; in-container runtimes never call ``inbox.activate(...)`` so
|
||||
``inbox.get_state()`` returns None and these tools surface an
|
||||
informational error instead of raising.
|
||||
|
||||
When-to-use guidance for agents (mirrored in
|
||||
``platform_tools/registry.py``):
|
||||
- ``wait_for_message``: block until a new inbound message arrives, then
|
||||
decide what to do with it; forms the loop ``wait → respond → wait``.
|
||||
- ``inbox_peek``: inspect the queue non-destructively.
|
||||
- ``inbox_pop``: remove a handled message by activity_id.
|
||||
|
||||
Extracted from ``a2a_tools.py`` in RFC #2873 iter 4e so the kitchen-sink
|
||||
module shrinks to a back-compat shim. The extraction also makes the
|
||||
``_enrich_inbound_for_agent`` helper unit-testable in isolation —
|
||||
previously it was buried in ``a2a_tools`` and only exercised through
|
||||
the inbox wrappers, leaving its peer-id-empty / cache-miss / registry-
|
||||
unavailable branches under-covered.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
|
||||
# Surfaced when the inbox subsystem is not initialised. Returned by the
|
||||
# three inbox tool wrappers below so the agent gets a clear "this
|
||||
# runtime delivers via push" message instead of a NameError.
|
||||
_INBOX_NOT_ENABLED_MSG = (
|
||||
"Error: inbox polling is not enabled in this runtime. The standalone "
|
||||
"molecule-mcp wrapper activates it; in-container runtimes receive "
|
||||
"messages via push delivery and do not need these tools."
|
||||
)
|
||||
|
||||
|
||||
def _enrich_inbound_for_agent(d: dict) -> dict:
|
||||
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
|
||||
|
||||
The PUSH path (a2a_mcp_server._build_channel_notification) already
|
||||
enriches the meta dict with these fields, so a Claude Code host
|
||||
with channel-push sees them. The POLL path goes through
|
||||
InboxMessage.to_dict, which is intentionally identity-free (the
|
||||
storage layer doesn't know about the registry cache). Without this
|
||||
helper, every non-Claude-Code MCP client that uses inbox_peek /
|
||||
wait_for_message gets a plain message and the receiving agent
|
||||
can't tell who's writing — breaking the contract documented in
|
||||
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
|
||||
|
||||
Cache-first non-blocking enrichment (same shape as push): on cache
|
||||
miss the helper returns the bare message; the next call within the
|
||||
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
|
||||
the agent still gets text + peer_id + kind + activity_id, just
|
||||
without the friendly identity.
|
||||
"""
|
||||
peer_id = d.get("peer_id") or ""
|
||||
if not peer_id:
|
||||
# canvas_user — no peer to enrich; helper returns the plain
|
||||
# message unchanged so the canvas reply path still works.
|
||||
return d
|
||||
try:
|
||||
from a2a_client import ( # local import — avoid module-load cycle
|
||||
_agent_card_url_for,
|
||||
enrich_peer_metadata_nonblocking,
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
# If a2a_client is unavailable (test harness, partial install),
|
||||
# degrade gracefully — agent still gets the bare envelope.
|
||||
return d
|
||||
record = enrich_peer_metadata_nonblocking(peer_id)
|
||||
if record is not None:
|
||||
if name := record.get("name"):
|
||||
d["peer_name"] = name
|
||||
if role := record.get("role"):
|
||||
d["peer_role"] = role
|
||||
# agent_card_url is constructable from peer_id alone — surface it
|
||||
# even when registry enrichment misses, so the receiving agent has
|
||||
# a single endpoint to hit for the peer's full capability list.
|
||||
d["agent_card_url"] = _agent_card_url_for(peer_id)
|
||||
return d
|
||||
|
||||
|
||||
async def tool_inbox_peek(limit: int = 10) -> str:
|
||||
"""Return up to ``limit`` pending inbound messages without removing them."""
|
||||
import inbox # local import — avoids a circular dep at module load
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
|
||||
return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
|
||||
|
||||
|
||||
async def tool_inbox_pop(activity_id: str) -> str:
|
||||
"""Remove a message from the inbox queue by activity_id."""
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
if not isinstance(activity_id, str) or not activity_id:
|
||||
return "Error: activity_id is required."
|
||||
removed = state.pop(activity_id)
|
||||
if removed is None:
|
||||
return json.dumps({"removed": False, "activity_id": activity_id})
|
||||
return json.dumps({"removed": True, "activity_id": activity_id})
|
||||
|
||||
|
||||
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
|
||||
"""Block until a new message arrives or ``timeout_secs`` elapses.
|
||||
|
||||
Returns the head message non-destructively; the agent decides
|
||||
whether to ``inbox_pop`` it after acting.
|
||||
"""
|
||||
import inbox
|
||||
|
||||
state = inbox.get_state()
|
||||
if state is None:
|
||||
return _INBOX_NOT_ENABLED_MSG
|
||||
|
||||
try:
|
||||
timeout = float(timeout_secs)
|
||||
except (TypeError, ValueError):
|
||||
timeout = 60.0
|
||||
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
|
||||
# blocking longer than 5min wastes the prompt cache window for
|
||||
# nothing useful. Operators who want longer can call repeatedly.
|
||||
timeout = max(0.0, min(timeout, 300.0))
|
||||
|
||||
# The threading.Event-based wait would block the asyncio loop.
|
||||
# Run it on the default executor so the MCP server can keep
|
||||
# processing other JSON-RPC requests while we sleep.
|
||||
loop = asyncio.get_running_loop()
|
||||
message = await loop.run_in_executor(None, state.wait, timeout)
|
||||
if message is None:
|
||||
return json.dumps({"timeout": True, "timeout_secs": timeout})
|
||||
return json.dumps(_enrich_inbound_for_agent(message.to_dict()))
|
||||
@@ -0,0 +1,324 @@
|
||||
"""Messaging tool handlers — single-concern slice of the a2a_tools surface.
|
||||
|
||||
Extracted from ``a2a_tools.py`` (RFC #2873 iter 4d). Owns the four
|
||||
human-and-peer messaging MCP tools + the chat-upload helper they share:
|
||||
|
||||
* ``tool_send_message_to_user`` — push a canvas-chat message via the
|
||||
platform's ``/notify`` endpoint.
|
||||
* ``tool_list_peers`` — discover peers across one or many registered
|
||||
workspaces, with side-effect of populating ``_peer_to_source`` for
|
||||
delegate-task auto-routing.
|
||||
* ``tool_get_workspace_info`` — JSON-encode the workspace's own info.
|
||||
* ``tool_chat_history`` — fetch prior conversation rows with a peer.
|
||||
* ``_upload_chat_files`` — internal helper for the message-attachments
|
||||
code path; routes local file paths through the platform's
|
||||
``/chat/uploads`` so the canvas can render them as download chips.
|
||||
|
||||
Imports the auth-header primitive from ``a2a_tools_rbac`` (iter 4a).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
|
||||
import httpx
|
||||
|
||||
from a2a_client import (
|
||||
PLATFORM_URL,
|
||||
WORKSPACE_ID,
|
||||
_peer_names,
|
||||
_peer_to_source,
|
||||
get_peers_with_diagnostic,
|
||||
get_workspace_info,
|
||||
)
|
||||
from a2a_tools_rbac import auth_headers_for_heartbeat as _auth_headers_for_heartbeat
|
||||
from platform_auth import list_registered_workspaces
|
||||
|
||||
|
||||
async def _upload_chat_files(
|
||||
client: httpx.AsyncClient,
|
||||
paths: list[str],
|
||||
workspace_id: str | None = None,
|
||||
) -> tuple[list[dict], str | None]:
|
||||
"""Upload local file paths through /workspaces/<self>/chat/uploads.
|
||||
|
||||
The platform stages each upload under /workspace/.molecule/chat-uploads
|
||||
(an "allowed root" the canvas knows how to render via the Download
|
||||
endpoint) and returns metadata the broadcast payload references.
|
||||
|
||||
Why we route through upload instead of just passing the agent's path:
|
||||
the canvas's allowed-root list is /configs, /workspace, /home, /plugins
|
||||
— files at /tmp or /root would be unreachable. Uploading copies the
|
||||
bytes into an allowed root regardless of where the agent wrote them.
|
||||
|
||||
Returns (attachments, error). On any failure the caller should NOT
|
||||
fire the notify — partial-attach would surface a half-rendered chip.
|
||||
"""
|
||||
if not paths:
|
||||
return [], None
|
||||
files_payload: list[tuple[str, tuple[str, bytes, str]]] = []
|
||||
for p in paths:
|
||||
if not isinstance(p, str) or not p:
|
||||
return [], f"Error: invalid attachment path {p!r}"
|
||||
if not os.path.isfile(p):
|
||||
return [], f"Error: attachment not found: {p}"
|
||||
try:
|
||||
with open(p, "rb") as fh:
|
||||
data = fh.read()
|
||||
except OSError as e:
|
||||
return [], f"Error reading {p}: {e}"
|
||||
# Sniff mime from filename so the canvas can pick the right
|
||||
# icon / preview / inline-image renderer. Pre-fix this was
|
||||
# hardcoded application/octet-stream and chat_files.go's
|
||||
# Upload trusts whatever Content-Type the multipart part
|
||||
# carries — `mt := fh.Header.Get("Content-Type")` only falls
|
||||
# back to extension-sniffing when the header is empty. So a
|
||||
# hardcoded octet-stream meant every attachment lost its
|
||||
# real type forever, breaking the canvas chip's icon logic.
|
||||
mime_type, _ = mimetypes.guess_type(p)
|
||||
if not mime_type:
|
||||
mime_type = "application/octet-stream"
|
||||
files_payload.append(("files", (os.path.basename(p), data, mime_type)))
|
||||
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/chat/uploads",
|
||||
files=files_payload,
|
||||
headers=_auth_headers_for_heartbeat(target_workspace_id),
|
||||
)
|
||||
except Exception as e:
|
||||
return [], f"Error uploading attachments: {e}"
|
||||
if resp.status_code != 200:
|
||||
return [], f"Error: chat/uploads returned {resp.status_code}: {resp.text[:200]}"
|
||||
try:
|
||||
body = resp.json()
|
||||
except Exception as e:
|
||||
return [], f"Error parsing upload response: {e}"
|
||||
uploaded = body.get("files") or []
|
||||
if not isinstance(uploaded, list) or len(uploaded) != len(paths):
|
||||
return [], f"Error: upload returned {len(uploaded) if isinstance(uploaded, list) else 'invalid'} entries for {len(paths)} files"
|
||||
return uploaded, None
|
||||
|
||||
|
||||
async def tool_send_message_to_user(
|
||||
message: str,
|
||||
attachments: list[str] | None = None,
|
||||
workspace_id: str | None = None,
|
||||
) -> str:
|
||||
"""Send a message directly to the user's canvas chat via WebSocket.
|
||||
|
||||
Args:
|
||||
message: The text to display in the user's chat. Required even
|
||||
when sending attachments — set to a short caption like
|
||||
"Here's the build output:" or "Done — see attached."
|
||||
attachments: Optional list of absolute file paths inside this
|
||||
container. Each is uploaded to the platform and rendered
|
||||
in the canvas as a clickable download chip. Use this
|
||||
instead of pasting paths in the message text — paths
|
||||
render as plain text and the user can't click them.
|
||||
Examples:
|
||||
attachments=["/tmp/build-output.zip"]
|
||||
attachments=["/workspace/report.pdf", "/workspace/data.csv"]
|
||||
workspace_id: Optional. When the agent is registered in MULTIPLE
|
||||
workspaces (external multi-workspace MCP path), this
|
||||
selects which workspace's chat to deliver the message to —
|
||||
should match the ``arrival_workspace_id`` of the inbound
|
||||
message you're replying to so the user sees the reply in
|
||||
the same canvas they typed in. Single-workspace agents
|
||||
omit this; the message routes to the only registered
|
||||
workspace.
|
||||
"""
|
||||
if not message:
|
||||
return "Error: message is required"
|
||||
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
uploaded, upload_err = await _upload_chat_files(
|
||||
client, attachments or [], workspace_id=target_workspace_id,
|
||||
)
|
||||
if upload_err:
|
||||
return upload_err
|
||||
payload: dict = {"message": message}
|
||||
if uploaded:
|
||||
payload["attachments"] = uploaded
|
||||
resp = await client.post(
|
||||
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/notify",
|
||||
json=payload,
|
||||
headers=_auth_headers_for_heartbeat(target_workspace_id),
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
if uploaded:
|
||||
return f"Message sent to user with {len(uploaded)} attachment(s)"
|
||||
return "Message sent to user"
|
||||
return f"Error: platform returned {resp.status_code}"
|
||||
except Exception as e:
|
||||
return f"Error sending message: {e}"
|
||||
|
||||
|
||||
async def tool_list_peers(source_workspace_id: str | None = None) -> str:
|
||||
"""List all workspaces this agent can communicate with.
|
||||
|
||||
Behavior:
|
||||
- ``source_workspace_id`` set → list peers of that one workspace.
|
||||
- Unset, single-workspace mode → list peers of WORKSPACE_ID
|
||||
(the legacy path, unchanged).
|
||||
- Unset, multi-workspace mode (MOLECULE_WORKSPACES populated) →
|
||||
aggregate across every registered workspace, prefixing each
|
||||
peer with its source so the agent / user can see the full peer
|
||||
surface in one call.
|
||||
|
||||
Side-effect: populates ``_peer_to_source`` so subsequent
|
||||
``tool_delegate_task(target)`` auto-routes through the correct
|
||||
sending workspace without the agent needing ``source_workspace_id``.
|
||||
"""
|
||||
sources: list[str]
|
||||
aggregate = False
|
||||
if source_workspace_id:
|
||||
sources = [source_workspace_id]
|
||||
else:
|
||||
registered = list_registered_workspaces()
|
||||
if len(registered) > 1:
|
||||
sources = registered
|
||||
aggregate = True
|
||||
else:
|
||||
sources = [WORKSPACE_ID]
|
||||
|
||||
all_peers: list[tuple[str, dict]] = [] # (source, peer_record)
|
||||
diagnostics: list[tuple[str, str]] = [] # (source, diagnostic)
|
||||
for src in sources:
|
||||
peers, diagnostic = await get_peers_with_diagnostic(source_workspace_id=src)
|
||||
if peers:
|
||||
for p in peers:
|
||||
all_peers.append((src, p))
|
||||
elif diagnostic is not None:
|
||||
diagnostics.append((src, diagnostic))
|
||||
|
||||
if not all_peers:
|
||||
if diagnostics:
|
||||
joined = "; ".join(f"[{src[:8]}] {d}" for src, d in diagnostics)
|
||||
return f"No peers found. {joined}"
|
||||
return (
|
||||
"You have no peers in the platform registry. "
|
||||
"(No parent, no children, no siblings registered.)"
|
||||
)
|
||||
|
||||
lines = []
|
||||
for src, p in all_peers:
|
||||
status = p.get("status", "unknown")
|
||||
role = p.get("role", "")
|
||||
peer_id = p["id"]
|
||||
# Cache name for use in delegate_task
|
||||
_peer_names[peer_id] = p["name"]
|
||||
# Cache the source workspace so tool_delegate_task auto-routes
|
||||
_peer_to_source[peer_id] = src
|
||||
if aggregate:
|
||||
lines.append(
|
||||
f"- {p['name']} (ID: {peer_id}, status: {status}, role: {role}, via: {src[:8]})"
|
||||
)
|
||||
else:
|
||||
lines.append(f"- {p['name']} (ID: {peer_id}, status: {status}, role: {role})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def tool_get_workspace_info(source_workspace_id: str | None = None) -> str:
|
||||
"""Get this workspace's own info.
|
||||
|
||||
``source_workspace_id`` selects which registered workspace to
|
||||
introspect when the agent is registered into multiple workspaces.
|
||||
Unset → falls back to module-level WORKSPACE_ID.
|
||||
"""
|
||||
info = await get_workspace_info(source_workspace_id=source_workspace_id)
|
||||
return json.dumps(info, indent=2)
|
||||
|
||||
|
||||
async def tool_chat_history(
|
||||
peer_id: str,
|
||||
limit: int = 20,
|
||||
before_ts: str = "",
|
||||
source_workspace_id: str | None = None,
|
||||
) -> str:
|
||||
"""Fetch the prior conversation with one peer.
|
||||
|
||||
Hits ``/workspaces/<self>/activity?peer_id=<peer>&limit=<N>``
|
||||
against the workspace-server, which returns activity rows where
|
||||
the peer is either the sender (``source_id=peer`` — they sent us
|
||||
the message) or the recipient (``target_id=peer`` — we sent to
|
||||
them) of an A2A turn — both sides of the conversation in
|
||||
chronological order.
|
||||
|
||||
Args:
|
||||
peer_id: The other workspace's UUID. Same value the agent
|
||||
sees as ``peer_id`` on a peer_agent push or ``workspace_id``
|
||||
on a delegate_task call.
|
||||
limit: Maximum rows to return; capped server-side at 500. The
|
||||
default of 20 covers "most recent context for this peer"
|
||||
without flooding the agent's context window.
|
||||
before_ts: Optional RFC3339 timestamp; only rows strictly
|
||||
older are returned. Used to page backward through long
|
||||
histories — pass the oldest ``ts`` from the previous
|
||||
response. Empty (default) returns the most recent ``limit``
|
||||
rows.
|
||||
source_workspace_id: Which registered workspace's activity log
|
||||
to query. Auto-routes via ``_peer_to_source`` cache when
|
||||
unset (the workspace this peer was discovered through);
|
||||
falls back to module-level WORKSPACE_ID for single-workspace
|
||||
operators.
|
||||
|
||||
Returns a JSON-encoded list of activity rows (or an error string
|
||||
starting with ``Error:`` so the agent can branch). Each row carries
|
||||
``activity_type``, ``source_id``, ``target_id``, ``method``,
|
||||
``summary``, ``request_body``, ``response_body``, ``status``,
|
||||
``created_at`` — same shape ``inbox_peek`` and the canvas chat
|
||||
loader already see.
|
||||
"""
|
||||
if not peer_id or not isinstance(peer_id, str):
|
||||
return "Error: peer_id is required"
|
||||
if not isinstance(limit, int) or limit <= 0:
|
||||
limit = 20
|
||||
if limit > 500:
|
||||
limit = 500
|
||||
|
||||
src = source_workspace_id or _peer_to_source.get(peer_id) or WORKSPACE_ID
|
||||
|
||||
params: dict[str, str] = {
|
||||
"peer_id": peer_id,
|
||||
"limit": str(limit),
|
||||
}
|
||||
# Forward verbatim — the server route validates as RFC3339 at the
|
||||
# trust boundary and translates into a `created_at < $X` clause.
|
||||
if before_ts:
|
||||
params["before_ts"] = before_ts
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(
|
||||
f"{PLATFORM_URL}/workspaces/{src}/activity",
|
||||
params=params,
|
||||
headers=_auth_headers_for_heartbeat(src),
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return f"Error: chat_history request failed: {exc}"
|
||||
|
||||
if resp.status_code == 400:
|
||||
# Trust-boundary rejection (malformed peer_id, etc.) — surface
|
||||
# the server's reason verbatim so the agent can correct itself.
|
||||
try:
|
||||
err = resp.json().get("error", "bad request")
|
||||
except Exception: # noqa: BLE001
|
||||
err = "bad request"
|
||||
return f"Error: {err}"
|
||||
if resp.status_code >= 400:
|
||||
return f"Error: chat_history returned HTTP {resp.status_code}"
|
||||
|
||||
try:
|
||||
rows = resp.json()
|
||||
except Exception: # noqa: BLE001
|
||||
return "Error: chat_history response was not JSON"
|
||||
if not isinstance(rows, list):
|
||||
return "Error: chat_history response was not a list"
|
||||
|
||||
# Server returns DESC (most recent first); reverse to chronological
|
||||
# so the agent reads the conversation top-down like a chat log.
|
||||
rows.reverse()
|
||||
return json.dumps(rows)
|
||||
@@ -35,9 +35,15 @@ def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
|
||||
N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN``
|
||||
are IGNORED — the JSON is the source of truth.
|
||||
|
||||
2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from
|
||||
``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``.
|
||||
This is the pre-existing path; back-compat exact.
|
||||
2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token
|
||||
resolved in this order:
|
||||
a. ``MOLECULE_WORKSPACE_TOKEN`` (inline env — convenient but
|
||||
leaks into shell history + plaintext MCP-host config).
|
||||
b. ``MOLECULE_WORKSPACE_TOKEN_FILE`` (path to a file holding
|
||||
the token — operator can keep it 0600 in their home dir;
|
||||
survives shell-history scrubs).
|
||||
c. ``${CONFIGS_DIR}/.auth_token`` (in-container runtimes —
|
||||
the platform writes this on provision).
|
||||
|
||||
Returns ``(workspaces, errors)``:
|
||||
* ``workspaces``: list of ``(workspace_id, token)`` — non-empty
|
||||
@@ -98,16 +104,47 @@ def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
|
||||
wsid = os.environ.get("WORKSPACE_ID", "").strip()
|
||||
if not wsid:
|
||||
return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"]
|
||||
# Token resolution order (#2934): inline env → file path → CONFIGS_DIR
|
||||
# default. The file-path option exists so operators can keep the
|
||||
# bearer out of shell history and out of MCP-host config plaintext
|
||||
# (e.g. ~/.claude.json) — set MOLECULE_WORKSPACE_TOKEN_FILE to a
|
||||
# 0600 file containing the token. The CONFIGS_DIR/.auth_token
|
||||
# fallback predates this and stays for in-container runtimes.
|
||||
tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip()
|
||||
if not tok:
|
||||
tok = _read_token_from_file_env()
|
||||
if not tok:
|
||||
tok = read_token_file()
|
||||
if not tok:
|
||||
return [], [
|
||||
"MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required"
|
||||
"MOLECULE_WORKSPACE_TOKEN, MOLECULE_WORKSPACE_TOKEN_FILE, or "
|
||||
"CONFIGS_DIR/.auth_token is required"
|
||||
]
|
||||
return [(wsid, tok)], []
|
||||
|
||||
|
||||
def _read_token_from_file_env() -> str:
|
||||
"""Read the token from the file path in MOLECULE_WORKSPACE_TOKEN_FILE.
|
||||
|
||||
Returns "" on:
|
||||
- env var unset / blank
|
||||
- file not found, unreadable, or empty
|
||||
- any OSError on read
|
||||
|
||||
Empty-on-failure (rather than raising) lets the resolver fall through
|
||||
to the CONFIGS_DIR fallback. The caller surfaces the combined "no
|
||||
token" error if every source is empty.
|
||||
"""
|
||||
path = os.environ.get("MOLECULE_WORKSPACE_TOKEN_FILE", "").strip()
|
||||
if not path:
|
||||
return ""
|
||||
try:
|
||||
with open(path, encoding="utf-8") as fh:
|
||||
return fh.read().strip()
|
||||
except OSError:
|
||||
return ""
|
||||
|
||||
|
||||
def print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
|
||||
print("molecule-mcp: missing required environment.\n", file=sys.stderr)
|
||||
print("Set the following before running molecule-mcp:", file=sys.stderr)
|
||||
@@ -123,6 +160,16 @@ def print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
|
||||
"(canvas → Tokens tab)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print(
|
||||
" OR set MOLECULE_WORKSPACE_TOKEN_FILE"
|
||||
" to a path that holds the token",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print(
|
||||
" (keeps the secret out of shell"
|
||||
" history and MCP-host config plaintext)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
print("", file=sys.stderr)
|
||||
print(f"Currently missing: {', '.join(missing)}", file=sys.stderr)
|
||||
|
||||
|
||||
@@ -241,7 +241,7 @@ class TestToolListPeersAggregation:
|
||||
return [{"id": "2222bbbb-2222-2222-2222-222222222222", "name": "bob", "status": "online", "role": "dev"}], None
|
||||
return [], None
|
||||
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
output = await a2a_tools.tool_list_peers()
|
||||
|
||||
assert "alice" in output
|
||||
@@ -263,7 +263,7 @@ class TestToolListPeersAggregation:
|
||||
assert source_workspace_id == a2a_client.WORKSPACE_ID
|
||||
return [{"id": "1111aaaa-1111-1111-1111-111111111111", "name": "alice", "status": "online", "role": "ops"}], None
|
||||
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
output = await a2a_tools.tool_list_peers()
|
||||
|
||||
assert "alice" in output
|
||||
@@ -286,7 +286,7 @@ class TestToolListPeersAggregation:
|
||||
seen.append(source_workspace_id)
|
||||
return [{"id": "1111aaaa-1111-1111-1111-111111111111", "name": "alice", "status": "online", "role": "ops"}], None
|
||||
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
output = await a2a_tools.tool_list_peers(source_workspace_id=ws_a)
|
||||
|
||||
assert seen == [ws_a]
|
||||
@@ -309,7 +309,7 @@ class TestToolListPeersAggregation:
|
||||
return [], "auth failed"
|
||||
return [], "platform 5xx"
|
||||
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", side_effect=fake_get_peers):
|
||||
out = await a2a_tools.tool_list_peers()
|
||||
|
||||
assert "[aaaa1111] auth failed" in out
|
||||
|
||||
@@ -453,14 +453,14 @@ class TestToolSendMessageToUser:
|
||||
async def test_success_200_returns_sent_message(self):
|
||||
import a2a_tools
|
||||
mc = _make_http_mock(post_resp=_resp(200, {}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user("Hello user!")
|
||||
assert result == "Message sent to user"
|
||||
|
||||
async def test_non_200_returns_status_code_in_error(self):
|
||||
import a2a_tools
|
||||
mc = _make_http_mock(post_resp=_resp(503, {}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user("Hello user!")
|
||||
assert "503" in result
|
||||
assert "Error" in result
|
||||
@@ -468,7 +468,7 @@ class TestToolSendMessageToUser:
|
||||
async def test_exception_returns_error_message(self):
|
||||
import a2a_tools
|
||||
mc = _make_http_mock(post_exc=RuntimeError("platform unreachable"))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user("Hi!")
|
||||
assert "Error sending message" in result
|
||||
assert "platform unreachable" in result
|
||||
@@ -495,7 +495,7 @@ class TestToolSendMessageToUser:
|
||||
mc = _make_http_mock(post_resp=notify_resp)
|
||||
mc.post = AsyncMock(side_effect=[upload_resp, notify_resp])
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user(
|
||||
"Done — see attached.",
|
||||
attachments=[str(f)],
|
||||
@@ -523,7 +523,7 @@ class TestToolSendMessageToUser:
|
||||
# with a half-rendered attachment chip.
|
||||
import a2a_tools
|
||||
mc = _make_http_mock()
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user(
|
||||
"Hi", attachments=["/no/such/file.zip"],
|
||||
)
|
||||
@@ -541,7 +541,7 @@ class TestToolSendMessageToUser:
|
||||
mc = _make_http_mock()
|
||||
mc.post = AsyncMock(return_value=upload_resp)
|
||||
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_send_message_to_user(
|
||||
"Hi", attachments=[str(f)],
|
||||
)
|
||||
@@ -555,7 +555,7 @@ class TestToolSendMessageToUser:
|
||||
# an `attachments` field added to the notify body.
|
||||
import a2a_tools
|
||||
mc = _make_http_mock(post_resp=_resp(200, {}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
await a2a_tools.tool_send_message_to_user("plain text")
|
||||
body = mc.post.await_args.kwargs.get("json") or {}
|
||||
assert body == {"message": "plain text"}
|
||||
@@ -570,7 +570,7 @@ class TestToolListPeers:
|
||||
async def test_true_empty_returns_no_peers_message_without_diagnostic(self):
|
||||
"""200 + empty list → 'no peers in the platform registry' (no failure)."""
|
||||
import a2a_tools
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=([], None)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=([], None)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
# The new wording explicitly says no peers exist (no parent/sibling/child).
|
||||
# Avoids the misleading "may be isolated" hint when discovery succeeded.
|
||||
@@ -582,7 +582,7 @@ class TestToolListPeers:
|
||||
"""401/403 → tool_list_peers must surface the auth failure + restart hint, not 'isolated'."""
|
||||
import a2a_tools
|
||||
diag = "Authentication to platform failed (HTTP 401). Restart the workspace to re-mint."
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
assert "401" in result
|
||||
assert "Authentication" in result
|
||||
@@ -593,7 +593,7 @@ class TestToolListPeers:
|
||||
"""404 → tool_list_peers tells the user re-registration is needed."""
|
||||
import a2a_tools
|
||||
diag = "Workspace ID ws-test is not registered with the platform (HTTP 404). Re-register."
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
assert "404" in result
|
||||
assert "registered" in result.lower()
|
||||
@@ -602,7 +602,7 @@ class TestToolListPeers:
|
||||
"""5xx → 'Platform error' surfaced; agent / user can correctly route to oncall."""
|
||||
import a2a_tools
|
||||
diag = "Platform error: HTTP 503."
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
assert "503" in result
|
||||
assert "Platform error" in result
|
||||
@@ -611,7 +611,7 @@ class TestToolListPeers:
|
||||
"""Network error → operator can tell that the workspace can't reach the platform at all."""
|
||||
import a2a_tools
|
||||
diag = "Cannot reach platform at http://platform.example: timed out"
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=([], diag)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
assert "Cannot reach platform" in result
|
||||
assert "timed out" in result
|
||||
@@ -624,7 +624,7 @@ class TestToolListPeers:
|
||||
{"id": "ws-1", "name": "Alpha", "status": "online", "role": "worker"},
|
||||
{"id": "ws-2", "name": "Beta", "status": "idle", "role": "analyst"},
|
||||
]
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
|
||||
assert "Alpha" in result
|
||||
@@ -641,7 +641,7 @@ class TestToolListPeers:
|
||||
# Clear any prior cache entries for these IDs
|
||||
a2a_tools._peer_names.pop("ws-cache-test", None)
|
||||
peers = [{"id": "ws-cache-test", "name": "CacheMe", "status": "online", "role": "w"}]
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
await a2a_tools.tool_list_peers()
|
||||
|
||||
assert a2a_tools._peer_names.get("ws-cache-test") == "CacheMe"
|
||||
@@ -651,7 +651,7 @@ class TestToolListPeers:
|
||||
import a2a_tools
|
||||
|
||||
peers = [{"id": "ws-3", "name": "Gamma"}] # no status, no role
|
||||
with patch("a2a_tools.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
with patch("a2a_tools_messaging.get_peers_with_diagnostic", return_value=(peers, None)):
|
||||
result = await a2a_tools.tool_list_peers()
|
||||
|
||||
assert "Gamma" in result
|
||||
@@ -669,7 +669,7 @@ class TestToolGetWorkspaceInfo:
|
||||
import a2a_tools
|
||||
|
||||
info = {"id": "ws-test", "name": "My Workspace", "status": "online"}
|
||||
with patch("a2a_tools.get_workspace_info", return_value=info):
|
||||
with patch("a2a_tools_messaging.get_workspace_info", return_value=info):
|
||||
result = await a2a_tools.tool_get_workspace_info()
|
||||
|
||||
parsed = json.loads(result)
|
||||
@@ -678,7 +678,7 @@ class TestToolGetWorkspaceInfo:
|
||||
async def test_returns_error_dict_as_json(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("a2a_tools.get_workspace_info", return_value={"error": "not found"}):
|
||||
with patch("a2a_tools_messaging.get_workspace_info", return_value={"error": "not found"}):
|
||||
result = await a2a_tools.tool_get_workspace_info()
|
||||
|
||||
parsed = json.loads(result)
|
||||
@@ -994,7 +994,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock()
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id="")
|
||||
|
||||
mc.get.assert_not_called()
|
||||
@@ -1006,7 +1006,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
url, kwargs = mc.get.call_args.args[0], mc.get.call_args.kwargs
|
||||
@@ -1023,7 +1023,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
await a2a_tools.tool_chat_history(peer_id=_PEER, limit=10000)
|
||||
|
||||
params = mc.get.call_args.kwargs["params"]
|
||||
@@ -1035,7 +1035,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
await a2a_tools.tool_chat_history(peer_id=_PEER, limit=0)
|
||||
|
||||
assert mc.get.call_args.kwargs["params"]["limit"] == "20"
|
||||
@@ -1044,7 +1044,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
await a2a_tools.tool_chat_history(
|
||||
peer_id=_PEER, before_ts="2026-05-01T00:00:00Z",
|
||||
)
|
||||
@@ -1063,7 +1063,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, []))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
# Exact-equality on the JSON literal (per assert-exact memory) —
|
||||
@@ -1084,7 +1084,7 @@ class TestChatHistory:
|
||||
{"id": "act-1", "created_at": "2026-05-01T00:01:00Z"},
|
||||
]
|
||||
mc = _make_http_mock(get_resp=_resp(200, rows))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
out = json.loads(result)
|
||||
@@ -1097,7 +1097,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(400, {"error": "peer_id must be a UUID"}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id="bad")
|
||||
|
||||
assert "peer_id must be a UUID" in result
|
||||
@@ -1108,7 +1108,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(500, {"error": "internal"}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
assert result.startswith("Error:")
|
||||
@@ -1121,7 +1121,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_exc=httpx.ConnectError("network down"))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
assert result.startswith("Error:")
|
||||
@@ -1135,7 +1135,7 @@ class TestChatHistory:
|
||||
import a2a_tools
|
||||
|
||||
mc = _make_http_mock(get_resp=_resp(200, {"unexpected": "shape"}))
|
||||
with patch("a2a_tools.httpx.AsyncClient", return_value=mc):
|
||||
with patch("a2a_tools_messaging.httpx.AsyncClient", return_value=mc):
|
||||
result = await a2a_tools.tool_chat_history(peer_id=_PEER)
|
||||
|
||||
assert result.startswith("Error:")
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
"""Drift gate + import-contract tests for ``a2a_tools_inbox`` (RFC #2873 iter 4e).
|
||||
|
||||
The full behavior matrix for the three inbox tool wrappers lives in
|
||||
``test_a2a_tools_inbox_wrappers.py`` (kept on the public ``a2a_tools``
|
||||
module so the same tests pin both the alias and the underlying impl).
|
||||
|
||||
This file pins:
|
||||
|
||||
1. **Drift gate** — every previously-public symbol on ``a2a_tools``
|
||||
(``tool_inbox_peek``, ``tool_inbox_pop``, ``tool_wait_for_message``,
|
||||
``_enrich_inbound_for_agent``, ``_INBOX_NOT_ENABLED_MSG``) is the
|
||||
EXACT same object as ``a2a_tools_inbox.foo``. Refactor wrapping
|
||||
silently loses existing test coverage; this gate makes that drift
|
||||
fail fast.
|
||||
2. **Import contract** — ``a2a_tools_inbox`` does NOT pull in
|
||||
``a2a_tools`` at module-load time (the layered architecture: it
|
||||
depends only on stdlib + a lazy import of ``inbox`` + a lazy
|
||||
import of ``a2a_client``, never the kitchen-sink module that
|
||||
re-exports it).
|
||||
3. **_enrich_inbound_for_agent** branches that the wrapper tests
|
||||
can't easily reach: peer_id-empty (canvas_user) returns the
|
||||
dict unchanged; a2a_client unavailable degrades gracefully.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _require_workspace_id(monkeypatch):
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
yield
|
||||
|
||||
|
||||
# ============== Drift gate ==============
|
||||
|
||||
class TestBackCompatAliases:
|
||||
def test_tool_inbox_peek_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_inbox
|
||||
assert a2a_tools.tool_inbox_peek is a2a_tools_inbox.tool_inbox_peek
|
||||
|
||||
def test_tool_inbox_pop_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_inbox
|
||||
assert a2a_tools.tool_inbox_pop is a2a_tools_inbox.tool_inbox_pop
|
||||
|
||||
def test_tool_wait_for_message_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_inbox
|
||||
assert (
|
||||
a2a_tools.tool_wait_for_message is a2a_tools_inbox.tool_wait_for_message
|
||||
)
|
||||
|
||||
def test_enrich_helper_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_inbox
|
||||
assert (
|
||||
a2a_tools._enrich_inbound_for_agent
|
||||
is a2a_tools_inbox._enrich_inbound_for_agent
|
||||
)
|
||||
|
||||
def test_inbox_not_enabled_msg_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_inbox
|
||||
assert (
|
||||
a2a_tools._INBOX_NOT_ENABLED_MSG is a2a_tools_inbox._INBOX_NOT_ENABLED_MSG
|
||||
)
|
||||
|
||||
|
||||
# ============== Import contract ==============
|
||||
|
||||
class TestImportContract:
|
||||
def test_inbox_module_does_not_import_a2a_tools_eagerly(self):
|
||||
# Force a fresh load of a2a_tools_inbox without a2a_tools in sight.
|
||||
for k in [k for k in list(sys.modules) if k in (
|
||||
"a2a_tools_inbox", "a2a_tools",
|
||||
)]:
|
||||
sys.modules.pop(k, None)
|
||||
import a2a_tools_inbox # noqa: F401 — load only
|
||||
|
||||
# a2a_tools_inbox MUST NOT have caused a2a_tools to load. The
|
||||
# extracted module sits BELOW the kitchen-sink in the layering;
|
||||
# the dependency arrow points the other direction.
|
||||
assert "a2a_tools" not in sys.modules, (
|
||||
"a2a_tools_inbox eagerly imported a2a_tools — the kitchen-sink "
|
||||
"module must not be a load-time dependency of its slices."
|
||||
)
|
||||
|
||||
|
||||
# ============== _enrich_inbound_for_agent branches ==============
|
||||
|
||||
class TestEnrichInboundForAgent:
|
||||
def test_canvas_user_returns_dict_unchanged(self):
|
||||
# peer_id empty → canvas_user → no enrichment, no a2a_client touch.
|
||||
from a2a_tools_inbox import _enrich_inbound_for_agent
|
||||
|
||||
msg = {"activity_id": "a-1", "kind": "canvas_user", "peer_id": ""}
|
||||
result = _enrich_inbound_for_agent(msg)
|
||||
assert result is msg # same dict, mutated in place if at all
|
||||
assert "peer_name" not in result
|
||||
assert "peer_role" not in result
|
||||
assert "agent_card_url" not in result
|
||||
|
||||
def test_missing_peer_id_key_returns_unchanged(self):
|
||||
from a2a_tools_inbox import _enrich_inbound_for_agent
|
||||
|
||||
msg = {"activity_id": "a-2", "kind": "canvas_user"} # no peer_id key
|
||||
result = _enrich_inbound_for_agent(msg)
|
||||
assert result is msg
|
||||
assert "agent_card_url" not in result
|
||||
|
||||
def test_a2a_client_unavailable_degrades_gracefully(self, monkeypatch):
|
||||
# Simulate a2a_client import failing (test harness, partial
|
||||
# install). The helper must return the bare envelope, not raise.
|
||||
from a2a_tools_inbox import _enrich_inbound_for_agent
|
||||
|
||||
# Force an ImportError by poisoning sys.modules.
|
||||
import builtins
|
||||
real_import = builtins.__import__
|
||||
|
||||
def fake_import(name, *args, **kwargs):
|
||||
if name == "a2a_client":
|
||||
raise ImportError("simulated a2a_client unavailable")
|
||||
return real_import(name, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(builtins, "__import__", fake_import)
|
||||
|
||||
msg = {"activity_id": "a-3", "kind": "peer_agent", "peer_id": "ws-x"}
|
||||
result = _enrich_inbound_for_agent(msg)
|
||||
# Bare envelope back — no peer_name, no agent_card_url. Crucially
|
||||
# the helper did NOT raise, so the inbox tool surfaces the message
|
||||
# to the agent even when the registry is unreachable.
|
||||
assert result is msg
|
||||
assert "peer_name" not in result
|
||||
assert "agent_card_url" not in result
|
||||
|
||||
def test_registry_record_populates_peer_name_and_role(self, monkeypatch):
|
||||
from a2a_tools_inbox import _enrich_inbound_for_agent
|
||||
|
||||
# Stub out the lazy-imported a2a_client functions.
|
||||
import sys
|
||||
import types
|
||||
fake_a2a_client = types.SimpleNamespace(
|
||||
_agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
|
||||
enrich_peer_metadata_nonblocking=lambda pid: {
|
||||
"name": "PeerOne",
|
||||
"role": "worker",
|
||||
},
|
||||
)
|
||||
monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client)
|
||||
|
||||
msg = {"activity_id": "a-4", "kind": "peer_agent", "peer_id": "ws-1"}
|
||||
result = _enrich_inbound_for_agent(msg)
|
||||
assert result["peer_name"] == "PeerOne"
|
||||
assert result["peer_role"] == "worker"
|
||||
assert result["agent_card_url"] == "http://test/agent/ws-1"
|
||||
|
||||
def test_registry_miss_keeps_agent_card_url(self, monkeypatch):
|
||||
# On registry cache miss the helper still surfaces agent_card_url
|
||||
# because it's constructable from peer_id alone — preserves the
|
||||
# contract that the receiving agent always has somewhere to
|
||||
# fetch the peer's full capability list.
|
||||
from a2a_tools_inbox import _enrich_inbound_for_agent
|
||||
|
||||
import sys
|
||||
import types
|
||||
fake_a2a_client = types.SimpleNamespace(
|
||||
_agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
|
||||
enrich_peer_metadata_nonblocking=lambda pid: None, # cache miss
|
||||
)
|
||||
monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client)
|
||||
|
||||
msg = {"activity_id": "a-5", "kind": "peer_agent", "peer_id": "ws-2"}
|
||||
result = _enrich_inbound_for_agent(msg)
|
||||
assert "peer_name" not in result
|
||||
assert "peer_role" not in result
|
||||
assert result["agent_card_url"] == "http://test/agent/ws-2"
|
||||
@@ -0,0 +1,196 @@
|
||||
"""Direct unit tests for the three inbox tool wrappers in ``a2a_tools``.
|
||||
|
||||
After RFC #2873 iter 4d (messaging extraction), ``a2a_tools.py`` is
|
||||
mostly back-compat re-exports — the only behavior still defined here
|
||||
is ``report_activity`` plus three thin wrappers around the inbox state
|
||||
machine: ``tool_inbox_peek`` / ``tool_inbox_pop`` / ``tool_wait_for_message``.
|
||||
|
||||
These wrappers were never exercised at the module level, so the
|
||||
critical-path coverage gate (75% per-file floor for MCP/inbox/auth)
|
||||
dropped to 54% on iter 4d. This file pins each wrapper's behavior
|
||||
directly so the floor is met without changing the gate.
|
||||
|
||||
The wrappers are ~40 LOC of glue. The full delivery behavior
|
||||
(persistence, 410 recovery, etc.) is exercised in test_inbox.py.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _require_workspace_id(monkeypatch):
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
yield
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.get_event_loop().run_until_complete(coro)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tool_inbox_peek
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolInboxPeek:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_inbox_peek())
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_returns_json_array_of_messages(self):
|
||||
import a2a_tools
|
||||
|
||||
msg1 = MagicMock()
|
||||
msg1.to_dict.return_value = {"activity_id": "a1", "kind": "canvas_user"}
|
||||
msg2 = MagicMock()
|
||||
msg2.to_dict.return_value = {"activity_id": "a2", "kind": "peer_agent"}
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.peek.return_value = [msg1, msg2]
|
||||
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_peek(limit=5))
|
||||
# peek limit is forwarded
|
||||
fake_state.peek.assert_called_once_with(limit=5)
|
||||
parsed = json.loads(out)
|
||||
assert len(parsed) == 2
|
||||
assert parsed[0]["activity_id"] == "a1"
|
||||
|
||||
def test_non_int_limit_falls_back_to_10(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.peek.return_value = []
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_inbox_peek(limit="garbage")) # type: ignore[arg-type]
|
||||
fake_state.peek.assert_called_once_with(limit=10)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tool_inbox_pop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolInboxPop:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-1"))
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_rejects_empty_activity_id(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop(""))
|
||||
assert "activity_id is required" in out
|
||||
fake_state.pop.assert_not_called()
|
||||
|
||||
def test_rejects_non_str_activity_id(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop(123)) # type: ignore[arg-type]
|
||||
assert "activity_id is required" in out
|
||||
fake_state.pop.assert_not_called()
|
||||
|
||||
def test_returns_removed_true_when_popped(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.pop.return_value = MagicMock() # truthy = something was removed
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-7"))
|
||||
parsed = json.loads(out)
|
||||
assert parsed == {"removed": True, "activity_id": "act-7"}
|
||||
fake_state.pop.assert_called_once_with("act-7")
|
||||
|
||||
def test_returns_removed_false_when_unknown(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.pop.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_inbox_pop("act-missing"))
|
||||
parsed = json.loads(out)
|
||||
assert parsed == {"removed": False, "activity_id": "act-missing"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tool_wait_for_message
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestToolWaitForMessage:
|
||||
def test_returns_not_enabled_when_state_none(self):
|
||||
import a2a_tools
|
||||
|
||||
with patch("inbox.get_state", return_value=None):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=1.0))
|
||||
assert "not enabled" in out
|
||||
|
||||
def test_timeout_payload_when_no_message(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=0.1))
|
||||
parsed = json.loads(out)
|
||||
assert parsed["timeout"] is True
|
||||
assert parsed["timeout_secs"] == 0.1
|
||||
|
||||
def test_returns_message_when_delivered(self):
|
||||
import a2a_tools
|
||||
|
||||
msg = MagicMock()
|
||||
msg.to_dict.return_value = {"activity_id": "a-9", "kind": "peer_agent"}
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = msg
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
out = _run(a2a_tools.tool_wait_for_message(timeout_secs=2.0))
|
||||
parsed = json.loads(out)
|
||||
assert parsed["activity_id"] == "a-9"
|
||||
|
||||
def test_timeout_clamped_to_300(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs=99999))
|
||||
# Whatever wait was called with, it must not exceed 300
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 300.0
|
||||
|
||||
def test_timeout_clamped_to_zero_floor(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs=-5))
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 0.0
|
||||
|
||||
def test_non_numeric_timeout_falls_back_to_60(self):
|
||||
import a2a_tools
|
||||
|
||||
fake_state = MagicMock()
|
||||
fake_state.wait.return_value = None
|
||||
with patch("inbox.get_state", return_value=fake_state):
|
||||
_run(a2a_tools.tool_wait_for_message(timeout_secs="garbage")) # type: ignore[arg-type]
|
||||
passed = fake_state.wait.call_args.args[0]
|
||||
assert passed == 60.0
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Drift gate + smoke tests for ``a2a_tools_messaging`` (RFC #2873 iter 4d).
|
||||
|
||||
The full behavior matrix lives in ``test_a2a_tools_impl.py`` —
|
||||
TestToolSendMessageToUser + TestToolListPeers + TestToolGetWorkspaceInfo
|
||||
+ TestChatHistory all patch ``a2a_tools_messaging.foo`` after the iter
|
||||
4d retarget.
|
||||
|
||||
This file pins:
|
||||
|
||||
1. **Drift gate** — every previously-public symbol on ``a2a_tools``
|
||||
is the EXACT same callable / value as ``a2a_tools_messaging.foo``.
|
||||
Wraps would silently lose existing test coverage; this gate
|
||||
fails fast on that drift.
|
||||
2. **Import contract** — ``a2a_tools_messaging`` does NOT pull in
|
||||
``a2a_tools`` at module-load time (the layered architecture: it
|
||||
depends on ``a2a_tools_rbac`` + ``a2a_client`` + ``platform_auth``,
|
||||
never the kitchen-sink module).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _require_workspace_id(monkeypatch):
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
yield
|
||||
|
||||
|
||||
# ============== Drift gate ==============
|
||||
|
||||
class TestBackCompatAliases:
|
||||
def test_tool_send_message_to_user_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_messaging
|
||||
assert (
|
||||
a2a_tools.tool_send_message_to_user
|
||||
is a2a_tools_messaging.tool_send_message_to_user
|
||||
)
|
||||
|
||||
def test_tool_list_peers_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_messaging
|
||||
assert a2a_tools.tool_list_peers is a2a_tools_messaging.tool_list_peers
|
||||
|
||||
def test_tool_get_workspace_info_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_messaging
|
||||
assert (
|
||||
a2a_tools.tool_get_workspace_info
|
||||
is a2a_tools_messaging.tool_get_workspace_info
|
||||
)
|
||||
|
||||
def test_tool_chat_history_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_messaging
|
||||
assert a2a_tools.tool_chat_history is a2a_tools_messaging.tool_chat_history
|
||||
|
||||
def test_upload_chat_files_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_messaging
|
||||
assert a2a_tools._upload_chat_files is a2a_tools_messaging._upload_chat_files
|
||||
|
||||
|
||||
# ============== Import contract ==============
|
||||
|
||||
class TestImportContract:
|
||||
def test_messaging_module_does_not_load_a2a_tools(self, monkeypatch):
|
||||
"""`a2a_tools_messaging` must depend on `a2a_tools_rbac` (the
|
||||
layered architecture), `a2a_client`, and `platform_auth` — but
|
||||
NEVER on the kitchen-sink `a2a_tools`. Top-level
|
||||
`from a2a_tools import …` would re-introduce the circular
|
||||
dependency that motivated the lazy-import contract for the
|
||||
delegation module."""
|
||||
for m in ("a2a_tools", "a2a_tools_messaging"):
|
||||
sys.modules.pop(m, None)
|
||||
|
||||
import a2a_tools_messaging # noqa: F401
|
||||
assert "a2a_tools_messaging" in sys.modules
|
||||
|
||||
def test_a2a_tools_re_exports_messaging_handlers(self):
|
||||
"""Opposite direction: a2a_tools surfaces every messaging
|
||||
symbol so existing call sites + tests work unchanged."""
|
||||
import a2a_tools
|
||||
assert hasattr(a2a_tools, "tool_send_message_to_user")
|
||||
assert hasattr(a2a_tools, "tool_list_peers")
|
||||
assert hasattr(a2a_tools, "tool_get_workspace_info")
|
||||
assert hasattr(a2a_tools, "tool_chat_history")
|
||||
assert hasattr(a2a_tools, "_upload_chat_files")
|
||||
@@ -229,3 +229,87 @@ class TestResolveWorkspacesDirect:
|
||||
out, errors = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == [("ws-a", "a"), ("ws-b", "b")]
|
||||
assert errors == []
|
||||
|
||||
|
||||
# ============== Token-from-file env var (issue #2934) ==============
|
||||
|
||||
class TestTokenFileEnv:
|
||||
"""``MOLECULE_WORKSPACE_TOKEN_FILE`` lets operators keep the bearer
|
||||
out of shell history and out of MCP-host config plaintext (e.g.
|
||||
~/.claude.json). Resolution order: inline TOKEN env > TOKEN_FILE
|
||||
env > ${CONFIGS_DIR}/.auth_token.
|
||||
"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolate(self, monkeypatch, tmp_path):
|
||||
for v in (
|
||||
"WORKSPACE_ID",
|
||||
"MOLECULE_WORKSPACE_TOKEN",
|
||||
"MOLECULE_WORKSPACE_TOKEN_FILE",
|
||||
"MOLECULE_WORKSPACES",
|
||||
):
|
||||
monkeypatch.delenv(v, raising=False)
|
||||
# Point CONFIGS_DIR at an empty tmp_path so the .auth_token
|
||||
# fallback returns "" — keeps the test cases unambiguous.
|
||||
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
|
||||
yield tmp_path
|
||||
|
||||
def test_token_file_env_resolves(self, monkeypatch, tmp_path):
|
||||
token_path = tmp_path / "token.txt"
|
||||
token_path.write_text("file-tok-123\n") # trailing newline must strip
|
||||
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
|
||||
out, errors = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == [("ws-1", "file-tok-123")]
|
||||
assert errors == []
|
||||
|
||||
def test_inline_token_takes_precedence_over_file(self, monkeypatch, tmp_path):
|
||||
# If both env vars are set, inline wins — matches the docstring's
|
||||
# documented order. (Operators sometimes set both during a
|
||||
# rotation; we want predictable behavior.)
|
||||
token_path = tmp_path / "token.txt"
|
||||
token_path.write_text("file-tok")
|
||||
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "inline-tok")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
|
||||
out, _ = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == [("ws-1", "inline-tok")]
|
||||
|
||||
def test_missing_file_falls_through_to_error(self, monkeypatch, tmp_path):
|
||||
# Pointed at a non-existent path — resolver should return the
|
||||
# combined "no token" error, NOT crash.
|
||||
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
|
||||
monkeypatch.setenv(
|
||||
"MOLECULE_WORKSPACE_TOKEN_FILE", str(tmp_path / "does-not-exist")
|
||||
)
|
||||
out, errors = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == []
|
||||
assert any("MOLECULE_WORKSPACE_TOKEN_FILE" in e for e in errors)
|
||||
|
||||
def test_empty_file_falls_through_to_error(self, monkeypatch, tmp_path):
|
||||
# File exists but is blank — same shape as no token at all.
|
||||
token_path = tmp_path / "empty.txt"
|
||||
token_path.write_text("")
|
||||
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
|
||||
out, errors = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == []
|
||||
assert errors # at least one combined error message
|
||||
|
||||
def test_blank_env_var_treated_as_unset(self, monkeypatch):
|
||||
# Empty string is treated as "not set" — common pitfall when
|
||||
# users export an unset shell var.
|
||||
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
|
||||
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", "")
|
||||
out, errors = mcp_workspace_resolver.resolve_workspaces()
|
||||
assert out == []
|
||||
assert errors
|
||||
|
||||
def test_help_message_advertises_token_file(self, capsys):
|
||||
# Help text must mention TOKEN_FILE so a first-run operator
|
||||
# learns about the safer option without grepping the source.
|
||||
mcp_workspace_resolver.print_missing_env_help(
|
||||
["WORKSPACE_ID", "MOLECULE_WORKSPACE_TOKEN"], have_token_file=False
|
||||
)
|
||||
err = capsys.readouterr().err
|
||||
assert "MOLECULE_WORKSPACE_TOKEN_FILE" in err
|
||||
|
||||
Reference in New Issue
Block a user