Compare commits

9 Commits

Author SHA1 Message Date
molecule-ai[bot] f5ea812e9d Merge pull request #2947 from Molecule-AI/staging
staging → main: auto-promote c4807a9
2026-05-05 22:22:58 +00:00
Hongming Wang c4807a930d Merge pull request #2940 from Molecule-AI/refactor/a2a-tools-inbox-extract-rfc2873-iter4e
refactor(workspace): extract inbox tools from a2a_tools.py (RFC #2873 iter 4e)
2026-05-05 21:58:32 +00:00
Hongming Wang d22fbb29b8 Merge pull request #2944 from Molecule-AI/fix-mcp-send-message-to-user-persist
fix(mcp): persist send_message_to_user pushes to activity_log (reno-stars data loss)
2026-05-05 21:57:37 +00:00
Hongming Wang 899c53550d test(mcp): comprehensive coverage for send_message_to_user persistence + AST gate (reno-stars followup)
Per user request: audit all similar tools + write comprehensive tests
including E2E for the persistence-of-AGENT_MESSAGE-broadcasts contract.

Audit (all BroadcastOnly call sites in workspace-server/internal/):

  | Site | Event | Persisted? | Notes |
  |---|---|:---:|---|
  | a2a_proxy_helpers.go:275 | A2A_RESPONSE | ✓ | LogActivity above |
  | activity.go:486 (Notify) | AGENT_MESSAGE | ✓ | INSERT line 535 |
  | activity.go:701 (LogActivity) | ACTIVITY_LOGGED | ✓ | self-emits inside DB write |
  | mcp_tools.go:341 (toolSendMessageToUser) | AGENT_MESSAGE | ✓ | NEW (this PR) |
  | registry.go:575 | TASK_UPDATED | N/A | transient progress, not chat |
  | registry.go:596 | WORKSPACE_HEARTBEAT | N/A | infra ping, not chat |

Only one chat-bearing broadcast was missing persistence (the just-
fixed mcp bridge path). No other regressions found.

Tests added (3 new, total 5 send_message_to_user tests):

1. TestAgentMessageBroadcastsArePersisted — AST gate that walks every
   non-test .go in the package, finds funcs that BroadcastOnly with
   "AGENT_MESSAGE", asserts each ALSO contains an
   "INSERT INTO activity_logs". Forward-looking regression block:
   any future chat tool that broadcasts without persisting fails the
   test with a clear file:func diagnostic. Mutation-tested locally:
   removing the INSERT block from toolSendMessageToUser reliably
   produces the expected failure.

2. TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s — pins
   the "best-effort persistence" contract. DB INSERT failures must
   NOT abort the tool response (the WS broadcast already succeeded;
   retrying would double-render in the live chat). Matches /notify.

3. TestMCPHandler_SendMessageToUser_ResponseBodyShape — pins the
   exact `{"result": "<message>"}` JSON shape stored in
   response_body. The canvas hydrater (extractResponseText in
   historyHydration.ts) reads body.result; any drift here silently
   breaks chat history without failing the INSERT. Per memory
   feedback_assert_exact_not_substring.md, asserts the literal JSON
   shape, not a substring.

4. TestMCPHandler_SendMessageToUser_PersistsToActivityLog (existing,
   from previous commit) — pins INSERT shape with regex on
   'a2a_receive' + 'notify' literals.

5. TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet (existing)
   — env-gate aborts before DB.

Test fixture cleanup: newMCPHandler now uses newTestBroadcaster (real
ws.Hub) instead of events.NewBroadcaster(nil) — the latter nil-panics
inside hub.Broadcast on the AGENT_MESSAGE path. Same broadcaster
shape every other handler test uses.

E2E note: the AST gate is the strongest forward-looking guarantee.
A real-DB integration test would add value for CI but is largely
duplicative of the sqlmock contract tests above (sqlmock pins SQL
shape with much faster feedback). Left as a future enhancement when
the handlers Postgres-integration suite extends MCP coverage.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:52:32 -07:00
Hongming Wang cdfc9f743f fix(mcp): persist send_message_to_user pushes to activity_log (reno-stars data loss)
Reported on production tenant reno-stars: an external claude-code agent
(CEO Ryan PC workspace) sent a long-form message via send_message_to_user;
the user saw it live in the chat panel but it vanished after a refresh.
Confirmed via direct production query — the message is NOT in
activity_logs at all (only short test pings around it are persisted).

Root cause: there are TWO server-side handlers for send_message_to_user:

  1. HTTP `/workspaces/:id/notify` (activity.go:Notify) — broadcasts WS
     AND inserts a row into activity_logs. This is the path the
     in-container runtime's tool_send_message_to_user calls.

  2. MCP-bridge `tools/call name=send_message_to_user`
     (mcp_tools.go:toolSendMessageToUser) — broadcasts WS only,
     **never persisted**. This is the path EXTERNAL agents using
     molecule-mcp's send_message_to_user tool route through.

The persistence fix landed for path 1 months ago but was never mirrored
on path 2. External agents — exactly the case in reno-stars/CEO Ryan PC
— have been silently losing every long-form notification on reload.

Fix: mirror the activity.go INSERT shape inside toolSendMessageToUser:

  INSERT INTO activity_logs
    (workspace_id, activity_type, method, summary, response_body, status)
  VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')

Same wire shape as /notify so the canvas's chat-history hydration
(`type=a2a_receive&source=canvas`) treats both writers identically.
Errors are log-only — broadcast already succeeded, persistence failure
shouldn't block the tool response (matches /notify behavior; downside
is the same data-loss-on-DB-error risk, surfaced via log.Printf).

Tests
-----

- `TestMCPHandler_SendMessageToUser_PersistsToActivityLog` — pins both
  the workspace-name lookup AND the INSERT shape. Regex-matches
  `'a2a_receive'` + `'notify'` literals so a future refactor that
  changes activity_type or method breaks the test loud, not silently
  re-introducing the data-loss bug.
- Updated newMCPHandler to use newTestBroadcaster() (real ws.Hub) —
  events.NewBroadcaster(nil) crashes inside hub.Broadcast in the
  send_message_to_user path. Same shape every other handler test uses.

Verified `go test ./internal/handlers/ -run TestMCPHandler_SendMessage`
green; full vet clean.

Refs reno-stars production incident 2026-05-05.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:47:48 -07:00
molecule-ai[bot] 7a2664523c Merge pull request #2942 from Molecule-AI/staging
staging → main: auto-promote 1ad107c
2026-05-05 14:47:21 -07:00
Hongming Wang 475da5b64c refactor(workspace): extract inbox tools from a2a_tools.py (RFC #2873 iter 4e)
Continues the OSS-shape refactor. After iters 4a-4d (rbac, delegation,
memory, messaging) the only behavior left in ``a2a_tools.py`` was
``report_activity`` plus three thin inbox-tool wrappers and the
``_enrich_inbound_for_agent`` helper. This iter extracts the inbox
slice to ``a2a_tools_inbox.py`` so the kitchen-sink module shrinks
from 280 LOC to ~165 LOC of imports + report_activity + back-compat
re-export blocks.

Extracted symbols:
  - ``_INBOX_NOT_ENABLED_MSG`` (sentinel)
  - ``_enrich_inbound_for_agent`` (poll-path peer enrichment helper)
  - ``tool_inbox_peek``
  - ``tool_inbox_pop``
  - ``tool_wait_for_message``

Re-exports (`from a2a_tools_inbox import …`) preserve the public
``a2a_tools.tool_inbox_*`` surface so existing tests + call sites
continue to resolve unchanged.

New tests in test_a2a_tools_inbox_split.py:
  1. **Drift gate (5)** — every previously-public symbol on a2a_tools
     is the EXACT same object as a2a_tools_inbox.foo (`is`, not `==`),
     catches a future "wrap with logging" refactor that silently loses
     existing test coverage.
  2. **Import contract (1)** — a2a_tools_inbox does NOT eagerly import
     a2a_tools at module load. Pins the layered architecture: the
     extracted slice depends on ``inbox`` + a lazy ``a2a_client``
     import, never on the kitchen-sink that re-exports it.
  3. **_enrich_inbound_for_agent branches (5)** — peer_id-empty
     (canvas_user) returns dict unchanged; missing peer_id key same;
     a2a_client unavailable (test harness, partial install) degrades
     gracefully with a bare envelope; registry hit populates
     peer_name + peer_role + agent_card_url; registry miss still
     surfaces agent_card_url (constructable from peer_id alone).

The full timeout-clamp / validation / JSON-shape behavior matrix for
the three wrappers stays in test_a2a_tools_inbox_wrappers.py — those
tests pass identically against both the alias and the underlying impl.

Wiring updates:
  - ``scripts/build_runtime_package.py``: add ``a2a_tools_inbox`` to
    ``TOP_LEVEL_MODULES`` so it ships in the runtime wheel and the
    drift gate doesn't fail the next publish.
  - ``.github/workflows/ci.yml``: add ``a2a_tools_inbox.py`` to
    ``CRITICAL_FILES`` so the 75% MCP/inbox/auth per-file floor
    applies — this is now where the inbox-delivery code actually
    lives.
2026-05-05 14:28:58 -07:00
Hongming Wang 1ad107cc15 Merge pull request #2935 from Molecule-AI/fix/onboarding-friction-2934
fix(onboarding): address Claude Code MCP onboarding friction (#2934)
2026-05-05 21:25:57 +00:00
Hongming Wang 01deeb36cf fix(onboarding): address Claude Code MCP onboarding friction (#2934)
Ryan's bug report (#2934) walked through ~45 min of debugging a stock
external-runtime install. This PR fixes the four items he flagged that
have a small surface, and stubs out the larger ones for follow-up.

Fixed in this PR
================

#1 — Python floor disclosure (README in publish bundle)
  Add an explicit "Requires Python ≥3.11" section that calls out the
  cryptic "Could not find a version that satisfies the requirement"
  failure mode; recommend `pipx install` over `pip install` so the
  binary lands on PATH automatically; show the explicit `pip install
  --user` alternative with the PATH caveat.

#3 — MOLECULE_WORKSPACE_TOKEN_FILE support (mcp_workspace_resolver.py)
  Add a third resolution step between the inline env var and the
  in-container CONFIGS_DIR fallback. Operators can write the bearer to
  a 0600 file (e.g. ~/.config/molecule/token) and point
  MOLECULE_WORKSPACE_TOKEN_FILE at it, keeping the secret out of
  ~/.zsh_history and out of plaintext in MCP-host configs like
  ~/.claude.json. Inline TOKEN still wins on conflict so rotation flows
  are predictable. README documents the safer option as the
  recommended path. 6 new tests pin every leg (file resolves, inline
  wins, missing/empty file falls through, blank env unset-equivalent,
  help text advertises it).

#4 — Push delivery 3-condition gating (README in publish bundle)
  Document that real-time push on Claude Code requires (a) the server
  to declare experimental.claude/channel (we do), (b) the server to be
  marketplace-plugin-sourced (operators must scaffold their own until
  the official marketplace lands — see #2934 follow-up), and (c) the
  --dangerously-load-development-channels flag on the claude
  invocation. If any of the three is missing, delivery silently
  falls back to poll mode with no diagnostic. The README now says all
  of this explicitly so a new operator doesn't grep the binary for
  channel_enable to figure it out.

#8 — serverInfo.name mismatch (a2a_mcp_server.py)
  The server reported `serverInfo.name = "a2a-delegation"` while
  operators register it as `molecule` (the name in `claude mcp add
  molecule …`). Harmless on tool routing today but matters for any
  future Claude Code allowlist that gates push by hardcoded server
  name. Renamed to "molecule" with an inline comment explaining the
  invariant.

Deferred (separate issues to track)
===================================

#2 — covered transitively by #1's pipx recommendation; no separate fix.
#5 — `moleculesai/claude-code-plugin` marketplace repo (substantial new
     repo work; the README references it as a documented follow-up).
#6 — `molecule-mcp doctor` subcommand (substantial new CLI surface;
     mentioned in the README's push-vs-poll section as the planned
     diagnostic for silent push fallback).
#7 — `--dangerously-load-development-channels` rename — not in our
     control; that's Claude Code's flag.

Tests
=====
164/164 mcp_cli + a2a_mcp_server tests pass locally
(WORKSPACE_ID=00000000-0000-0000-0000-000000000001 pytest …) including
6 new TestTokenFileEnv cases. Wheel builds successfully via
scripts/build_runtime_package.py with the new README markers verified
in the output.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:19:09 -07:00
11 changed files with 946 additions and 133 deletions
+1
@@ -387,6 +387,7 @@ jobs:
"a2a_mcp_server.py"
"mcp_cli.py"
"a2a_tools.py"
"a2a_tools_inbox.py"
"inbox.py"
"platform_auth.py"
)
+84 -2
@@ -56,6 +56,7 @@ TOP_LEVEL_MODULES = {
"a2a_mcp_server",
"a2a_tools",
"a2a_tools_delegation",
"a2a_tools_inbox",
"a2a_tools_memory",
"a2a_tools_messaging",
"a2a_tools_rbac",
@@ -290,10 +291,37 @@ directory** by the `publish-runtime` GitHub Actions workflow on every
Operators running an agent outside the platform's container fleet
(any runtime that supports MCP stdio — Claude Code, hermes, codex,
etc.) can install this wheel and run the universal MCP server
locally:
locally.
### Requirements
* **Python ≥3.11.** The wheel sets `requires-python = ">=3.11"`. On
older interpreters `pip install` returns the cryptic
`Could not find a version that satisfies the requirement` — that
message is pip filtering this wheel out, NOT the package missing
from PyPI. Upgrade with `brew install python@3.12` /
`apt install python3.12` / `pyenv install 3.12` first.
* **`pipx` recommended over `pip`.** `pipx install` puts
`molecule-mcp` on PATH automatically and isolates the runtime's
deps from your system Python. Plain `pip install --user` works
but the binary lands in `~/.local/bin` (Linux) or
`~/Library/Python/3.X/bin` (macOS) which is often not on PATH on
a fresh shell — `claude mcp add molecule -- molecule-mcp` then
fails with "command not found" at first use.
### Install
```sh
# Recommended:
pipx install molecule-ai-workspace-runtime
# Alternative (manage PATH yourself):
pip install --user molecule-ai-workspace-runtime
```
### Run
```sh
pip install molecule-ai-workspace-runtime
WORKSPACE_ID=<uuid> \\
PLATFORM_URL=https://<tenant>.staging.moleculesai.app \\
MOLECULE_WORKSPACE_TOKEN=<bearer> \\
@@ -306,10 +334,64 @@ runtimes already get via the workspace's auto-spawned MCP. Register
the binary in your agent's MCP config (e.g. Claude Code's
`claude mcp add molecule -- molecule-mcp` with the env above).
### Keeping the token out of shell history
Inline `MOLECULE_WORKSPACE_TOKEN=<bearer>` ends up in `~/.zsh_history`
and (when registered via `claude mcp add`) plaintext in
`~/.claude.json`. To avoid that, write the token to a 0600 file and
point `MOLECULE_WORKSPACE_TOKEN_FILE` at it:
```sh
umask 077
printf '%s' "<bearer>" > ~/.config/molecule/token
WORKSPACE_ID=<uuid> \\
PLATFORM_URL=https://<tenant>.staging.moleculesai.app \\
MOLECULE_WORKSPACE_TOKEN_FILE=$HOME/.config/molecule/token \\
molecule-mcp
```
Token resolution order: `MOLECULE_WORKSPACE_TOKEN` (inline env) →
`MOLECULE_WORKSPACE_TOKEN_FILE` (path) → `${CONFIGS_DIR}/.auth_token`
(in-container default).
The token comes from the canvas → Tokens tab. Restarting an external
workspace from the canvas no longer revokes the token (PR #2412), so
operator tokens persist across status nudges.
### Push vs poll delivery (Claude Code specifics)
By default the inbox runs in **poll mode** — every turn the agent
calls `wait_for_message`, which blocks up to ~60s on
`/activity?since_id=…`. Real-time push delivery is also supported,
but on Claude Code it requires THREE conditions, ALL of which must
hold:
1. **The MCP server declares `experimental.claude/channel`** — this
wheel does (see `_build_initialize_result`). Nothing for you to
do.
2. **Claude Code installs the server as a marketplace plugin** — a
plain `claude mcp add molecule -- molecule-mcp` produces a
non-plugin-sourced server, which Claude Code rejects with
`channel_enable requires a marketplace plugin`. Until the
official `moleculesai/claude-code-plugin` marketplace lands
(issue #2934 follow-up), operators who want push must scaffold
their own local marketplace under
`~/.claude/marketplaces/molecule-local/` containing a
`marketplace.json` + `plugin.json` that points at this wheel.
3. **Claude Code is launched with the dev-channels flag** — pass
`--dangerously-load-development-channels plugin:molecule@<marketplace>`
on the `claude` invocation. Without this flag the channel
capability is silently ignored.
Symptom of any condition failing: messages arrive but only via the
poll path (every ~60s), not real-time. There's currently no
diagnostic surfaced — `molecule-mcp doctor` (issue #2934 follow-up)
is planned.
If you don't need real-time push, the default poll path works
universally with no extra setup; both modes converge on the same
`inbox_pop` ack so messages never duplicate.
See [`docs/workspace-runtime-package.md`](https://github.com/Molecule-AI/molecule-core/blob/main/docs/workspace-runtime-package.md)
for the publish flow and architecture.
"""
@@ -0,0 +1,177 @@
package handlers
import (
"go/ast"
"go/parser"
"go/token"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
)
// TestAgentMessageBroadcastsArePersisted is a forward-looking AST
// gate: every function in this package that broadcasts an
// `AGENT_MESSAGE` WebSocket event MUST also call
// `INSERT INTO activity_logs` somewhere in its body.
//
// The reno-stars production data-loss bug (CEO Ryan PC's long-form
// onboarding-friction message visible live but missing on reload)
// happened because mcp_tools.go:toolSendMessageToUser broadcast WS
// without a paired INSERT — while the HTTP /notify sibling DID
// persist. The fix added the INSERT; this gate prevents the regression
// class from re-emerging in any future chat-bearing tool.
//
// Why an AST gate vs a code-review checklist (per memory
// feedback_behavior_based_ast_gates.md): "pin invariants by what a
// function calls, not what it's named". The shape that loses data is:
//
// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion
//
// Any new tool that emits AGENT_MESSAGE must persist or the next
// canvas refresh drops the message — same shape as reno-stars. A
// reviewer can miss this; the AST walk can't.
//
// Allowlist: empty by intent. If a future use case genuinely needs
// fire-and-forget broadcast (e.g., transient typing indicators that
// should NOT survive reload), add an entry here AND document why.
// "Doesn't need to persist" is rarely the right answer for chat —
// the canvas history is the source of truth.
func TestAgentMessageBroadcastsArePersisted(t *testing.T) {
wd, err := os.Getwd()
if err != nil {
t.Fatalf("getwd: %v", err)
}
entries, err := os.ReadDir(wd)
if err != nil {
t.Fatalf("readdir %s: %v", wd, err)
}
type violation struct {
file string
fn string
}
var violations []violation
for _, ent := range entries {
name := ent.Name()
if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
continue
}
path := filepath.Join(wd, name)
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, path, nil, parser.ParseComments)
if err != nil {
t.Fatalf("parse %s: %v", path, err)
}
for _, decl := range file.Decls {
fn, ok := decl.(*ast.FuncDecl)
if !ok || fn.Body == nil {
continue
}
if !funcEmitsAgentMessageBroadcast(fn) {
continue
}
if !funcInsertsIntoActivityLogs(fn) {
violations = append(violations, violation{file: name, fn: fn.Name.Name})
}
}
}
if len(violations) > 0 {
sort.Slice(violations, func(i, j int) bool {
if violations[i].file != violations[j].file {
return violations[i].file < violations[j].file
}
return violations[i].fn < violations[j].fn
})
var buf strings.Builder
for _, v := range violations {
buf.WriteString(" - ")
buf.WriteString(v.file)
buf.WriteString(":")
buf.WriteString(v.fn)
buf.WriteString("\n")
}
t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs:
%s
This is the reno-stars data-loss regression class: live message
visible to the user, but missing on reload because activity_log was
never written. Every chat-bearing broadcast MUST be paired with:
INSERT INTO activity_logs (workspace_id, activity_type, method,
summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for
the canonical shapes. Don't add an allowlist entry without a
documented reason — the canvas chat history is the source of truth
and silently dropping messages is a P0 user trust break.`,
buf.String())
}
}
// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that
// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`.
func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool {
var found bool
ast.Inspect(fn.Body, func(n ast.Node) bool {
call, ok := n.(*ast.CallExpr)
if !ok {
return true
}
sel, ok := call.Fun.(*ast.SelectorExpr)
if !ok || sel.Sel.Name != "BroadcastOnly" {
return true
}
// BroadcastOnly(workspaceID, eventType, payload) — the second
// arg is the event name. Match by string-literal value.
if len(call.Args) < 2 {
return true
}
lit, ok := call.Args[1].(*ast.BasicLit)
if !ok || lit.Kind != token.STRING {
return true
}
raw := lit.Value
if unq, err := strconv.Unquote(raw); err == nil {
raw = unq
}
if raw == "AGENT_MESSAGE" {
found = true
return false
}
return true
})
return found
}
// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit
// whose body contains `INSERT INTO activity_logs` (the SQL literal
// passed to ExecContext). Matches the substring rather than a strict
// regex because we don't care about the exact INSERT shape here —
// only that the function persists. Specific shape pinning lives in
// the per-handler test (see TestMCPHandler_SendMessageToUser_*).
func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool {
var found bool
ast.Inspect(fn.Body, func(n ast.Node) bool {
lit, ok := n.(*ast.BasicLit)
if !ok || lit.Kind != token.STRING {
return true
}
raw := lit.Value
if unq, err := strconv.Unquote(raw); err == nil {
raw = unq
}
if strings.Contains(raw, "INSERT INTO activity_logs") {
found = true
return false
}
return true
})
return found
}
+170 -3
@@ -11,18 +11,21 @@ import (
"os"
"testing"
"errors"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
// sqlmock DB set up by setupTestDB.
// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers
// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the
// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast.
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
h := NewMCPHandler(db.DB, events.NewBroadcaster(nil))
h := NewMCPHandler(db.DB, newTestBroadcaster())
return h, mock
}
@@ -628,6 +631,170 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) {
}
}
// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the
// "best-effort persistence" contract: when the activity_log INSERT
// fails (DB hiccup, constraint violation, transient connection drop),
// the tool MUST still return success to the agent because the WS
// broadcast already succeeded — the user has seen the message.
//
// This matches /notify (activity.go) behavior. Returning an error
// here would cause the agent to retry and re-broadcast, double-
// rendering the message in the user's live chat panel for every
// retry until the DB recovers.
func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-err").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
// INSERT fails — must NOT abort the tool response.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WillReturnError(errors.New("transient db error"))
w := mcpPost(t, h, "ws-err", map[string]interface{}{
"jsonrpc": "2.0",
"id": 100,
"method": "tools/call",
"params": map[string]interface{}{
"name": "send_message_to_user",
"arguments": map[string]interface{}{
"message": "should not be lost from the live chat",
},
},
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response was not valid JSON-RPC: %v", err)
}
// Tool response is success — INSERT failure logged, broadcast
// already succeeded.
if resp.Error != nil {
t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("expected DB calls in order: %v", err)
}
}
// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the
// response_body JSON shape stored in activity_logs. This shape MUST
// match what the canvas hydrater (extractResponseText in
// historyHydration.ts) reads — specifically `{"result": "<text>"}`.
// Any drift in the JSON shape silently breaks chat history without
// failing the INSERT.
//
// Caught the same drift class flagged in
// feedback_assert_exact_not_substring.md: a substring match on
// "result" would pass even if the field were renamed; we assert the
// exact JSON shape.
func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
const userMessage = "Hi there from the agent"
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-shape").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
// Capture the response_body argument and assert its exact shape.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
"ws-shape",
sqlmock.AnyArg(), // summary
// The response_body MUST be JSON `{"result": "<message>"}`.
// Any other shape (e.g., wrapping in a Task object) breaks
// the canvas hydrater's `body.result` extractor.
`{"result":"`+userMessage+`"}`,
).
WillReturnResult(sqlmock.NewResult(1, 1))
w := mcpPost(t, h, "ws-shape", map[string]interface{}{
"jsonrpc": "2.0",
"id": 101,
"method": "tools/call",
"params": map[string]interface{}{
"name": "send_message_to_user",
"arguments": map[string]interface{}{
"message": userMessage,
},
},
})
if w.Code != 200 {
t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err)
}
}
// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix
// for the reno-stars / CEO Ryan PC chat-history data-loss bug:
// external claude-code agents using molecule-mcp's send_message_to_user
// tool route through THIS handler (not the HTTP /notify endpoint),
// and the handler used to broadcast WS only — visible live, gone on
// reload because nothing wrote to activity_logs.
//
// Pins:
// - INSERT happens on the success path (broadcast + DB write).
// - INSERT shape mirrors the HTTP /notify handler exactly:
// activity_type='a2a_receive', method='notify', request_body NULL,
// response_body={"result": message}, status='ok'. The canvas
// hydration query (`type=a2a_receive&source=canvas`) treats
// both writers as the same shape — drift here means the bug
// re-surfaces silently.
func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
// Workspace lookup — the handler verifies the workspace exists
// before it does anything else. Returning a name lets the
// broadcast payload populate; the test doesn't assert on the
// broadcast (no observable WS in this fake), only on the DB.
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-msg").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
// The persistence INSERT — pin the exact shape so a future
// refactor that switches columns or drops `method='notify'`
// breaks the test loud, not silently. Match by regex on the
// table + activity_type + method literals.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
"ws-msg",
sqlmock.AnyArg(), // summary "Agent message: ..."
sqlmock.AnyArg(), // response_body JSON
).
WillReturnResult(sqlmock.NewResult(1, 1))
w := mcpPost(t, h, "ws-msg", map[string]interface{}{
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": map[string]interface{}{
"name": "send_message_to_user",
"arguments": map[string]interface{}{
"message": "Hello, this should persist!",
},
},
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String())
}
if resp.Error != nil {
t.Errorf("unexpected JSON-RPC error: %+v", resp.Error)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Parse error
// ─────────────────────────────────────────────────────────────────────────────
@@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
"name": wsName,
})
// Persist to activity_logs so chat history loaders surface this
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
// the MCP-bridge variant of send_message_to_user broadcast WS
// only — visible live, gone on reload — while the HTTP /notify
// sibling already had this fix (activity.go:535). External
// claude-code agents using molecule-mcp's send_message_to_user
// tool route through THIS handler, not /notify, so they were
// hitting the unfixed path.
//
// Shape mirrors activity.go's Notify handler exactly so the
// canvas chat-history hydration treats both the same:
// - activity_type='a2a_receive' joins the source=canvas filter
// - source_id is omitted → defaults to NULL ("canvas-source")
// - method='notify' tags it as a push (vs a real A2A receive)
// - request_body=NULL so the loader doesn't draw a duplicate
// "user" bubble
// - response_body={"result": "<text>"} feeds extractResponseText
// directly
//
// Errors are log-only — the broadcast already returned ok to the
// caller, the user has seen the message, and the persistence
// failure mode (DB hiccup) shouldn't block the tool call. The
// downside is the same as pre-fix: message vanishes on reload
// after a transient DB error. Log it so operators have a signal.
respPayload := map[string]interface{}{"result": message}
respJSON, _ := json.Marshal(respPayload)
preview := message
if len(preview) > 80 {
preview = preview[:80] + "…"
}
if _, err := h.database.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
}
return "Message sent.", nil
}
+10 -1
@@ -425,7 +425,16 @@ def _build_initialize_result() -> dict:
"tools": {"listChanged": False},
"experimental": {"claude/channel": {}},
},
"serverInfo": {"name": "a2a-delegation", "version": "1.0.0"},
# Identifier convention: this server is what users register with
# `claude mcp add molecule -- molecule-mcp` (and similar across
# other MCP hosts), so the canonical name is "molecule". Earlier
# versions reported "a2a-delegation" — accurate to the original
# purpose but a mismatch with how operators actually name it.
# Mismatch is harmless on tool routing (all MCP hosts dispatch
# by the user-supplied registration name, NOT serverInfo.name)
# but matters for any future Claude Code allowlist that gates
# channel push by hardcoded server name (issue #2934).
"serverInfo": {"name": "molecule", "version": "1.0.0"},
# Built per-call (not the module-level constant) so an operator
# who sets MOLECULE_MCP_POLL_TIMEOUT_SECS after import — e.g.
# via a wrapper script that exports then re-imports — sees
+11 -123
@@ -154,127 +154,15 @@ from a2a_tools_memory import ( # noqa: E402 (import after the top-of-module im
)
# ---------------------------------------------------------------------------
# Inbox tools — inbound delivery for the standalone molecule-mcp path.
# ---------------------------------------------------------------------------
#
# The InboxState singleton is set by mcp_cli before the MCP server starts
# (see workspace/inbox.py for the rationale). In-container runtimes never
# call ``inbox.activate(...)``, so ``inbox.get_state()`` returns None and
# these tools surface an informational error rather than raising.
#
# When-to-use guidance (mirrored in platform_tools/registry.py): agents
# in standalone-runtime mode should call ``wait_for_message`` to block
# on the next inbound message after they've emitted a reply, forming
# the loop ``wait → respond → wait``. ``inbox_peek`` is for inspecting
# the queue without consuming; ``inbox_pop`` removes a handled message.
_INBOX_NOT_ENABLED_MSG = (
"Error: inbox polling is not enabled in this runtime. The standalone "
"molecule-mcp wrapper activates it; in-container runtimes receive "
"messages via push delivery and do not need these tools."
# Inbox tool handlers — extracted to a2a_tools_inbox (RFC #2873 iter 4e).
# Re-imported here so call sites + tests that reference
# ``a2a_tools.tool_inbox_peek`` / ``tool_inbox_pop`` / ``tool_wait_for_message``
# / ``_enrich_inbound_for_agent`` / ``_INBOX_NOT_ENABLED_MSG`` keep
# resolving identically.
from a2a_tools_inbox import ( # noqa: E402 (import after the top-of-module imports)
_INBOX_NOT_ENABLED_MSG,
_enrich_inbound_for_agent,
tool_inbox_peek,
tool_inbox_pop,
tool_wait_for_message,
)
def _enrich_inbound_for_agent(d: dict) -> dict:
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
The PUSH path (a2a_mcp_server._build_channel_notification) already
enriches the meta dict with these fields, so a Claude Code host
with channel-push sees them. The POLL path goes through
InboxMessage.to_dict, which is intentionally identity-free (the
storage layer doesn't know about the registry cache). Without this
helper, every non-Claude-Code MCP client that uses inbox_peek /
wait_for_message gets a plain message and the receiving agent
can't tell who's writing — breaking the contract documented in
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
Cache-first non-blocking enrichment (same shape as push): on cache
miss the helper returns the bare message; the next call within the
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
the agent still gets text + peer_id + kind + activity_id, just
without the friendly identity.
"""
peer_id = d.get("peer_id") or ""
if not peer_id:
# canvas_user — no peer to enrich; helper returns the plain
# message unchanged so the canvas reply path still works.
return d
try:
from a2a_client import ( # local import — avoid module-load cycle
_agent_card_url_for,
enrich_peer_metadata_nonblocking,
)
except Exception: # noqa: BLE001
# If a2a_client is unavailable (test harness, partial install),
# degrade gracefully — agent still gets the bare envelope.
return d
record = enrich_peer_metadata_nonblocking(peer_id)
if record is not None:
if name := record.get("name"):
d["peer_name"] = name
if role := record.get("role"):
d["peer_role"] = role
# agent_card_url is constructable from peer_id alone — surface it
# even when registry enrichment misses, so the receiving agent has
# a single endpoint to hit for the peer's full capability list.
d["agent_card_url"] = _agent_card_url_for(peer_id)
return d
async def tool_inbox_peek(limit: int = 10) -> str:
"""Return up to ``limit`` pending inbound messages without removing them."""
import inbox # local import — avoids a circular dep at module load
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
async def tool_inbox_pop(activity_id: str) -> str:
"""Remove a message from the inbox queue by activity_id."""
import inbox
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
if not isinstance(activity_id, str) or not activity_id:
return "Error: activity_id is required."
removed = state.pop(activity_id)
if removed is None:
return json.dumps({"removed": False, "activity_id": activity_id})
return json.dumps({"removed": True, "activity_id": activity_id})
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
"""Block until a new message arrives or ``timeout_secs`` elapses.
Returns the head message non-destructively; the agent decides
whether to ``inbox_pop`` it after acting.
"""
import asyncio
import inbox
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
try:
timeout = float(timeout_secs)
except (TypeError, ValueError):
timeout = 60.0
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
# blocking longer than 5min wastes the prompt cache window for
# nothing useful. Operators who want longer can call repeatedly.
timeout = max(0.0, min(timeout, 300.0))
# The threading.Event-based wait would block the asyncio loop.
# Run it on the default executor so the MCP server can keep
# processing other JSON-RPC requests while we sleep.
loop = asyncio.get_running_loop()
message = await loop.run_in_executor(None, state.wait, timeout)
if message is None:
return json.dumps({"timeout": True, "timeout_secs": timeout})
return json.dumps(_enrich_inbound_for_agent(message.to_dict()))
+140
@@ -0,0 +1,140 @@
"""Inbox tool handlers — single-concern slice of the a2a_tools surface.
Standalone-runtime path for inbound-message delivery (push-mode runtimes
get messages via the channel-tag synthesis in a2a_mcp_server). The
``InboxState`` singleton is set by ``mcp_cli`` before the MCP server
starts; in-container runtimes never call ``inbox.activate(...)`` so
``inbox.get_state()`` returns None and these tools surface an
informational error instead of raising.
When-to-use guidance for agents (mirrored in
``platform_tools/registry.py``):
- ``wait_for_message``: block until a new inbound message arrives, then
decide what to do with it; forms the loop ``wait → respond → wait``.
- ``inbox_peek``: inspect the queue non-destructively.
- ``inbox_pop``: remove a handled message by activity_id.
Extracted from ``a2a_tools.py`` in RFC #2873 iter 4e so the kitchen-sink
module shrinks to a back-compat shim. The extraction also makes the
``_enrich_inbound_for_agent`` helper unit-testable in isolation —
previously it was buried in ``a2a_tools`` and only exercised through
the inbox wrappers, leaving its peer-id-empty / cache-miss / registry-
unavailable branches under-covered.
"""
from __future__ import annotations
import asyncio
import json
# Surfaced when the inbox subsystem is not initialised. Returned by the
# three inbox tool wrappers below so the agent gets a clear "this
# runtime delivers via push" message instead of an AttributeError
# raised on a None state.
_INBOX_NOT_ENABLED_MSG = (
"Error: inbox polling is not enabled in this runtime. The standalone "
"molecule-mcp wrapper activates it; in-container runtimes receive "
"messages via push delivery and do not need these tools."
)
def _enrich_inbound_for_agent(d: dict) -> dict:
"""Add peer_name / peer_role / agent_card_url to a poll-path message.
The PUSH path (a2a_mcp_server._build_channel_notification) already
enriches the meta dict with these fields, so a Claude Code host
with channel-push sees them. The POLL path goes through
InboxMessage.to_dict, which is intentionally identity-free (the
storage layer doesn't know about the registry cache). Without this
helper, every non-Claude-Code MCP client that uses inbox_peek /
wait_for_message gets a plain message and the receiving agent
can't tell who's writing — breaking the contract documented in
a2a_mcp_server.py:303-345 ("In both paths the same fields apply").
Cache-first non-blocking enrichment (same shape as push): on cache
miss the helper returns the bare message; the next call within the
5-min TTL hits the warm cache. Failure to enrich is non-fatal —
the agent still gets text + peer_id + kind + activity_id, just
without the friendly identity.
"""
peer_id = d.get("peer_id") or ""
if not peer_id:
# canvas_user — no peer to enrich; helper returns the plain
# message unchanged so the canvas reply path still works.
return d
try:
from a2a_client import ( # local import — avoid module-load cycle
_agent_card_url_for,
enrich_peer_metadata_nonblocking,
)
except Exception: # noqa: BLE001
# If a2a_client is unavailable (test harness, partial install),
# degrade gracefully — agent still gets the bare envelope.
return d
record = enrich_peer_metadata_nonblocking(peer_id)
if record is not None:
if name := record.get("name"):
d["peer_name"] = name
if role := record.get("role"):
d["peer_role"] = role
# agent_card_url is constructable from peer_id alone — surface it
# even when registry enrichment misses, so the receiving agent has
# a single endpoint to hit for the peer's full capability list.
d["agent_card_url"] = _agent_card_url_for(peer_id)
return d
async def tool_inbox_peek(limit: int = 10) -> str:
"""Return up to ``limit`` pending inbound messages without removing them."""
import inbox # local import — avoids a circular dep at module load
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
messages = state.peek(limit=limit if isinstance(limit, int) else 10)
return json.dumps([_enrich_inbound_for_agent(m.to_dict()) for m in messages])
async def tool_inbox_pop(activity_id: str) -> str:
"""Remove a message from the inbox queue by activity_id."""
import inbox
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
if not isinstance(activity_id, str) or not activity_id:
return "Error: activity_id is required."
removed = state.pop(activity_id)
if removed is None:
return json.dumps({"removed": False, "activity_id": activity_id})
return json.dumps({"removed": True, "activity_id": activity_id})
async def tool_wait_for_message(timeout_secs: float = 60.0) -> str:
"""Block until a new message arrives or ``timeout_secs`` elapses.
Returns the head message non-destructively; the agent decides
whether to ``inbox_pop`` it after acting.
"""
import inbox
state = inbox.get_state()
if state is None:
return _INBOX_NOT_ENABLED_MSG
try:
timeout = float(timeout_secs)
except (TypeError, ValueError):
timeout = 60.0
# Cap at 300s — Claude Code's default tool timeout is ~10min, and
# blocking longer than 5min wastes the prompt cache window for
# nothing useful. Operators who want longer can call repeatedly.
timeout = max(0.0, min(timeout, 300.0))
# The threading.Event-based wait would block the asyncio loop.
# Run it on the default executor so the MCP server can keep
# processing other JSON-RPC requests while we sleep.
loop = asyncio.get_running_loop()
message = await loop.run_in_executor(None, state.wait, timeout)
if message is None:
return json.dumps({"timeout": True, "timeout_secs": timeout})
return json.dumps(_enrich_inbound_for_agent(message.to_dict()))
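The timeout clamp and the executor hand-off in `tool_wait_for_message` can be tried in isolation with a small sketch. `DemoInbox` is a hypothetical stand-in for `InboxState` (the real class is not shown in this diff), and `clamp_timeout` / `wait_for_message` are illustrative names; only the 60 s default, the 300 s cap and the `run_in_executor` call are taken from the code above.

```python
import asyncio
import threading


class DemoInbox:
    """Hypothetical stand-in for InboxState with a blocking, Event-based wait."""

    def __init__(self):
        self._event = threading.Event()
        self._message = None

    def deliver(self, message):
        self._message = message
        self._event.set()

    def wait(self, timeout):
        # Blocks the calling thread until deliver() runs or the timeout elapses.
        if self._event.wait(timeout):
            return self._message
        return None


def clamp_timeout(timeout_secs, default=60.0, cap=300.0):
    """Coerce to float, fall back to the default on bad input, clamp to [0, cap]."""
    try:
        timeout = float(timeout_secs)
    except (TypeError, ValueError):
        timeout = default
    return max(0.0, min(timeout, cap))


async def wait_for_message(state, timeout_secs):
    # Run the blocking wait on the default executor so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, state.wait, clamp_timeout(timeout_secs))


async def demo():
    inbox = DemoInbox()
    waiter = asyncio.create_task(wait_for_message(inbox, 5))
    # The loop can still run this coroutine while the waiter blocks in a thread.
    await asyncio.sleep(0.05)
    inbox.deliver({"text": "hi"})
    return await waiter


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If `state.wait` were awaited directly on the loop thread, the `asyncio.sleep` in `demo` could not run until the wait returned; moving it to a worker thread is what lets the server keep handling other requests.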
+51 -4
@@ -35,9 +35,15 @@ def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
N workspaces). When set, ``WORKSPACE_ID`` / ``MOLECULE_WORKSPACE_TOKEN``
are IGNORED — the JSON is the source of truth.
2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token from
``MOLECULE_WORKSPACE_TOKEN`` or ``${CONFIGS_DIR}/.auth_token``.
This is the pre-existing path; back-compat exact.
2. Single-workspace fallback — ``WORKSPACE_ID`` env var + token
resolved in this order:
a. ``MOLECULE_WORKSPACE_TOKEN`` (inline env — convenient but
leaks into shell history + plaintext MCP-host config).
b. ``MOLECULE_WORKSPACE_TOKEN_FILE`` (path to a file holding
the token — operator can keep it 0600 in their home dir;
survives shell-history scrubs).
c. ``${CONFIGS_DIR}/.auth_token`` (in-container runtimes —
the platform writes this on provision).
Returns ``(workspaces, errors)``:
* ``workspaces``: list of ``(workspace_id, token)`` — non-empty
@@ -98,16 +104,47 @@ def resolve_workspaces() -> tuple[list[tuple[str, str]], list[str]]:
wsid = os.environ.get("WORKSPACE_ID", "").strip()
if not wsid:
return [], ["WORKSPACE_ID (or MOLECULE_WORKSPACES) is required"]
# Token resolution order (#2934): inline env → file path → CONFIGS_DIR
# default. The file-path option exists so operators can keep the
# bearer out of shell history and out of MCP-host config plaintext
# (e.g. ~/.claude.json) — set MOLECULE_WORKSPACE_TOKEN_FILE to a
# 0600 file containing the token. The CONFIGS_DIR/.auth_token
# fallback predates this and stays for in-container runtimes.
tok = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "").strip()
if not tok:
tok = _read_token_from_file_env()
if not tok:
tok = read_token_file()
if not tok:
return [], [
"MOLECULE_WORKSPACE_TOKEN (or CONFIGS_DIR/.auth_token) is required"
"MOLECULE_WORKSPACE_TOKEN, MOLECULE_WORKSPACE_TOKEN_FILE, or "
"CONFIGS_DIR/.auth_token is required"
]
return [(wsid, tok)], []
def _read_token_from_file_env() -> str:
"""Read the token from the file path in MOLECULE_WORKSPACE_TOKEN_FILE.
Returns "" on:
- env var unset / blank
- file not found, unreadable, or empty
- any OSError on read
Empty-on-failure (rather than raising) lets the resolver fall through
to the CONFIGS_DIR fallback. The caller surfaces the combined "no
token" error if every source is empty.
"""
path = os.environ.get("MOLECULE_WORKSPACE_TOKEN_FILE", "").strip()
if not path:
return ""
try:
with open(path, encoding="utf-8") as fh:
return fh.read().strip()
except OSError:
return ""
def print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
print("molecule-mcp: missing required environment.\n", file=sys.stderr)
print("Set the following before running molecule-mcp:", file=sys.stderr)
@@ -123,6 +160,16 @@ def print_missing_env_help(missing: list[str], have_token_file: bool) -> None:
"(canvas → Tokens tab)",
file=sys.stderr,
)
print(
" OR set MOLECULE_WORKSPACE_TOKEN_FILE"
" to a path that holds the token",
file=sys.stderr,
)
print(
" (keeps the secret out of shell"
" history and MCP-host config plaintext)",
file=sys.stderr,
)
print("", file=sys.stderr)
print(f"Currently missing: {', '.join(missing)}", file=sys.stderr)
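The token resolution order in the resolver diff above (inline env, then file path, then the CONFIGS_DIR default) can be sketched as a pure function over an environment mapping, together with one way an operator-side script might create the 0600 token file. Both `resolve_token` and `write_private_token_file` are hypothetical names for illustration; `default_token` stands in for whatever `read_token_file()` would return, which this sketch does not reimplement.

```python
import os
import tempfile


def resolve_token(env, default_token=""):
    """Documented order: inline env var, then token file, then the default."""
    tok = env.get("MOLECULE_WORKSPACE_TOKEN", "").strip()
    if tok:
        return tok
    path = env.get("MOLECULE_WORKSPACE_TOKEN_FILE", "").strip()
    if path:
        try:
            with open(path, encoding="utf-8") as fh:
                tok = fh.read().strip()
        except OSError:
            # Missing or unreadable file: fall through instead of raising.
            tok = ""
        if tok:
            return tok
    return default_token


def write_private_token_file(path, token):
    """Create the file with mode 0600 requested at creation time (POSIX bits)."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(token + "\n")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        token_path = os.path.join(tmp, "molecule-token")
        write_private_token_file(token_path, "file-tok")
        print(resolve_token({"MOLECULE_WORKSPACE_TOKEN_FILE": token_path}))
```

Passing the environment as a plain dict keeps the sketch easy to test; the real resolver reads `os.environ` directly.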
@@ -0,0 +1,181 @@
"""Drift gate + import-contract tests for ``a2a_tools_inbox`` (RFC #2873 iter 4e).
The full behavior matrix for the three inbox tool wrappers lives in
``test_a2a_tools_inbox_wrappers.py`` (kept on the public ``a2a_tools``
module so the same tests pin both the alias and the underlying impl).
This file pins:
1. **Drift gate** — every previously-public symbol on ``a2a_tools``
(``tool_inbox_peek``, ``tool_inbox_pop``, ``tool_wait_for_message``,
``_enrich_inbound_for_agent``, ``_INBOX_NOT_ENABLED_MSG``) is the
EXACT same object as ``a2a_tools_inbox.foo``. Refactor wrapping
silently loses existing test coverage; this gate makes that drift
fail fast.
2. **Import contract** — ``a2a_tools_inbox`` does NOT pull in
``a2a_tools`` at module-load time (the layered architecture: it
depends only on stdlib + a lazy import of ``inbox`` + a lazy
import of ``a2a_client``, never the kitchen-sink module that
re-exports it).
3. **_enrich_inbound_for_agent** branches that the wrapper tests
can't easily reach: peer_id-empty (canvas_user) returns the
dict unchanged; a2a_client unavailable degrades gracefully.
"""
from __future__ import annotations
import sys
import pytest
@pytest.fixture(autouse=True)
def _require_workspace_id(monkeypatch):
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
yield
# ============== Drift gate ==============
class TestBackCompatAliases:
def test_tool_inbox_peek_alias(self):
import a2a_tools
import a2a_tools_inbox
assert a2a_tools.tool_inbox_peek is a2a_tools_inbox.tool_inbox_peek
def test_tool_inbox_pop_alias(self):
import a2a_tools
import a2a_tools_inbox
assert a2a_tools.tool_inbox_pop is a2a_tools_inbox.tool_inbox_pop
def test_tool_wait_for_message_alias(self):
import a2a_tools
import a2a_tools_inbox
assert (
a2a_tools.tool_wait_for_message is a2a_tools_inbox.tool_wait_for_message
)
def test_enrich_helper_alias(self):
import a2a_tools
import a2a_tools_inbox
assert (
a2a_tools._enrich_inbound_for_agent
is a2a_tools_inbox._enrich_inbound_for_agent
)
def test_inbox_not_enabled_msg_alias(self):
import a2a_tools
import a2a_tools_inbox
assert (
a2a_tools._INBOX_NOT_ENABLED_MSG is a2a_tools_inbox._INBOX_NOT_ENABLED_MSG
)
# ============== Import contract ==============
class TestImportContract:
def test_inbox_module_does_not_import_a2a_tools_eagerly(self):
# Force a fresh load of a2a_tools_inbox without a2a_tools in sight.
for k in [k for k in list(sys.modules) if k in (
"a2a_tools_inbox", "a2a_tools",
)]:
sys.modules.pop(k, None)
import a2a_tools_inbox # noqa: F401 — load only
# a2a_tools_inbox MUST NOT have caused a2a_tools to load. The
# extracted module sits BELOW the kitchen-sink in the layering;
# the dependency arrow points the other direction.
assert "a2a_tools" not in sys.modules, (
"a2a_tools_inbox eagerly imported a2a_tools — the kitchen-sink "
"module must not be a load-time dependency of its slices."
)
# ============== _enrich_inbound_for_agent branches ==============
class TestEnrichInboundForAgent:
def test_canvas_user_returns_dict_unchanged(self):
# peer_id empty → canvas_user → no enrichment, no a2a_client touch.
from a2a_tools_inbox import _enrich_inbound_for_agent
msg = {"activity_id": "a-1", "kind": "canvas_user", "peer_id": ""}
result = _enrich_inbound_for_agent(msg)
assert result is msg # same dict, mutated in place if at all
assert "peer_name" not in result
assert "peer_role" not in result
assert "agent_card_url" not in result
def test_missing_peer_id_key_returns_unchanged(self):
from a2a_tools_inbox import _enrich_inbound_for_agent
msg = {"activity_id": "a-2", "kind": "canvas_user"} # no peer_id key
result = _enrich_inbound_for_agent(msg)
assert result is msg
assert "agent_card_url" not in result
def test_a2a_client_unavailable_degrades_gracefully(self, monkeypatch):
# Simulate a2a_client import failing (test harness, partial
# install). The helper must return the bare envelope, not raise.
from a2a_tools_inbox import _enrich_inbound_for_agent
# Force an ImportError by patching builtins.__import__.
import builtins
real_import = builtins.__import__
def fake_import(name, *args, **kwargs):
if name == "a2a_client":
raise ImportError("simulated a2a_client unavailable")
return real_import(name, *args, **kwargs)
monkeypatch.setattr(builtins, "__import__", fake_import)
msg = {"activity_id": "a-3", "kind": "peer_agent", "peer_id": "ws-x"}
result = _enrich_inbound_for_agent(msg)
# Bare envelope back — no peer_name, no agent_card_url. Crucially
# the helper did NOT raise, so the inbox tool surfaces the message
# to the agent even when the registry is unreachable.
assert result is msg
assert "peer_name" not in result
assert "agent_card_url" not in result
def test_registry_record_populates_peer_name_and_role(self, monkeypatch):
from a2a_tools_inbox import _enrich_inbound_for_agent
# Stub out the lazy-imported a2a_client functions.
import sys
import types
fake_a2a_client = types.SimpleNamespace(
_agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
enrich_peer_metadata_nonblocking=lambda pid: {
"name": "PeerOne",
"role": "worker",
},
)
monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client)
msg = {"activity_id": "a-4", "kind": "peer_agent", "peer_id": "ws-1"}
result = _enrich_inbound_for_agent(msg)
assert result["peer_name"] == "PeerOne"
assert result["peer_role"] == "worker"
assert result["agent_card_url"] == "http://test/agent/ws-1"
def test_registry_miss_keeps_agent_card_url(self, monkeypatch):
# On registry cache miss the helper still surfaces agent_card_url
# because it's constructable from peer_id alone — preserves the
# contract that the receiving agent always has somewhere to
# fetch the peer's full capability list.
from a2a_tools_inbox import _enrich_inbound_for_agent
import sys
import types
fake_a2a_client = types.SimpleNamespace(
_agent_card_url_for=lambda pid: f"http://test/agent/{pid}",
enrich_peer_metadata_nonblocking=lambda pid: None, # cache miss
)
monkeypatch.setitem(sys.modules, "a2a_client", fake_a2a_client)
msg = {"activity_id": "a-5", "kind": "peer_agent", "peer_id": "ws-2"}
result = _enrich_inbound_for_agent(msg)
assert "peer_name" not in result
assert "peer_role" not in result
assert result["agent_card_url"] == "http://test/agent/ws-2"
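The stubbing technique used in the last two tests relies on a function-local `from ... import ...` consulting `sys.modules` at call time, so a `types.SimpleNamespace` registered under the module name is picked up as if it were the real module. A minimal sketch outside pytest, assuming a made-up module name `demo_peer_registry` and hypothetical helpers `describe_peer` and `with_stub` (pytest's `monkeypatch.setitem` handles the restore step automatically):

```python
import sys
import types

STUB_NAME = "demo_peer_registry"  # hypothetical module name, not a real package


def describe_peer(peer_id):
    """Lazy import inside the function, so the lookup happens per call."""
    try:
        from demo_peer_registry import lookup
    except Exception:
        # Module unavailable: degrade to the bare record.
        return {"peer_id": peer_id}
    record = lookup(peer_id) or {}
    return {"peer_id": peer_id, **record}


def with_stub(stub, fn, *args):
    """Register the stub under STUB_NAME for one call, then restore sys.modules."""
    saved = sys.modules.get(STUB_NAME)
    sys.modules[STUB_NAME] = stub
    try:
        return fn(*args)
    finally:
        if saved is None:
            sys.modules.pop(STUB_NAME, None)
        else:
            sys.modules[STUB_NAME] = saved


if __name__ == "__main__":
    hit = types.SimpleNamespace(lookup=lambda pid: {"name": "PeerOne"})
    print(with_stub(hit, describe_peer, "ws-1"))
    print(describe_peer("ws-1"))  # no stub registered: bare record
```

A module-level import would bind the name once at load time, and a later `sys.modules` swap would have no effect on it; the lazy import is what makes this kind of stub possible.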
+84
@@ -229,3 +229,87 @@ class TestResolveWorkspacesDirect:
out, errors = mcp_workspace_resolver.resolve_workspaces()
assert out == [("ws-a", "a"), ("ws-b", "b")]
assert errors == []
# ============== Token-from-file env var (issue #2934) ==============
class TestTokenFileEnv:
"""``MOLECULE_WORKSPACE_TOKEN_FILE`` lets operators keep the bearer
out of shell history and out of MCP-host config plaintext (e.g.
~/.claude.json). Resolution order: inline TOKEN env > TOKEN_FILE
env > ${CONFIGS_DIR}/.auth_token.
"""
@pytest.fixture(autouse=True)
def _isolate(self, monkeypatch, tmp_path):
for v in (
"WORKSPACE_ID",
"MOLECULE_WORKSPACE_TOKEN",
"MOLECULE_WORKSPACE_TOKEN_FILE",
"MOLECULE_WORKSPACES",
):
monkeypatch.delenv(v, raising=False)
# Point CONFIGS_DIR at an empty tmp_path so the .auth_token
# fallback returns "" — keeps the test cases unambiguous.
monkeypatch.setenv("CONFIGS_DIR", str(tmp_path))
yield tmp_path
def test_token_file_env_resolves(self, monkeypatch, tmp_path):
token_path = tmp_path / "token.txt"
token_path.write_text("file-tok-123\n") # trailing newline must strip
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
out, errors = mcp_workspace_resolver.resolve_workspaces()
assert out == [("ws-1", "file-tok-123")]
assert errors == []
def test_inline_token_takes_precedence_over_file(self, monkeypatch, tmp_path):
# If both env vars are set, inline wins — matches the docstring's
# documented order. (Operators sometimes set both during a
# rotation; we want predictable behavior.)
token_path = tmp_path / "token.txt"
token_path.write_text("file-tok")
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN", "inline-tok")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
out, _ = mcp_workspace_resolver.resolve_workspaces()
assert out == [("ws-1", "inline-tok")]
def test_missing_file_falls_through_to_error(self, monkeypatch, tmp_path):
# Pointed at a non-existent path — resolver should return the
# combined "no token" error, NOT crash.
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
monkeypatch.setenv(
"MOLECULE_WORKSPACE_TOKEN_FILE", str(tmp_path / "does-not-exist")
)
out, errors = mcp_workspace_resolver.resolve_workspaces()
assert out == []
assert any("MOLECULE_WORKSPACE_TOKEN_FILE" in e for e in errors)
def test_empty_file_falls_through_to_error(self, monkeypatch, tmp_path):
# File exists but is blank — same shape as no token at all.
token_path = tmp_path / "empty.txt"
token_path.write_text("")
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", str(token_path))
out, errors = mcp_workspace_resolver.resolve_workspaces()
assert out == []
assert errors # at least one combined error message
def test_blank_env_var_treated_as_unset(self, monkeypatch):
# Empty string is treated as "not set" — common pitfall when
# users export an unset shell var.
monkeypatch.setenv("WORKSPACE_ID", "ws-1")
monkeypatch.setenv("MOLECULE_WORKSPACE_TOKEN_FILE", "")
out, errors = mcp_workspace_resolver.resolve_workspaces()
assert out == []
assert errors
def test_help_message_advertises_token_file(self, capsys):
# Help text must mention TOKEN_FILE so a first-run operator
# learns about the safer option without grepping the source.
mcp_workspace_resolver.print_missing_env_help(
["WORKSPACE_ID", "MOLECULE_WORKSPACE_TOKEN"], have_token_file=False
)
err = capsys.readouterr().err
assert "MOLECULE_WORKSPACE_TOKEN_FILE" in err