Compare commits

...

27 Commits

Author SHA1 Message Date
molecule-ai[bot] 02960209a0 Merge pull request #2768 from Molecule-AI/staging
staging → main: auto-promote f70071e
2026-05-04 14:34:09 -07:00
Hongming Wang 89bdf29d6f Merge pull request #2766 from Molecule-AI/feat/mcp-multi-ws-tool-routing
feat(mcp): multi-workspace routing for memory/chat_history/workspace_info (PR-3)
2026-05-04 21:20:22 +00:00
Hongming Wang 700d44ec3d feat(mcp): multi-workspace routing for memory + chat_history + workspace_info
PR-3 of the multi-workspace MCP rollout. PR-1 made the MCP server itself
multi-workspace aware (one process, N workspace memberships). PR-2 added
source_workspace_id threading to delegate_task / list_peers. This change
closes the remaining workspace-scoped tools so a single agent registered
into multiple workspaces no longer leaks memories or chat history across
tenants.

Tools now accepting `source_workspace_id`:

- tool_commit_memory(content, scope, source_workspace_id=None) —
  routes POST to /workspaces/{src}/memories with the source workspace's
  Bearer token. Body still embeds source_workspace_id for the platform's
  audit + namespace-isolation enforcement.

- tool_recall_memory(query, scope, source_workspace_id=None) —
  GET /workspaces/{src}/memories with the source workspace's token and
  ?workspace_id={src} query so the platform scopes the read to the
  caller's tenant view (PR-1 / multi-workspace mode).

- tool_chat_history(peer_id, limit, before_ts, source_workspace_id=None)
  — auto-routes via the _peer_to_source cache populated by list_peers,
  with explicit override winning. Falls back to module-level WORKSPACE_ID
  if neither is available. URL: /workspaces/{src}/chat-history.

- tool_get_workspace_info(source_workspace_id=None) — GET /workspaces/{src}
  with the source workspace's token. Useful for introspecting any
  workspace the agent is registered into, not just the primary.

In every path, `src = source_workspace_id or WORKSPACE_ID`, so
single-workspace operators see no behavior change. Tokens are resolved
per-workspace via auth_headers(src) / _auth_headers_for_heartbeat(src),
which fall through to the legacy AUTH_TOKEN env when not in
multi-workspace mode.

Also updates input_schemas in platform_tools/registry.py so the new
optional parameter is advertised to LLM clients (claude-code,
hermes-agent, langchain wrappers).

Tests (4 new classes in test_a2a_multi_workspace.py, 21 new tests):
- TestCommitMemorySourceRouting — URL + Authorization header per source
- TestRecallMemorySourceRouting — URL + query param + Authorization
- TestChatHistorySourceRouting — peer-cache auto-route + explicit override
- TestGetWorkspaceInfoSourceRouting — URL + Authorization

Inbox tools (peek/pop/wait_for_message) already multi-workspace aware
since PR-1 — inbox.py spawns per-workspace pollers and tags every
InboxMessage with arrival_workspace_id. No further plumbing needed.

Suite: 1700 passed, 3 skipped, 2 xfailed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 14:17:58 -07:00
Hongming Wang f70071e1e1 Merge pull request #2765 from Molecule-AI/fix/isolate-adapter-failures-from-card
fix(runtime): isolate card-skill enrichment + transcript handler from adapter shape mismatch
2026-05-04 21:17:56 +00:00
Hongming Wang 63ac99788b fix(runtime): isolate card-skill enrichment + transcript handler from adapter shape mismatch
PR #2756 added a try/except around adapter.setup() so a missing LLM key
doesn't crash the workspace boot. Two paths that now run AFTER setup
succeeds were not similarly isolated, leaving small but real coupling
risks for future adapter authors.

1. **Skill metadata enrichment swap (main.py:248-259).** When
   adapter.setup() returns, main.py reads adapter.loaded_skills and
   replaces the static stubs in agent_card.skills with rich metadata
   (description, tags, examples). The list comprehension assumes each
   element exposes .metadata.{id,name,description,tags,examples}. A
   future adapter that returns a non-canonical shape would raise
   AttributeError, propagate to the outer except, capture as
   adapter_error, and silently degrade an OK boot to the
   not-configured state — even though setup() actually succeeded.

   Extract to card_helpers.enrich_card_skills(card, loaded_skills) →
   bool. Helper swallows enrichment failures, logs the cause, returns
   False, leaves the static stubs in place. setup() success path
   continues unchanged. 6 unit tests cover: None input, empty list,
   canonical happy path, missing .metadata attr, partial .metadata
   (missing one canonical field), atomic-failure-no-partial-swap.

2. **/transcript handler (main.py:513).** Calls await
   adapter.transcript_lines(...) without try/except. BaseAdapter's
   default returns {"supported": false} so today's 4 adapters never
   trigger this — but a future adapter override that assumes setup()
   ran would surface as a 500 from Starlette's default error handler
   instead of a useful 503 with the exception class + message.
   Inline try/except returns 503 with the reason, matching the
   not-configured JSON-RPC handler's pattern.

Both changes match the architectural principle the PR #2756 chain
established: availability (workspace reachable) is decoupled from
configuration / adapter behavior. Operators see useful errors instead
of silent degradation; future adapter authors can't accidentally
break tenant readiness with a shape mismatch.

Adds:
- workspace/card_helpers.py (~50 lines, 100% covered)
- workspace/tests/test_card_helpers.py (6 tests)
- AgentCard/AgentSkill/AgentCapabilities/AgentInterface stubs to
  workspace/tests/conftest.py so future card-related tests work
  under the existing a2a-mock infrastructure
- card_helpers in TOP_LEVEL_MODULES (drift gate would have caught it)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 14:15:27 -07:00
Hongming Wang 28472f0d2d Merge pull request #2764 from Molecule-AI/auto-sync/main-f42feb4e
chore: sync main → staging (auto, ff to f42feb4e)
2026-05-04 19:51:06 +00:00
molecule-ai[bot] f42feb4ed7 Merge pull request #2763 from Molecule-AI/staging
staging → main: auto-promote 99e7f13
2026-05-04 19:35:21 +00:00
Hongming Wang 99e7f13149 Merge pull request #2762 from Molecule-AI/fix/preflight-env-warn-not-fail
fix(preflight): downgrade required_env + auth_token failures to warnings
2026-05-04 19:23:06 +00:00
Hongming Wang 6488ba09e7 fix(preflight): downgrade required_env + auth_token failures to warnings
Preflight was hard-failing the workspace boot when required env vars or
legacy auth_token_files were missing, raising SystemExit(1) before
main.py's PR #2756 try/except could mount the not-configured handler.
Result: codex/openclaw workspaces launched without OPENAI_API_KEY were
INVISIBLE — `/.well-known/agent-card.json` never returned 200, the bench
timed out at 600s, canvas had no actionable signal. PR #2756 fixed half
the puzzle (decouple agent-card from adapter.setup() failure); this
fixes the other half (decouple from preflight failure).

Caught by bench-provision-time run 25335853189 on 2026-05-04: codex and
openclaw both timed_out at 609s while claude-code (whose default model
needs no env) hit 86.7s on the same AMI. Hermes hit 147s because hermes
config doesn't declare top-level required_env.

After this change:
- Missing required_env: WARN (operator sees it in boot logs); workspace
  proceeds to adapter.setup() which raises with the same env-name detail;
  PR #2756's try/except mounts the not-configured handler;
  /.well-known/agent-card.json serves 200; JSON-RPC POST / returns
  -32603 "agent not configured" with the env-name in `error.data`.
- Missing auth_token_file (legacy path): same treatment.
- Other preflight failures (runtime adapter not installable, invalid
  A2A port) STAY as fails — those are structural, the workspace truly
  can't run.

Updated 4 existing tests that asserted `report.ok is False` on
required_env / auth_token misses to assert `report.ok is True` and
check `report.warnings` instead. All 31 preflight tests pass; full
suite 1664 pass + 1 unrelated flake on staging.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 12:20:34 -07:00
Hongming Wang 8176b5142d Merge pull request #2759 from Molecule-AI/auto-sync/main-31427776
chore: sync main → staging (auto, ff to 31427776)
2026-05-04 18:03:49 +00:00
Hongming Wang 314277769e Merge pull request #2758 from Molecule-AI/staging
staging → main: auto-promote 4f9e3fe
2026-05-04 10:53:03 -07:00
hongming e0b567e992 Merge pull request #2757 from Molecule-AI/fix/memory-v2-wiring-real-tests
Memory v2 wiring: replace decorative tests with real integration
2026-05-04 17:43:09 +00:00
Hongming Wang 707e4d7342 Memory v2 wiring: replace decorative tests with real integration
Self-review of #2755 found two tests that didn't actually exercise the
production code path:

- TestNamespaceCleanupFn_NamespaceFormat asserted
  "workspace:" + "abc-123" == "workspace:abc-123" — a compile-time
  invariant, not runtime behavior. Provided no protection if the closure
  in Bundle.NamespaceCleanupFn ever stopped using that prefix.

- TestNamespaceCleanupFn_FailureLogsButReturns built a *parallel*
  cleanup closure inline with errors.New, then invoked the parallel
  closure. The production closure was never exercised. A regression
  in NamespaceCleanupFn (e.g. forgetting the deferred recover, calling
  the plugin without nil-check) would still pass this test.

Replaced both with real integration:

- TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace spins up
  httptest.Server, points MEMORY_PLUGIN_URL at it, calls Build(),
  invokes the production closure, and asserts the server actually
  saw DELETE /v1/namespaces/workspace:abc-123.

- TestNamespaceCleanupFn_PluginErrorDoesNotPanic exercises the
  failure path for real: server returns 500 on DELETE, closure must
  log and return without propagating. defer-recover is belt-and-
  suspenders since production calls this from a for-loop in
  workspace_crud.go that has no recover.

Couldn't ship with #2755 because the merge queue locks the branch
once enqueued. Following up now that #2755 is merged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:38:59 -07:00
Hongming Wang 4f9e3feece Merge pull request #2756 from Molecule-AI/fix/agent-card-decouple-from-setup
fix(runtime): decouple agent-card readiness from adapter.setup()
2026-05-04 17:32:02 +00:00
Hongming Wang 10752fe330 Merge pull request #2755 from Molecule-AI/fix/memory-v2-main-wiring
Memory v2 fixup CRITICAL: wire plugin from main.go (was fully dormant)
2026-05-04 17:31:01 +00:00
Hongming Wang 8f7122a9b6 Merge branch 'staging' into fix/agent-card-decouple-from-setup 2026-05-04 10:24:41 -07:00
Hongming Wang b3982035b3 Merge branch 'staging' into fix/memory-v2-main-wiring 2026-05-04 10:24:31 -07:00
Hongming Wang d1122f8d28 fix(build): register not_configured_handler in TOP_LEVEL_MODULES
The wheel-build drift gate caught the new module added in this PR —
without registering it, the published wheel would ship `import
not_configured_handler` un-rewritten, which would `ModuleNotFoundError`
at runtime under `molecule_runtime.main`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:24:02 -07:00
Hongming Wang 4b35d25d86 fix(runtime): decouple agent-card readiness from adapter.setup()
Today, if `adapter.setup()` raises (most often: an LLM credential is
missing/rotated), main.py crashes before the agent-card route is mounted.
start.sh restart-loops, /.well-known/agent-card.json never returns 200,
and the workspace is invisible to the bench/canvas — operators see
"stuck booting forever" with no clear error to act on.

The agent-card is a static capability advertisement (name, version,
skills, supported protocols). It doesn't need a working LLM. Coupling
its mount to setup() conflates *availability* ("am I up?") with
*configuration* ("can I actually answer?"). They're different concerns.

This change:
- Builds AgentCard from `config.skills` (static names from config.yaml)
  BEFORE adapter.setup(), so the route mounts independent of setup state.
- Wraps setup() + create_executor in try/except. On success, mounts
  the real DefaultRequestHandler with rich loaded_skills metadata
  swapped into the card in-place. On failure, mounts a JSON-RPC
  handler that returns -32603 "agent not configured" with the
  setup() exception in error.data.
- Heartbeat keeps running on misconfigured boots so the platform
  marks the workspace as reachable-but-misconfigured rather than
  crash-looping. Operators redeploy with corrected env without
  chasing a restart loop.
- initial_prompt and idle_loop are skipped on misconfigured boots —
  they self-fire to /, which would land in -32603 anyway, and the
  marker would consume on the first useless attempt.

Bench impact (RFC #388 strict <120s): codex/openclaw bench-time-outs
were the agent-card-never-returns-200 symptom. With this fix those
runtimes serve the card immediately on EC2 boot, so the bench
measures infrastructure cold-start (claude-code class: ~50–80s)
instead of credential-coupled boot.

Adds workspace/not_configured_handler.py (factory + module-level so
behavior is unit-testable; main.py is `# pragma: no cover`) and
workspace/tests/test_not_configured_handler.py (6 tests covering
status code, JSON-RPC envelope shape, id-echo, malformed-body
fallback, reason surfacing, batch-body safety).

All 1665 existing workspace tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:22:31 -07:00
Hongming Wang 46731729d4 Memory v2 fixup Critical: wire plugin from main.go (was fully dormant)
Caught during continued review: the entire v2 plugin system shipped
in PRs #2729-#2742 + #2744-#2751 was never actually invoked because
main.go and router.go don't construct the plugin client/resolver or
attach the WithMemoryV2 / WithNamespaceCleanup hooks.

Operators setting MEMORY_PLUGIN_URL=... saw zero behavior change
because nothing read it. Every fixup we shipped (idempotency, verify
mode, expires_at validation, audit JSON, namespace cleanup, O(N)
export, boot E2E) was also dormant for the same reason.

Root cause: when a multi-handler feature lands across many PRs, none
of them are individually responsible for wiring main.go — and the
master-task-tracking issue didn't gate-check that the wiring landed.
Add main.go integration to every multi-handler RFC checklist.

What ships:

  * internal/memory/wiring/wiring.go: new package that constructs the
    plugin client + resolver from MEMORY_PLUGIN_URL once. Returns nil
    when unset (preserves zero-config legacy behavior). Probes
    /v1/health at boot but doesn't fail-closed — the MCP layer's
    circuit breaker handles ongoing unavailability.

  * internal/memory/wiring/wiring_test.go: 6 tests covering the
    nil/non-nil bundle paths + the namespace-cleanup closure
    contract (nil-safe, format-stable, failure-tolerant).

  * cmd/server/main.go: imports memwiring, calls Build(db.DB) once
    after WorkspaceHandler creation, attaches WithNamespaceCleanup,
    threads the bundle through router.Setup.

  * internal/router/router.go: Setup signature gains *memwiring.Bundle
    param. Inside, attaches WithMemoryV2 to AdminMemoriesHandler and
    MCPHandler when the bundle is non-nil.

After this, the v2 plugin is reachable end-to-end:

  Operator sets MEMORY_PLUGIN_URL → main.Build instantiates client +
  resolver → WorkspaceHandler gets cleanup hook → router wires
  AdminMemoriesHandler + MCPHandler with WithMemoryV2 → MCP tool
  calls (commit_memory_v2, search_memory, etc.) actually do
  something → admin export/import respects MEMORY_V2_CUTOVER.

Prerequisite for #292 (staging verification) — without this, the
operator runbook's step 2 (set MEMORY_PLUGIN_URL, observe behavior)
silently no-ops.

Verified: all 9 affected test packages still green
(memory/{client,contract,e2e,namespace,pgplugin,wiring}, handlers,
router, plus the build).
2026-05-04 10:22:30 -07:00
Hongming Wang 6dc2d907a2 Merge pull request #2754 from Molecule-AI/auto-sync/main-849bc973
chore: sync main → staging (auto, ff to 849bc973)
2026-05-04 17:19:03 +00:00
molecule-ai[bot] 849bc97349 Merge pull request #2753 from Molecule-AI/staging
staging → main: auto-promote e13dcab
2026-05-04 17:08:11 +00:00
Hongming Wang e13dcab5e0 Merge pull request #2749 from Molecule-AI/fix/memory-v2-i3-export-on
Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
2026-05-04 16:49:43 +00:00
Hongming Wang 721010307c Merge pull request #2752 from Molecule-AI/auto-sync/main-73a949bb
chore: sync main → staging (auto, ff to 73a949bb)
2026-05-04 16:49:23 +00:00
Hongming Wang 9f47ecf86e Merge branch 'staging' into fix/memory-v2-i3-export-on 2026-05-04 09:44:37 -07:00
Hongming Wang ebc20794f3 fix(admin-memories): include each member's private namespace in export
ReadableNamespaces(rootID) returns {workspace:rootID, team:rootID,
org:rootID} — the workspace: namespace it surfaces is the root's only.
The I3 batching change resolved namespaces once per root which silently
dropped every child workspace's private memories from admin export
(workspace:childID never reached the plugin search).

Keep the per-root batching win for team:/org:/custom: namespaces;
inject each member's workspace:<id> + owner mapping explicitly so
coverage matches the legacy per-workspace iteration.

Cost stays at 1 SQL + N_roots resolver + 1 plugin search.

Test changes:
- New TestExport_IncludesEveryMembersPrivateNamespace uses a
  per-workspace resolver stub (mirrors real behaviour) and asserts
  every member's workspace:<id> reaches the plugin search AND that
  children's private memories appear in the response with correct
  owner attribution. Verified to FAIL on the pre-fix code.
- TestExport_BatchesPluginCallsByRoot updated to expect 5 namespaces
  (3 workspace + team + org) instead of 3 — it had pinned the buggy
  3-namespace behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:44:06 -07:00
Hongming Wang 9a64aeaa2c Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
Self-review #289. The previous exportViaPlugin ran one resolver CTE
walk + one plugin search PER WORKSPACE. For a 1000-workspace tenant
that's 1000× of each, mostly redundant — workspaces sharing a
team/org root see identical readable namespaces.

New strategy:
  1. Single SQL pass returns each workspace + its computed root_id
     via a recursive CTE (loadWorkspacesWithRoots).
  2. Group by root → unique tree count is typically << workspace
     count.
  3. Resolver runs ONCE per root (any member sees the same readable
     list).
  4. Build the union of all root namespaces; single plugin.Search
     call.
  5. Map each memory back to a workspace_name via pickOwnerForNamespace
     (workspace:<id> → matching member; team:* / org:* / custom:* →
     canonical first member of root group).

Net call cost: 1 SQL + N_roots resolver + 1 plugin call (vs
N_workspaces × resolver + N_workspaces × plugin in the old code).

Tests:
  * TestExport_BatchesPluginCallsByRoot pins the new behavior
    explicitly: 3 workspaces under 1 root → exactly 1 plugin search
    (was 3 with the old code).
  * TestPickOwnerForNamespace covers all five attribution cases:
    workspace:<id> match, workspace:<id> no-match-fallback, team:*,
    org:*, custom:* → first-member-of-root-group; plus empty-members
    fallback.
  * All 9 existing TestExport_* / TestImport_* / TestPickOwner /
    TestNamespaceKindFromLegacyScope / TestSkipImport / etc. tests
    remain green (verified with -run "Export").

The legacy DB path (when MEMORY_V2_CUTOVER unset) is unchanged.
2026-05-04 09:17:30 -07:00
19 changed files with 1590 additions and 211 deletions
+2
View File
@@ -58,6 +58,7 @@ TOP_LEVEL_MODULES = {
"adapter_base",
"agent",
"agents_md",
"card_helpers",
"config",
"configs_dir",
"consolidation",
@@ -73,6 +74,7 @@ TOP_LEVEL_MODULES = {
"main",
"mcp_cli",
"molecule_ai_status",
"not_configured_handler",
"platform_auth",
"platform_inbound_auth",
"plugins",
+12 -1
View File
@@ -18,6 +18,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/imagewatch"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
@@ -166,6 +167,16 @@ func main() {
wh.SetCPProvisioner(cpProv)
}
// Memory v2 plugin (RFC #2728): build the dependency bundle once
// here so all three handlers (MCPHandler, AdminMemoriesHandler,
// WorkspaceHandler) get the same plugin/resolver pair. memBundle
// is nil when MEMORY_PLUGIN_URL is unset — every consumer
// nil-checks before using.
memBundle := memwiring.Build(db.DB)
if memBundle != nil {
wh.WithNamespaceCleanup(memBundle.NamespaceCleanupFn())
}
// External-plugin env mutators — each plugin contributes 0+ mutators
// onto a shared registry. Order matters: gh-identity populates
// MOLECULE_AGENT_ROLE-derived attribution env vars that downstream
@@ -306,7 +317,7 @@ func main() {
cronSched.SetChannels(channelMgr)
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle)
// HTTP server with graceful shutdown
srv := &http.Server{
@@ -2,6 +2,7 @@ package handlers
import (
"context"
"database/sql"
"log"
"net/http"
"os"
@@ -255,68 +256,185 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
// the legacy memoryExportEntry shape so existing tooling that consumes
// the export keeps working.
//
// Strategy: enumerate workspaces, ask the resolver for each one's
// readable namespaces, search each namespace once. Deduplicate by
// memory id (a single memory in team:X is visible to every workspace
// under root X — we want one row per memory, not N).
// Optimization (#289 fix): the previous implementation was O(workspaces)
// in BOTH resolver CTE walks AND plugin search calls. For a 1000-tenant
// org, that's 1000 × resolver + 1000 × HTTP, where most are redundant
// because workspaces sharing a team/org root see identical namespaces.
//
// New strategy:
// 1. Single SQL pass walks parent_id chains, returning each
// workspace's root_id alongside its name.
// 2. Group workspaces by root → unique tree count is typically <<
// workspace count.
// 3. Resolve namespaces ONCE per root (any workspace under that
// root produces the same readable list).
// 4. Build a UNION of namespaces across all roots; single plugin
// search call.
// 5. Map each memory back to a workspace_name via a namespace→ws
// lookup table built up from step 3.
//
// Net cost: 1 SQL + N_roots resolver calls + 1 plugin call (vs
// N_workspaces resolver + N_workspaces plugin in the old code).
func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) {
rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`)
// 1. One SQL pass: every workspace + its root id.
wsRows, err := loadWorkspacesWithRoots(ctx, db.DB)
if err != nil {
log.Printf("admin/memories/export (cutover): workspaces query: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"})
return
}
defer rows.Close()
type wsRow struct{ ID, Name string }
var workspaces []wsRow
for rows.Next() {
var w wsRow
if err := rows.Scan(&w.ID, &w.Name); err != nil {
continue
}
workspaces = append(workspaces, w)
// 2. Group by root → list of workspaces.
rootToWorkspaces := make(map[string][]workspaceRow, len(wsRows))
for _, w := range wsRows {
rootToWorkspaces[w.RootID] = append(rootToWorkspaces[w.RootID], w)
}
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0)
for _, w := range workspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, w.ID)
// 3. Resolve team/org namespaces once per root, then add each
// member's private workspace:<id> namespace explicitly.
//
// IMPORTANT: ReadableNamespaces(rootID) returns
// {workspace:rootID, team:rootID, org:rootID}. Calling it once
// per root is enough for team:/org:/custom: (those are shared by
// every member of the root group), but the workspace: namespace
// it returns is rootID's only — child members' private
// workspace:<childID> namespaces would be silently dropped from
// the export. Inject each member's workspace:<id> below to keep
// coverage parity with the legacy per-workspace iteration.
nsToOwner := make(map[string]string) // namespace → workspace_name (first matching wins)
allNamespaces := make(map[string]struct{}) // union for plugin search
for rootID, members := range rootToWorkspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, rootID)
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err)
log.Printf("admin/memories/export (cutover) root=%s: resolve: %v", rootID, err)
continue
}
nsList := make([]string, len(readable))
for i, ns := range readable {
nsList[i] = ns.Name
}
if len(nsList) == 0 {
continue
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err)
continue
}
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
// Collect non-workspace namespaces (team:/org:/custom:/...) from
// the root view; these are identical across every member.
for _, ns := range readable {
if strings.HasPrefix(ns.Name, "workspace:") {
continue
}
seen[m.ID] = struct{}{}
redacted, _ := redactSecrets(w.Name, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: w.Name,
})
allNamespaces[ns.Name] = struct{}{}
if _, alreadyMapped := nsToOwner[ns.Name]; alreadyMapped {
continue
}
if owner := pickOwnerForNamespace(ns.Name, members); owner != "" {
nsToOwner[ns.Name] = owner
}
}
// Inject each member's private workspace:<id> namespace + its
// owner. Children's private memories live in workspace:<childID>
// which the root-only resolve doesn't surface.
for _, m := range members {
ns := "workspace:" + m.ID
allNamespaces[ns] = struct{}{}
nsToOwner[ns] = m.Name
}
}
if len(allNamespaces) == 0 {
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
// 4. Single plugin search across the union.
nsList := make([]string, 0, len(allNamespaces))
for ns := range allNamespaces {
nsList = append(nsList, ns)
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover): plugin search: %v", err)
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
// 5. Map each memory to a workspace_name, redact, emit.
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0, len(resp.Memories))
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
continue
}
seen[m.ID] = struct{}{}
owner := nsToOwner[m.Namespace]
redacted, _ := redactSecrets(owner, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: owner,
})
}
c.JSON(http.StatusOK, memories)
}
// workspaceRow bundles the per-workspace fields the optimized export
// needs (id + name + root for grouping).
type workspaceRow struct {
ID string
Name string
RootID string
}
// loadWorkspacesWithRoots returns one row per workspace with its root
// id computed via a recursive CTE. Single SQL pass — replaces the
// previous N×ReadableNamespaces pattern that walked each tree
// independently.
func loadWorkspacesWithRoots(ctx context.Context, conn *sql.DB) ([]workspaceRow, error) {
rows, err := conn.QueryContext(ctx, `
WITH RECURSIVE chain AS (
SELECT id, parent_id, name, id AS root_id, 0 AS depth
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, w.name, c.root_id, c.depth + 1
FROM workspaces w
JOIN chain c ON w.parent_id = c.id
WHERE c.depth < 50
)
SELECT id::text, name, root_id::text FROM chain ORDER BY name
`)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]workspaceRow, 0)
for rows.Next() {
var w workspaceRow
if err := rows.Scan(&w.ID, &w.Name, &w.RootID); err != nil {
return nil, err
}
out = append(out, w)
}
return out, rows.Err()
}
// pickOwnerForNamespace returns the workspace_name to attribute a
// namespace to in the export. workspace:<id> namespaces map to the
// matching member; team:* / org:* / custom:* fall back to the first
// member of the root group (canonical owner).
func pickOwnerForNamespace(ns string, members []workspaceRow) string {
if strings.HasPrefix(ns, "workspace:") {
wantID := strings.TrimPrefix(ns, "workspace:")
for _, m := range members {
if m.ID == wantID {
return m.Name
}
}
}
// Non-workspace namespaces: attribute to first member of the root
// group. Stable because loadWorkspacesWithRoots returns ORDER BY
// name, so the same root group always picks the same owner.
if len(members) > 0 {
return members[0].Name
}
return ""
}
// importViaPlugin writes the entries through the plugin instead of
// directly to agent_memories. Workspaces are resolved by name like
// the legacy path. Scope→namespace mapping mirrors the PR-6 shim.
@@ -151,9 +151,9 @@ func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -196,10 +196,10 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
mock := installMockDB(t)
// Two workspaces, both will see the same team-shared memory.
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha").
AddRow("ws-2", "beta"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1").
AddRow("ws-2", "beta", "ws-2"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -225,9 +225,9 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{}
resolver := &stubAdminResolver{err: errors.New("resolver dead")}
@@ -247,9 +247,9 @@ func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -271,7 +271,7 @@ func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
func TestExport_WorkspacesQueryFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnError(errors.New("db dead"))
plugin := &stubAdminPlugin{}
@@ -290,9 +290,9 @@ func TestExport_WorkspacesQueryFails(t *testing.T) {
func TestExport_EmptyReadable(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
resolver := &stubAdminResolver{readable: []namespace.Namespace{}}
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver)
@@ -312,9 +312,9 @@ func TestExport_EmptyReadable(t *testing.T) {
func TestExport_RedactsSecretsInPluginPath(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -535,6 +535,202 @@ func TestImport_SkipsWhenResolverErrors(t *testing.T) {
}
}
// TestExport_BatchesPluginCallsByRoot pins the I3 fix: previously the
// export ran one resolver + one plugin search per workspace (N+1 in
// both); now it groups by root and runs one resolver + one plugin
// search per UNIQUE root.
//
// Setup: 3 workspaces under 1 root → 1 resolver call + 1 plugin call
// (was: 3 resolver + 3 plugin in the old code). The plugin search
// receives 5 namespaces: each member's workspace:<id> + team:root-1
// + org:root-1. (Children's workspace:<id> namespaces must be
// included or admin export silently drops their private memories.)
func TestExport_BatchesPluginCallsByRoot(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("root-1", "alpha", "root-1").
AddRow("child-1", "alpha-child", "root-1").
AddRow("child-2", "alpha-grandchild", "root-1"))
pluginSearchCount := 0
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
pluginSearchCount++
if len(body.Namespaces) != 5 {
t.Errorf("plugin search call %d: namespaces len = %d, want 5 (3 workspace + team + org); got %v", pluginSearchCount, len(body.Namespaces), body.Namespaces)
}
return &contract.SearchResponse{}, nil
},
}
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
if w.Code != http.StatusOK {
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
}
if pluginSearchCount != 1 {
t.Errorf("plugin search called %d times, want 1 (was 3 with the old N+1 code)", pluginSearchCount)
}
}
// perWorkspaceResolver mimics the real resolver: ReadableNamespaces
// returns the SPECIFIC workspace's view (workspace:<that ID> +
// team:<root> + org:<root>), not a constant set. The legacy
// stubAdminResolver hides the I3 silent-drop bug by ignoring its
// workspace-id argument.
type perWorkspaceResolver map[string][]namespace.Namespace
func (r perWorkspaceResolver) ReadableNamespaces(_ context.Context, ws string) ([]namespace.Namespace, error) {
v, ok := r[ws]
if !ok {
return nil, errors.New("perWorkspaceResolver: unknown ws " + ws)
}
return v, nil
}
func (r perWorkspaceResolver) WritableNamespaces(_ context.Context, ws string) ([]namespace.Namespace, error) {
return r.ReadableNamespaces(nil, ws)
}
// TestExport_IncludesEveryMembersPrivateNamespace pins the I3 follow-up
// fix: when a root group has multiple members, the export must surface
// each member's workspace:<id> namespace, not just the root's. Before
// the fix, calling ReadableNamespaces(rootID) returned only
// workspace:rootID + team:rootID + org:rootID — every child workspace's
// private memories were silently dropped from admin export.
func TestExport_IncludesEveryMembersPrivateNamespace(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("root-1", "alpha", "root-1").
AddRow("child-1", "alpha-child", "root-1").
AddRow("child-2", "alpha-grandchild", "root-1"))
resolver := perWorkspaceResolver{
"root-1": {
{Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
"child-1": {
{Name: "workspace:child-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
"child-2": {
{Name: "workspace:child-2", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
}
var passedNamespaces []string
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
passedNamespaces = append(passedNamespaces, body.Namespaces...)
return &contract.SearchResponse{Memories: []contract.Memory{
{ID: "m-root", Namespace: "workspace:root-1", Content: "root private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-child1", Namespace: "workspace:child-1", Content: "child-1 private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-child2", Namespace: "workspace:child-2", Content: "child-2 private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-team", Namespace: "team:root-1", Content: "shared team", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
}}, nil
},
}
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
if w.Code != http.StatusOK {
t.Fatalf("code = %d body=%s", w.Code, w.Body.String())
}
// Every member's private namespace must reach the plugin search.
want := []string{"workspace:root-1", "workspace:child-1", "workspace:child-2", "team:root-1", "org:root-1"}
got := make(map[string]bool, len(passedNamespaces))
for _, ns := range passedNamespaces {
got[ns] = true
}
for _, w := range want {
if !got[w] {
t.Errorf("plugin search missing namespace %q (got %v)", w, passedNamespaces)
}
}
if len(passedNamespaces) != 5 {
t.Errorf("plugin search namespace count = %d, want 5 (3 workspace + team + org)", len(passedNamespaces))
}
// Children's private memories must appear in the export, attributed
// to the right workspace_name.
var entries []memoryExportEntry
if err := json.Unmarshal(w.Body.Bytes(), &entries); err != nil {
t.Fatalf("unmarshal: %v", err)
}
byID := map[string]memoryExportEntry{}
for _, e := range entries {
byID[e.ID] = e
}
for _, exp := range []struct{ id, ns, owner string }{
{"m-root", "workspace:root-1", "alpha"},
{"m-child1", "workspace:child-1", "alpha-child"},
{"m-child2", "workspace:child-2", "alpha-grandchild"},
} {
e, ok := byID[exp.id]
if !ok {
t.Errorf("export missing memory %s — children's private memories silently dropped", exp.id)
continue
}
if e.Namespace != exp.ns {
t.Errorf("memory %s namespace = %q, want %q", exp.id, e.Namespace, exp.ns)
}
if e.WorkspaceName != exp.owner {
t.Errorf("memory %s owner = %q, want %q", exp.id, e.WorkspaceName, exp.owner)
}
}
}
// TestPickOwnerForNamespace covers the namespace→workspace_name
// attribution helper introduced in I3.
func TestPickOwnerForNamespace(t *testing.T) {
members := []workspaceRow{
{ID: "root-1", Name: "alpha", RootID: "root-1"},
{ID: "child-1", Name: "alpha-child", RootID: "root-1"},
}
cases := []struct {
name string
ns string
want string
}{
{"workspace ns matches member id", "workspace:child-1", "alpha-child"},
{"workspace ns no match → first", "workspace:foreign", "alpha"},
{"team ns → first member of root group", "team:root-1", "alpha"},
{"org ns → first member", "org:root-1", "alpha"},
{"custom ns → first member", "custom:foo", "alpha"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := pickOwnerForNamespace(tc.ns, members); got != tc.want {
t.Errorf("pickOwnerForNamespace(%q) = %q, want %q", tc.ns, got, tc.want)
}
})
}
if got := pickOwnerForNamespace("workspace:abc", nil); got != "" {
t.Errorf("empty members must return \"\", got %q", got)
}
}
// --- Helper functions ---
func TestLegacyScopeFromNamespace(t *testing.T) {
@@ -0,0 +1,81 @@
// Package wiring constructs the v2 memory plugin dependency bundle
// at boot time so handlers can opt into the plugin path uniformly.
//
// The bundle is nil-safe: when MEMORY_PLUGIN_URL is unset, Build
// returns (nil, nil) so callers can detect "v2 not configured" with
// a single nil check instead of plumbing a feature flag through
// every handler.
//
// This package exists because the v2 plugin client + namespace
// resolver are needed by THREE different handler types (MCPHandler,
// AdminMemoriesHandler, WorkspaceHandler) constructed in two
// different files (main.go for WorkspaceHandler, router.go for the
// other two). A central Build() avoids each construction site
// re-implementing the env-var read + plugin instantiation.
package wiring
import (
"context"
"database/sql"
"log"
"os"
"time"
mclient "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/client"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace"
)
// Bundle is the v2 dependency bundle. Pass it through Setup as a
// single param; handlers extract what they need.
//
// nil receiver = "v2 not configured" — every method on Bundle
// nil-checks itself, so callers can pass a nil Bundle through the
// hot path without conditional spread.
type Bundle struct {
Plugin *mclient.Client
Resolver *namespace.Resolver
}
// Build returns a wired Bundle if MEMORY_PLUGIN_URL is set, else nil.
//
// It probes /v1/health at boot — when the plugin is unreachable, we
// log a warning but STILL return the bundle. The MCP layer's
// circuit breaker handles ongoing unavailability; we don't want to
// block workspace-server boot just because the memory plugin is
// briefly down.
func Build(db *sql.DB) *Bundle {
if os.Getenv("MEMORY_PLUGIN_URL") == "" {
return nil
}
plugin := mclient.New(mclient.Config{})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if hr, err := plugin.Boot(ctx); err != nil {
log.Printf("memory-plugin: /v1/health probe failed (will retry per-request): %v", err)
} else {
log.Printf("memory-plugin: ok, capabilities=%v", hr.Capabilities)
}
return &Bundle{
Plugin: plugin,
Resolver: namespace.New(db),
}
}
// NamespaceCleanupFn returns a closure suitable for
// WorkspaceHandler.WithNamespaceCleanup. nil when bundle is nil so
// callers can pass it through unconditionally.
//
// The closure runs best-effort: errors are logged, never propagated.
// A misbehaving plugin must not block workspace purges.
func (b *Bundle) NamespaceCleanupFn() func(context.Context, string) {
if b == nil || b.Plugin == nil {
return nil
}
return func(ctx context.Context, workspaceID string) {
ns := "workspace:" + workspaceID
if err := b.Plugin.DeleteNamespace(ctx, ns); err != nil {
log.Printf("memory-plugin: namespace cleanup failed (workspace=%s ns=%s): %v",
workspaceID, ns, err)
}
}
}
@@ -0,0 +1,160 @@
package wiring
import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// TestBuild_NilWhenURLUnset pins the operator-friendly default: no
// MEMORY_PLUGIN_URL → nil bundle → all callers fall through to legacy
// behavior with no surprises.
func TestBuild_NilWhenURLUnset(t *testing.T) {
t.Setenv("MEMORY_PLUGIN_URL", "")
if got := Build(nil); got != nil {
t.Errorf("expected nil bundle when MEMORY_PLUGIN_URL unset, got %+v", got)
}
}
// TestBuild_NonNilWhenURLSet pins that the bundle is constructed even
// when the plugin's /v1/health probe fails — we don't want workspace-
// server boot to depend on a transiently unavailable plugin.
func TestBuild_NonNilWhenURLSet(t *testing.T) {
t.Setenv("MEMORY_PLUGIN_URL", "http://127.0.0.1:1") // bogus port = probe will fail
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
if bundle == nil {
t.Fatal("expected non-nil bundle when MEMORY_PLUGIN_URL is set")
}
if bundle.Plugin == nil {
t.Error("Plugin must be wired")
}
if bundle.Resolver == nil {
t.Error("Resolver must be wired")
}
}
// TestNamespaceCleanupFn_NilBundle pins the nil-safe path: callers
// that pass `bundle.NamespaceCleanupFn()` unconditionally don't need
// to nil-check the bundle separately.
func TestNamespaceCleanupFn_NilBundle(t *testing.T) {
var b *Bundle // nil receiver
if got := b.NamespaceCleanupFn(); got != nil {
t.Errorf("nil bundle must return nil cleanup fn, got non-nil")
}
}
// TestNamespaceCleanupFn_NilPlugin: bundle exists but plugin is nil —
// also returns nil cleanup fn (defensive in case of partial wiring).
func TestNamespaceCleanupFn_NilPlugin(t *testing.T) {
b := &Bundle{} // both fields nil
if got := b.NamespaceCleanupFn(); got != nil {
t.Errorf("bundle with nil plugin must return nil cleanup fn")
}
}
// TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace is the real
// integration gate for the closure: it spins up an httptest.Server
// that records every DELETE request, points MEMORY_PLUGIN_URL at it,
// runs Build(), then invokes the returned closure and asserts the
// server saw `DELETE /v1/namespaces/workspace:<id>`.
//
// This replaces two earlier tests that exercised parallel
// implementations rather than the production closure (caught in
// self-review).
func TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace(t *testing.T) {
var (
mu sync.Mutex
gotPaths []string
gotMethods []string
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
gotPaths = append(gotPaths, r.URL.Path)
gotMethods = append(gotMethods, r.Method)
mu.Unlock()
switch r.URL.Path {
case "/v1/health":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
default:
w.WriteHeader(http.StatusNoContent)
}
}))
t.Cleanup(srv.Close)
t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
if bundle == nil {
t.Fatal("Build returned nil with MEMORY_PLUGIN_URL set")
}
cleanup := bundle.NamespaceCleanupFn()
if cleanup == nil {
t.Fatal("NamespaceCleanupFn returned nil with non-nil Plugin")
}
cleanup(context.Background(), "abc-123")
mu.Lock()
defer mu.Unlock()
// Two requests expected: /v1/health probe at Boot + DELETE for cleanup.
foundDelete := false
for i, p := range gotPaths {
if gotMethods[i] == "DELETE" && p == "/v1/namespaces/workspace:abc-123" {
foundDelete = true
}
}
if !foundDelete {
t.Errorf("expected DELETE /v1/namespaces/workspace:abc-123, got %v",
pathsAndMethods(gotPaths, gotMethods))
}
}
// TestNamespaceCleanupFn_PluginErrorDoesNotPanic exercises the failure
// path for real: server returns 500 on DELETE; the closure must log
// and return without propagating. Replaces the parallel-implementation
// version that didn't actually test the production code.
func TestNamespaceCleanupFn_PluginErrorDoesNotPanic(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/v1/health" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
return
}
http.Error(w, "boom", http.StatusInternalServerError)
}))
t.Cleanup(srv.Close)
t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
cleanup := bundle.NamespaceCleanupFn()
// Must not panic, must not propagate the 500. Recovering with
// defer is belt-and-suspenders — production calls this from a
// for-loop in workspace_crud.go that has no recover.
defer func() {
if r := recover(); r != nil {
t.Errorf("cleanup panicked on plugin 500: %v", r)
}
}()
cleanup(context.Background(), "ws-1")
}
func pathsAndMethods(paths, methods []string) []string {
out := make([]string, len(paths))
for i := range paths {
out[i] = methods[i] + " " + paths[i]
}
return out
}
+8 -1
View File
@@ -13,6 +13,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
@@ -23,7 +24,7 @@ import (
"github.com/gin-gonic/gin"
)
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager) *gin.Engine {
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle) *gin.Engine {
r := gin.Default()
// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
@@ -150,6 +151,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// F1084/#1131: Export applies redactSecrets before returning content.
// F1085/#1132: Import applies redactSecrets before persisting content.)
adminMemH := handlers.NewAdminMemoriesHandler()
if memBundle != nil {
adminMemH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
wsAdmin.GET("/admin/memories/export", adminMemH.Export)
wsAdmin.POST("/admin/memories/import", adminMemH.Import)
}
@@ -370,6 +374,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
if memBundle != nil {
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
mcpRl := middleware.NewMCPRateLimiter(120, time.Minute, context.Background())
wsAuth.GET("/mcp/stream", mcpRl.Middleware(), mcpH.Stream)
wsAuth.POST("/mcp", mcpRl.Middleware(), mcpH.Call)
+10 -4
View File
@@ -491,20 +491,26 @@ async def get_peers() -> list[dict]:
return peers
async def get_workspace_info() -> dict:
async def get_workspace_info(source_workspace_id: str | None = None) -> dict:
"""Get this workspace's info from the platform.
``source_workspace_id`` selects which registered workspace to
introspect when the agent is registered into multiple workspaces
(multi-workspace mode). Unset → defaults to the module-level
WORKSPACE_ID — single-workspace operators see no behaviour change.
Distinguishes three failure shapes so callers can handle them
distinctly (#2429):
- 410 Gone → workspace was deleted; re-onboard required
- 404 / other → workspace never existed (or transient)
- exception → network / auth failure
"""
src = source_workspace_id or WORKSPACE_ID
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}",
headers=auth_headers(),
f"{PLATFORM_URL}/workspaces/{src}",
headers=auth_headers(src),
)
if resp.status_code == 200:
return resp.json()
@@ -521,7 +527,7 @@ async def get_workspace_info() -> dict:
body = {}
return {
"error": "removed",
"id": body.get("id", WORKSPACE_ID),
"id": body.get("id", src),
"removed_at": body.get("removed_at"),
"hint": body.get(
"hint",
+51 -14
View File
@@ -545,19 +545,34 @@ async def tool_list_peers(source_workspace_id: str | None = None) -> str:
return "\n".join(lines)
async def tool_get_workspace_info() -> str:
"""Get this workspace's own info."""
info = await get_workspace_info()
async def tool_get_workspace_info(source_workspace_id: str | None = None) -> str:
"""Get this workspace's own info.
``source_workspace_id`` selects which registered workspace to
introspect when the agent is registered into multiple workspaces.
Unset → falls back to module-level WORKSPACE_ID.
"""
info = await get_workspace_info(source_workspace_id=source_workspace_id)
return json.dumps(info, indent=2)
async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
async def tool_commit_memory(
content: str,
scope: str = "LOCAL",
source_workspace_id: str | None = None,
) -> str:
"""Save important information to persistent memory.
GLOBAL scope is writable only by root workspaces (tier == 0).
RBAC memory.write permission is required for all scope levels.
The source workspace_id is embedded in every record so the platform
can enforce cross-workspace isolation and audit trail.
``source_workspace_id`` selects which registered workspace this
memory belongs to when the agent is registered into multiple
workspaces (PR-1 / multi-workspace mode). When unset, falls back
to the module-level WORKSPACE_ID — single-workspace operators see
no behaviour change.
"""
if not content:
return "Error: content is required"
@@ -581,18 +596,19 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
"Non-root workspaces may use LOCAL or TEAM scope."
)
src = source_workspace_id or WORKSPACE_ID
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{src}/memories",
json={
"content": content,
"scope": scope,
# Embed source workspace so the platform can namespace-isolate
# and audit cross-workspace writes (GH#1610 fix).
"workspace_id": WORKSPACE_ID,
"workspace_id": src,
},
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
data = resp.json()
if resp.status_code in (200, 201):
@@ -602,13 +618,21 @@ async def tool_commit_memory(content: str, scope: str = "LOCAL") -> str:
return f"Error saving memory: {e}"
async def tool_recall_memory(query: str = "", scope: str = "") -> str:
async def tool_recall_memory(
query: str = "",
scope: str = "",
source_workspace_id: str | None = None,
) -> str:
"""Search persistent memory for previously saved information.
RBAC memory.read permission is required (mirrors builtin_tools/memory.py).
The workspace_id is sent as a query parameter so the platform can
cross-validate it against the auth token and defend against any future
path traversal / cross-tenant read bugs in the platform itself.
``source_workspace_id`` selects which registered workspace's memories
to search when the agent is registered into multiple workspaces.
Unset → defaults to the module-level WORKSPACE_ID.
"""
# RBAC: require memory.read permission (mirrors builtin_tools/memory.py)
if not _check_memory_read_permission():
@@ -617,7 +641,8 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
"permission for this operation."
)
params: dict[str, str] = {"workspace_id": WORKSPACE_ID}
src = source_workspace_id or WORKSPACE_ID
params: dict[str, str] = {"workspace_id": src}
if query:
params["q"] = query
if scope:
@@ -625,9 +650,9 @@ async def tool_recall_memory(query: str = "", scope: str = "") -> str:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/memories",
f"{PLATFORM_URL}/workspaces/{src}/memories",
params=params,
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
data = resp.json()
if isinstance(data, list):
@@ -664,7 +689,12 @@ _INBOX_NOT_ENABLED_MSG = (
)
async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "") -> str:
async def tool_chat_history(
peer_id: str,
limit: int = 20,
before_ts: str = "",
source_workspace_id: str | None = None,
) -> str:
"""Fetch the prior conversation with one peer.
Hits ``/workspaces/<self>/activity?peer_id=<peer>&limit=<N>``
@@ -686,6 +716,11 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
histories — pass the oldest ``ts`` from the previous
response. Empty (default) returns the most recent ``limit``
rows.
source_workspace_id: Which registered workspace's activity log
to query. Auto-routes via ``_peer_to_source`` cache when
unset (the workspace this peer was discovered through);
falls back to module-level WORKSPACE_ID for single-workspace
operators.
Returns a JSON-encoded list of activity rows (or an error string
starting with ``Error:`` so the agent can branch). Each row carries
@@ -701,6 +736,8 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
if limit > 500:
limit = 500
src = source_workspace_id or _peer_to_source.get(peer_id) or WORKSPACE_ID
params: dict[str, str] = {
"peer_id": peer_id,
"limit": str(limit),
@@ -713,9 +750,9 @@ async def tool_chat_history(peer_id: str, limit: int = 20, before_ts: str = "")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{PLATFORM_URL}/workspaces/{WORKSPACE_ID}/activity",
f"{PLATFORM_URL}/workspaces/{src}/activity",
params=params,
headers=_auth_headers_for_heartbeat(),
headers=_auth_headers_for_heartbeat(src),
)
except Exception as exc: # noqa: BLE001
return f"Error: chat_history request failed: {exc}"
+57
View File
@@ -0,0 +1,57 @@
"""Helpers for building / mutating the workspace ``AgentCard``.
Kept as their own module so the behavior is unit-testable without booting
the whole runtime (``main.py`` is ``# pragma: no cover``).
"""
from __future__ import annotations
from typing import Iterable
from a2a.types import AgentCard, AgentSkill
def enrich_card_skills(card: AgentCard, loaded_skills: Iterable | None) -> bool:
"""Replace ``card.skills`` with rich metadata from the adapter's loaded
skills, in place. Pairs with PR #2756: the card was built up front from
static ``config.skills`` names so /.well-known/agent-card.json could
serve before ``adapter.setup()`` finishes; this swaps in the richer
descriptions/tags/examples that ``setup()``'s skill loader produces.
Returns ``True`` on swap, ``False`` when the swap was skipped or
failed. Failure cases:
* ``loaded_skills`` is None / empty — caller didn't load any.
* Any element doesn't expose ``.metadata.{id,name,description,tags,examples}``
(a future adapter that doesn't follow the canonical shape).
Failures DO NOT raise — a malformed ``loaded_skills`` shape would
otherwise propagate to ``main.py``'s outer ``except Exception``,
silently degrading an OK boot to the not-configured state. Static
stubs from ``config.skills`` stay in place; setup() already
succeeded, the agent works, only the card's skill enrichment is
degraded. Operator sees a clear log line; tests assert this
distinction.
"""
if not loaded_skills:
return False
try:
rich = [
AgentSkill(
id=skill.metadata.id,
name=skill.metadata.name,
description=skill.metadata.description,
tags=skill.metadata.tags,
examples=skill.metadata.examples,
)
for skill in loaded_skills
]
except Exception as enrich_err: # noqa: BLE001
print(
f"Warning: skill metadata enrichment failed (keeping static "
f"stubs from config.skills): {type(enrich_err).__name__}: {enrich_err}",
flush=True,
)
return False
card.skills = rich
return True
+162 -99
View File
@@ -148,62 +148,15 @@ async def main(): # pragma: no cover
heartbeat=heartbeat,
)
# 5. Setup adapter and create executor
# If setup fails, ensure heartbeat is stopped to prevent resource leak
try:
await adapter.setup(adapter_config)
executor = await adapter.create_executor(adapter_config)
# 5a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
# is set, exercise the executor's full import tree by calling
# execute() once with stub deps + a short timeout. Skips platform
# registration + uvicorn entirely. Returns process exit code.
from smoke_mode import is_smoke_mode, run_executor_smoke
if is_smoke_mode():
exit_code = await run_executor_smoke(executor)
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception: # noqa: BLE001
pass
raise SystemExit(exit_code)
# 5b. Restore from pre-stop snapshot if one exists (GH#1391).
# The snapshot is scrubbed before being written, so secrets are
# already redacted — restore_state must not re-expose them.
from lib.pre_stop import read_snapshot
snapshot = read_snapshot()
if snapshot:
try:
adapter.restore_state(snapshot)
print(
f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
f"uptime={snapshot.get('uptime_seconds', 0)}s"
)
except Exception as restore_err:
print(f"Warning: snapshot restore failed (continuing): {restore_err}")
except Exception:
# heartbeat hasn't started yet but may have async tasks pending
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception:
pass
raise
# 5.5. Initialise Temporal durable execution wrapper (optional)
# Connects to TEMPORAL_HOST (default: localhost:7233) and starts a
# co-located Temporal worker as a background asyncio task.
# No-op with a warning log if Temporal is unreachable or temporalio
# is not installed — all tasks fall back to direct execution transparently.
from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
temporal_wrapper = _create_temporal_wrapper()
await temporal_wrapper.start()
# Get loaded skills for agent card (adapter may have populated them)
loaded_skills = getattr(adapter, "loaded_skills", [])
# 6. Build Agent Card
# 5. Build the AgentCard *before* adapter.setup() so /.well-known/agent-card.json
# is reachable as soon as uvicorn binds, regardless of whether the adapter
# has working LLM credentials. Decoupling readiness ("is the workspace up?")
# from configuration ("can it actually answer?") means a workspace with a
# missing/rotated key stays REACHABLE — canvas can render a clear
# "agent not configured" error instead of "stuck booting forever," and
# operators can deprovision/redeploy normally. Skills built from
# config.skills (static names from config.yaml) up front; richer metadata
# from the adapter's loaded_skills swaps in below if setup() succeeds.
machine_ip = os.environ.get("HOSTNAME", get_machine_ip())
workspace_url = f"http://{machine_ip}:{port}"
@@ -237,20 +190,91 @@ async def main(): # pragma: no cover
# always available and tasks/get accepts historyLength via
# apply_history_length(). Don't add this kwarg back.
),
# Static skill stubs from config.yaml; replaced with rich metadata
# below if adapter.setup() loads skills successfully.
skills=[
AgentSkill(
id=skill.metadata.id,
name=skill.metadata.name,
description=skill.metadata.description,
tags=skill.metadata.tags,
examples=skill.metadata.examples,
)
for skill in loaded_skills
AgentSkill(id=name, name=name, description=name, tags=[], examples=[])
for name in (config.skills or [])
],
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
)
# 6. Setup adapter and create executor
# On failure: log + continue. The card route stays mounted (above);
# the JSON-RPC route below returns -32603 "agent not configured" until
# the operator fixes credentials and redeploys. Heartbeat keeps running
# so the platform sees the workspace as reachable-but-misconfigured
# rather than crash-looping.
adapter_ready = False
adapter_error: str | None = None
executor = None
try:
await adapter.setup(adapter_config)
executor = await adapter.create_executor(adapter_config)
# 6a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
# is set, exercise the executor's full import tree by calling
# execute() once with stub deps + a short timeout. Skips platform
# registration + uvicorn entirely. Returns process exit code.
from smoke_mode import is_smoke_mode, run_executor_smoke
if is_smoke_mode():
exit_code = await run_executor_smoke(executor)
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception: # noqa: BLE001
pass
raise SystemExit(exit_code)
# 6b. Restore from pre-stop snapshot if one exists (GH#1391).
# The snapshot is scrubbed before being written, so secrets are
# already redacted — restore_state must not re-expose them.
from lib.pre_stop import read_snapshot
snapshot = read_snapshot()
if snapshot:
try:
adapter.restore_state(snapshot)
print(
f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
f"uptime={snapshot.get('uptime_seconds', 0)}s"
)
except Exception as restore_err:
print(f"Warning: snapshot restore failed (continuing): {restore_err}")
# 6c. Swap rich skill metadata into the card now that setup() loaded
# them. In-place mutation: a2a-sdk's create_agent_card_routes serialises
# the card on each request, so the route mounted below sees the update.
# Isolated via card_helpers.enrich_card_skills — a malformed
# loaded_skills shape (e.g., a future adapter that doesn't follow
# the .metadata convention) is logged + swallowed instead of
# propagating up to the outer except, where it would silently
# degrade an OK boot to the not-configured state.
from card_helpers import enrich_card_skills
enrich_card_skills(agent_card, getattr(adapter, "loaded_skills", None))
adapter_ready = True
except SystemExit:
# Smoke-mode exit signal — propagate untouched.
raise
except Exception as setup_err: # noqa: BLE001
adapter_error = f"{type(setup_err).__name__}: {setup_err}"
print(
f"WARNING: adapter.setup() failed — workspace will serve agent-card "
f"but JSON-RPC will return -32603 until configuration is fixed. "
f"Reason: {adapter_error}",
flush=True,
)
# Heartbeat keeps running so the platform marks the workspace as
# reachable-but-misconfigured. Operators can then redeploy with the
# correct env vars without having to chase a crash-loop.
# 6.5. Initialise Temporal durable execution wrapper (optional). Only
# meaningful when an executor exists; skipped on misconfigured boots.
if adapter_ready:
from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
temporal_wrapper = _create_temporal_wrapper()
await temporal_wrapper.start()
# 7. Wrap in A2A.
#
# Regression fix (#204): PR #198 tried to wire push_config_store +
@@ -262,42 +286,51 @@ async def main(): # pragma: no cover
# in the AgentCard below is still advertised via AgentCapabilities so
# clients know we COULD do pushes; actually implementing them requires
# a concrete sender subclass, tracked as a Phase-H follow-up to #175.
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
# a2a-sdk 1.x added agent_card as a required positional/keyword
# argument — it's used internally for capability dispatch (e.g.
# routing tasks/get historyLength based on the card's protocol
# version). Pass the same agent_card we registered with the
# platform so the handler's capability surface matches what the
# AgentCard advertises.
agent_card=agent_card,
)
# v1: replace A2AStarletteApplication with Starlette route factory.
# rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
# Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
# what the platform's a2a_proxy.go POSTs to (it forwards to the
# workspace's URL without appending a path). Card endpoint stays at
# the well-known path /.well-known/agent-card.json (handled by
# create_agent_card_routes default).
routes = []
routes.extend(create_agent_card_routes(agent_card))
# enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
# using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
# Pydantic field names) can talk to us without re-deploying. Outbound
# JSON-RPC wire payloads MUST also use v0.3 shape — the v0.3 compat
# adapter at /usr/local/lib/python3.11/site-packages/a2a/compat/v0_3/
# validates against Pydantic Role enum (`agent`|`user`) and rejects
# the protobuf-style `ROLE_USER` enum names with JSON-RPC -32600
# (Invalid Request). Native v1.x types (a2a.types.Role.ROLE_AGENT)
# are only for code that constructs Message objects in-process and
# hands them to the SDK, which serialises them correctly for the
# outbound wire format.
routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
if adapter_ready:
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
# a2a-sdk 1.x added agent_card as a required positional/keyword
# argument — it's used internally for capability dispatch (e.g.
# routing tasks/get historyLength based on the card's protocol
# version). Pass the same agent_card we registered with the
# platform so the handler's capability surface matches what the
# AgentCard advertises.
agent_card=agent_card,
)
# v1: replace A2AStarletteApplication with Starlette route factory.
# rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
# Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
# what the platform's a2a_proxy.go POSTs to (it forwards to the
# workspace's URL without appending a path). Card endpoint stays at
# the well-known path /.well-known/agent-card.json (handled by
# create_agent_card_routes default).
# enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
# using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
# Pydantic field names) can talk to us without re-deploying.
routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
else:
# Misconfigured: serve the card but reject JSON-RPC with -32603 so
# canvas surfaces a useful "agent not configured: <reason>" instead
# of letting requests time out. Handler factory is in its own module
# so the behavior is unit-testable (workspace/tests/test_not_configured_handler.py).
from starlette.routing import Route
from not_configured_handler import make_not_configured_handler
routes.append(
Route("/", make_not_configured_handler(adapter_error), methods=["POST"])
)
app = Starlette(routes=routes)
# 8. Register with platform
# When adapter.setup() failed, advertise via configuration_status so
# the platform/canvas can render "configured: false, reason: …" instead
# of a confused "ready but silent" state.
loaded_skills = getattr(adapter, "loaded_skills", None) or []
agent_card_dict = {
"name": config.name,
"description": config.description,
@@ -311,11 +344,16 @@ async def main(): # pragma: no cover
"tags": s.metadata.tags,
}
for s in loaded_skills
] if adapter_ready else [
{"id": n, "name": n, "description": n, "tags": []}
for n in (config.skills or [])
],
"capabilities": {
"streaming": config.a2a.streaming,
"pushNotifications": config.a2a.push_notifications,
},
"configuration_status": "ready" if adapter_ready else "not_configured",
**({"configuration_error": adapter_error} if adapter_error else {}),
}
async with httpx.AsyncClient(timeout=10.0) as client:
@@ -364,7 +402,9 @@ async def main(): # pragma: no cover
# 9b. Start skills hot-reload watcher (background task)
# When a skill file changes the watcher reloads the skill module and calls
# back into the adapter so the next A2A request uses the updated tools.
if config.skills:
# Skipped on misconfigured boots — adapter has no executor / tool registry
# to swap into, so reloading skills would NPE on the agent rebuild path.
if adapter_ready and config.skills:
try:
from skill_loader.watcher import SkillsWatcher
@@ -452,7 +492,24 @@ async def main(): # pragma: no cover
limit = int(request.query_params.get("limit", "100"))
except (TypeError, ValueError):
return JSONResponse({"error": "since and limit must be integers"}, status_code=400)
result = await adapter.transcript_lines(since=since, limit=limit)
# Isolate adapter call: misconfigured boots leave the adapter
# partially-initialised, and a future adapter override of
# transcript_lines might assume setup() ran. Surface a 503 with
# a clear reason instead of letting the exception propagate to
# Starlette's 500 handler — same pattern as the not-configured
# JSON-RPC route (PR #2756). BaseAdapter.transcript_lines's
# default returns {"supported": false} so today's 4 adapters
# never trigger this branch; this is the safety net.
try:
result = await adapter.transcript_lines(since=since, limit=limit)
except Exception as transcript_err: # noqa: BLE001
return JSONResponse(
{
"error": "transcript unavailable",
"detail": f"{type(transcript_err).__name__}: {transcript_err}",
},
status_code=503,
)
return JSONResponse(result)
starlette_app.add_route("/transcript", _transcript_handler, methods=["GET"])
@@ -495,9 +552,13 @@ async def main(): # pragma: no cover
# 10b. Schedule initial_prompt self-message after server is ready.
# Only runs on first boot — creates a marker file to prevent re-execution on restart.
# Skipped on misconfigured boots: the self-message would route through the
# platform back to /, hit the -32603 not-configured handler, and consume
# the marker for a fire that can't actually run. Wait until the operator
# fixes credentials and the workspace redeploys with adapter_ready=True.
initial_prompt_task = None
initial_prompt_marker = resolve_initial_prompt_marker(config_path)
if config.initial_prompt and not os.path.exists(initial_prompt_marker):
if adapter_ready and config.initial_prompt and not os.path.exists(initial_prompt_marker):
# Write the marker UP FRONT (#71): if the prompt later crashes or
# times out, we do NOT replay on next boot — that created a
# ProcessError cascade where every message kept crashing. Operators
@@ -615,7 +676,9 @@ async def main(): # pragma: no cover
# workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or
# per-workspace to enable.
idle_loop_task = None
if config.idle_prompt:
# Skipped on misconfigured boots — the self-fire would route to the
# -32603 handler in a tight loop and consume cycles for no useful work.
if adapter_ready and config.idle_prompt:
# Idle-fire HTTP timeout. Kept tight relative to the fire cadence so a
# hung platform doesn't accumulate dangling requests — a fire that
# takes longer than the idle interval itself is almost certainly stuck.
+55
View File
@@ -0,0 +1,55 @@
"""Build a JSON-RPC handler that returns ``-32603 "agent not configured"``.
Used by the workspace runtime when ``adapter.setup()`` fails (most often
because an LLM credential is missing or rotated). Lets ``/.well-known/agent-card.json``
keep serving 200 — the workspace stays REACHABLE for canvas/operator
introspection — while message-send requests get a clear, immediate
error instead of silently timing out.
Kept as its own module so the behavior is unit-testable without booting
the whole runtime (main.py is ``# pragma: no cover``).
"""
from __future__ import annotations
from typing import Awaitable, Callable
from starlette.requests import Request
from starlette.responses import JSONResponse
def make_not_configured_handler(
reason: str | None,
) -> Callable[[Request], Awaitable[JSONResponse]]:
"""Return a Starlette POST handler that always 503s with JSON-RPC -32603.
``reason`` is surfaced in the JSON-RPC ``error.data`` field so canvas
can render "agent not configured: <reason>" to the user. Pass the
stringified ``adapter.setup()`` exception. ``None`` falls back to a
generic "adapter.setup() failed".
The handler echoes the request's JSON-RPC ``id`` when present so a
well-behaved JSON-RPC client can correlate the error to its request.
Malformed bodies (non-JSON, missing id) get ``id: null`` per spec.
"""
fallback = reason or "adapter.setup() failed"
async def _handler(request: Request) -> JSONResponse:
try:
body = await request.json()
except Exception: # noqa: BLE001
body = {}
return JSONResponse(
{
"jsonrpc": "2.0",
"id": body.get("id") if isinstance(body, dict) else None,
"error": {
"code": -32603,
"message": "Internal error: agent not configured",
"data": fallback,
},
},
status_code=503,
)
return _handler
+41 -1
View File
@@ -271,7 +271,19 @@ _GET_WORKSPACE_INFO = ToolSpec(
"back to the user, or to determine whether you're a tier-0 "
"root that can write GLOBAL memory)."
),
input_schema={"type": "object", "properties": {}},
input_schema={
"type": "object",
"properties": {
"source_workspace_id": {
"type": "string",
"description": (
"Optional. In multi-workspace mode (this agent registered "
"in N workspaces), introspect the named workspace instead "
"of the primary one. Single-workspace agents omit this."
),
},
},
},
impl=tool_get_workspace_info,
section=A2A_SECTION,
)
@@ -455,6 +467,14 @@ _CHAT_HISTORY = ToolSpec(
"Use the oldest `created_at` from a previous response."
),
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: query the named "
"workspace's activity log instead of the primary one. "
"Auto-routes via the peer-discovery cache when unset."
),
},
},
"required": ["peer_id"],
},
@@ -515,6 +535,16 @@ _COMMIT_MEMORY = ToolSpec(
"enum": ["LOCAL", "TEAM", "GLOBAL"],
"description": "Memory scope (default LOCAL).",
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: commit the memory "
"into the named workspace's namespace instead of "
"the primary one. Pair with the inbound message's "
"`arrival_workspace_id` so memories stay in the "
"tenant they were derived from."
),
},
},
"required": ["content"],
},
@@ -544,6 +574,16 @@ _RECALL_MEMORY = ToolSpec(
"enum": ["LOCAL", "TEAM", "GLOBAL", ""],
"description": "Filter by scope (empty = all accessible).",
},
"source_workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: search the named "
"workspace's memories instead of the primary one. "
"Pair with the inbound message's "
"`arrival_workspace_id` to recall context for the "
"right tenant."
),
},
},
},
impl=tool_recall_memory,
+26 -8
View File
@@ -204,17 +204,31 @@ def run_preflight(config: WorkspaceConfig, config_path: str) -> PreflightReport:
)
)
continue
report.failures.append(
# Missing required env is a CONFIGURATION issue, not a STRUCTURAL one.
# The workspace can still bind /.well-known/agent-card.json — adapter.setup()
# raises on the missing key, main.py's PR #2756 try/except mounts the
# not-configured JSON-RPC handler, canvas surfaces a clear "agent not
# configured: <reason>" error to the user. Hard-failing preflight here
# would crash before the not-configured path even loads, leaving the
# workspace invisible (the failure mode that bit codex/openclaw bench
# 25335853189 on 2026-05-04 even after PR #2756). Warn loudly so logs
# remain actionable, but let the boot continue.
report.warnings.append(
PreflightIssue(
severity="fail",
severity="warn",
title="Required env",
detail=f"Missing required environment variable: {env_var}",
fix=f"Set {env_var} via the secrets API (global or workspace-level).",
fix=(
f"Set {env_var} via the secrets API (global or workspace-level). "
"Workspace will boot in not-configured state until this is set; "
"JSON-RPC will return -32603 'agent not configured' on every request."
),
)
)
# Backward compat: if legacy auth_token_file is set, warn but don't block
# if the token is available via required_env or auth_token_env.
# Backward compat: if legacy auth_token_file is set, warn — same reasoning
# as the required_env block above. The downstream auth check fires inside
# adapter.setup(), which is wrapped by main.py's try/except.
token_file = getattr(config.runtime_config, "auth_token_file", "")
if token_file:
token_path = config_dir / token_file
@@ -226,12 +240,16 @@ def run_preflight(config: WorkspaceConfig, config_path: str) -> PreflightReport:
env_has_token = all(os.environ.get(e) for e in required_env)
if not env_has_token:
report.failures.append(
report.warnings.append(
PreflightIssue(
severity="fail",
severity="warn",
title="Auth token",
detail=f"Missing auth token file: {token_file}",
fix="Remove auth_token_file and use required_env + secrets API instead.",
fix=(
"Remove auth_token_file and use required_env + secrets API "
"instead. Workspace will boot in not-configured state until "
"the token is provided."
),
)
)
+36
View File
@@ -145,6 +145,42 @@ def _make_a2a_mocks():
types_mod.TaskStatus = TaskStatus
types_mod.TaskState = _TaskStateEnum
# v1 AgentCard / AgentSkill / AgentCapabilities / AgentInterface — used
# by main.py's static-card construction (PR #2756) and by
# card_helpers.enrich_card_skills's swap path. Stubs preserve kwargs so
# tests can assert on card.skills[i].name etc., and let card.skills be
# reassigned in place (the production code's enrichment pattern).
class AgentSkill:
def __init__(self, id="", name="", description="", tags=None, examples=None, **kwargs):
self.id = id
self.name = name
self.description = description
self.tags = list(tags) if tags is not None else []
self.examples = list(examples) if examples is not None else []
for k, v in kwargs.items():
setattr(self, k, v)
class AgentCapabilities:
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
class AgentInterface:
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
class AgentCard:
def __init__(self, **kwargs):
self.skills = []
for k, v in kwargs.items():
setattr(self, k, v)
types_mod.AgentSkill = AgentSkill
types_mod.AgentCapabilities = AgentCapabilities
types_mod.AgentInterface = AgentInterface
types_mod.AgentCard = AgentCard
# a2a.helpers (v1: moved from a2a.utils, renamed new_agent_text_message
# → new_text_message). Mock both names — production code only calls
# new_text_message, but if any test still references the old name it
+217
View File
@@ -426,3 +426,220 @@ class TestListRegisteredWorkspaces:
platform_auth.register_workspace_token("ws-1", "tok-1")
platform_auth.clear_cache()
assert platform_auth.list_registered_workspaces() == []
# ---------------------------------------------------------------------------
# Memory tools — commit/recall must namespace under source_workspace_id
# so an agent serving multiple tenants doesn't bleed memories across
# them. Single-workspace path (no source arg) keeps using WORKSPACE_ID.
# ---------------------------------------------------------------------------
class TestCommitMemorySourceRouting:
@pytest.mark.asyncio
async def test_url_and_auth_use_source_workspace_id(self, monkeypatch):
"""commit_memory(source_workspace_id=X) must POST to /workspaces/X/
with X's bearer token — otherwise a multi-tenant agent could
write into the wrong tenant's memory namespace."""
import platform_auth, a2a_tools
platform_auth.register_workspace_token("ffff6666-ffff-ffff-ffff-ffffffffffff", "token-F")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "mem-1"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def post(self, url, headers, json):
captured["url"] = url
captured["headers"] = headers
captured["body"] = json
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
result = await a2a_tools.tool_commit_memory(
"remember this",
source_workspace_id="ffff6666-ffff-ffff-ffff-ffffffffffff",
)
assert "/workspaces/ffff6666-ffff-ffff-ffff-ffffffffffff/memories" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-F"
assert captured["body"]["workspace_id"] == "ffff6666-ffff-ffff-ffff-ffffffffffff"
import json as _json
assert _json.loads(result)["success"] is True
@pytest.mark.asyncio
async def test_falls_back_to_module_workspace_id(self, monkeypatch):
"""Without source_workspace_id, single-workspace operators keep
the legacy WORKSPACE_ID-based POST — no behavior change."""
import a2a_client, a2a_tools
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "mem-1"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def post(self, url, headers, json):
captured["url"] = url
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_commit_memory("remember this")
assert f"/workspaces/{a2a_client.WORKSPACE_ID}/memories" in captured["url"]
class TestRecallMemorySourceRouting:
@pytest.mark.asyncio
async def test_url_params_and_auth_use_source(self, monkeypatch):
"""recall_memory routes the GET, the workspace_id query param,
and the auth header through source_workspace_id."""
import platform_auth, a2a_tools
platform_auth.register_workspace_token("aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa", "token-G")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
captured["params"] = params
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_recall_memory(
query="x",
source_workspace_id="aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
)
assert "/workspaces/aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa/memories" in captured["url"]
assert captured["params"]["workspace_id"] == "aaaa7777-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
assert captured["headers"]["Authorization"] == "Bearer token-G"
# ---------------------------------------------------------------------------
# chat_history — auto-routes via the peer→source cache so an inbound
# peer_agent push from workspace X sees its history queried against X.
# ---------------------------------------------------------------------------
class TestChatHistorySourceRouting:
@pytest.mark.asyncio
async def test_auto_routes_via_peer_cache(self, monkeypatch):
"""chat_history(peer_id) without an explicit source falls back to
``_peer_to_source[peer_id]`` — same auto-routing as delegate_task,
so the agent doesn't have to remember which workspace surfaced
each peer."""
import platform_auth, a2a_client, a2a_tools
platform_auth.register_workspace_token("bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb", "token-H")
peer_id = "1111aaaa-1111-1111-1111-111111111111"
a2a_client._peer_to_source[peer_id] = "bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_chat_history(peer_id, limit=5)
assert "/workspaces/bbbb8888-bbbb-bbbb-bbbb-bbbbbbbbbbbb/activity" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-H"
@pytest.mark.asyncio
async def test_explicit_source_beats_cache(self, monkeypatch):
import platform_auth, a2a_client, a2a_tools
platform_auth.register_workspace_token("cccc9999-cccc-cccc-cccc-cccccccccccc", "token-I")
peer_id = "1111aaaa-1111-1111-1111-111111111111"
a2a_client._peer_to_source[peer_id] = "should-not-be-used"
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return []
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, params, headers):
captured["url"] = url
return _Resp()
monkeypatch.setattr(a2a_tools.httpx, "AsyncClient", lambda timeout: _Client())
await a2a_tools.tool_chat_history(
peer_id, source_workspace_id="cccc9999-cccc-cccc-cccc-cccccccccccc",
)
assert "/workspaces/cccc9999-cccc-cccc-cccc-cccccccccccc/activity" in captured["url"]
# ---------------------------------------------------------------------------
# get_workspace_info — multi-workspace introspection.
# ---------------------------------------------------------------------------
class TestGetWorkspaceInfoSourceRouting:
@pytest.mark.asyncio
async def test_introspects_named_workspace(self, monkeypatch):
import platform_auth, a2a_client
platform_auth.register_workspace_token("dddd0000-dddd-dddd-dddd-dddddddddddd", "token-J")
captured: dict = {}
class _Resp:
status_code = 200
def json(self):
return {"id": "dddd0000-dddd-dddd-dddd-dddddddddddd", "name": "wsJ"}
class _Client:
async def __aenter__(self): return self
async def __aexit__(self, *a): return None
async def get(self, url, headers):
captured["url"] = url
captured["headers"] = headers
return _Resp()
monkeypatch.setattr(a2a_client.httpx, "AsyncClient", lambda timeout: _Client())
info = await a2a_client.get_workspace_info(
source_workspace_id="dddd0000-dddd-dddd-dddd-dddddddddddd",
)
assert info["id"] == "dddd0000-dddd-dddd-dddd-dddddddddddd"
assert "/workspaces/dddd0000-dddd-dddd-dddd-dddddddddddd" in captured["url"]
assert captured["headers"]["Authorization"] == "Bearer token-J"
+163
View File
@@ -0,0 +1,163 @@
"""Tests for ``card_helpers.enrich_card_skills`` — the defensive swap that
replaces ``AgentCard.skills`` with rich metadata from the adapter's
loaded skills, falling back to the static stubs on shape mismatch.
The whole point of the helper (vs inline in main.py) is that a future
adapter author who returns a non-standard ``loaded_skills`` shape
should NOT silently downgrade their workspace boot to not-configured —
``setup()`` succeeded, the agent works, only the card's skill metadata
enrichment is degraded.
"""
from __future__ import annotations
import sys
from pathlib import Path
WORKSPACE_DIR = Path(__file__).resolve().parents[1]
if str(WORKSPACE_DIR) not in sys.path:
sys.path.insert(0, str(WORKSPACE_DIR))
from a2a.types import AgentCard, AgentCapabilities, AgentInterface, AgentSkill
from card_helpers import enrich_card_skills
def _make_card(static_skill_names):
return AgentCard(
name="test-agent",
description="test",
version="0.0.0",
supported_interfaces=[
AgentInterface(protocol_binding="https://a2a.g/v1", url="http://x:8000")
],
capabilities=AgentCapabilities(streaming=True, push_notifications=False),
skills=[
AgentSkill(id=n, name=n, description=n, tags=[], examples=[])
for n in static_skill_names
],
default_input_modes=["text/plain"],
default_output_modes=["text/plain"],
)
class _SkillMetadata:
"""Mimics the adapter-side Skill.metadata shape."""
def __init__(self, id, name, description, tags, examples):
self.id = id
self.name = name
self.description = description
self.tags = tags
self.examples = examples
class _Skill:
def __init__(self, **kwargs):
self.metadata = _SkillMetadata(**kwargs)
def test_returns_false_on_none():
"""No loaded_skills → caller didn't load any → no swap, no log spam."""
card = _make_card(["a", "b"])
assert enrich_card_skills(card, None) is False
# Static stubs preserved.
assert [s.id for s in card.skills] == ["a", "b"]
def test_returns_false_on_empty_list():
"""Empty list → same treatment as None: nothing to enrich."""
card = _make_card(["a"])
assert enrich_card_skills(card, []) is False
assert [s.id for s in card.skills] == ["a"]
def test_swaps_in_rich_metadata_on_canonical_shape():
"""The happy path: adapter returns Skill objects with the canonical
.metadata shape, card gets the richer descriptions/tags/examples."""
card = _make_card(["search"]) # static stub
rich = [
_Skill(
id="search",
name="Web Search",
description="Search the web for the user's question",
tags=["web", "io"],
examples=["who won the world cup in 2022?"],
),
]
assert enrich_card_skills(card, rich) is True
assert len(card.skills) == 1
assert card.skills[0].id == "search"
assert card.skills[0].name == "Web Search"
assert "web" in card.skills[0].tags
assert card.skills[0].examples == ["who won the world cup in 2022?"]
def test_returns_false_and_keeps_stubs_when_metadata_attr_missing(capsys):
"""Defensive: a future adapter that returns objects without
``.metadata`` would otherwise raise AttributeError and propagate to
main.py's outer except — silently degrading an OK boot to
not-configured. Helper logs + returns False instead, static stubs
stay in place.
This is the reason the helper exists at all; without it the
inline swap in main.py at PR #2756 was a coupling between adapter
discipline and tenant-facing readiness."""
card = _make_card(["a"])
class NoMetadata:
id = "x" # has id but no .metadata.id (the canonical path)
assert enrich_card_skills(card, [NoMetadata()]) is False
# Static stub preserved.
assert [s.id for s in card.skills] == ["a"]
# Operator gets a log line.
captured = capsys.readouterr()
assert "skill metadata enrichment failed" in captured.out
def test_returns_false_when_metadata_is_partial(capsys):
"""Partial shape — has .metadata but the .metadata object lacks one
of the canonical attrs (here: ``examples``). The list comprehension
raises AttributeError on ``skill.metadata.examples`` access, which
the helper swallows. (In production, a2a.types.AgentSkill is a
Pydantic model that ALSO raises on missing required fields — both
failure modes route through the same except branch.)"""
card = _make_card(["a"])
class PartialMeta:
def __init__(self):
self.id = "x"
self.name = "x"
self.description = "x"
self.tags = []
# examples missing
class PartialSkill:
def __init__(self):
self.metadata = PartialMeta()
result = enrich_card_skills(card, [PartialSkill()])
assert result is False
assert [s.id for s in card.skills] == ["a"]
captured = capsys.readouterr()
assert "skill metadata enrichment failed" in captured.out
def test_failure_is_atomic_no_partial_swap(capsys):
"""If the second skill is malformed, the FIRST skill's swap must NOT
leak into card.skills. We use a list-comprehension which builds the
full list before assignment; verify that property holds.
Without this property, a misbehaving adapter could half-corrupt the
card — operators would see "1 skill listed" when 3 were declared,
no log line if the inline swap was partial."""
card = _make_card(["a", "b"])
valid = _Skill(id="x", name="x", description="x", tags=[], examples=[])
class BadSkill:
# No .metadata at all.
pass
assert enrich_card_skills(card, [valid, BadSkill()]) is False
# Original two static stubs intact — card.skills was never reassigned.
assert [s.id for s in card.skills] == ["a", "b"]
@@ -0,0 +1,87 @@
"""Tests for ``not_configured_handler`` — the JSON-RPC -32603 fallback the
runtime mounts when ``adapter.setup()`` fails.
Tests the behavior end-to-end via Starlette's TestClient so the JSON-RPC
wire shape (status 503, code -32603, id-echo) is exercised the same way
canvas would see it.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Make workspace/ importable in test isolation — same pattern as the
# adjacent tests (test_smoke_mode.py, test_heartbeat.py).
WORKSPACE_DIR = Path(__file__).resolve().parents[1]
if str(WORKSPACE_DIR) not in sys.path:
sys.path.insert(0, str(WORKSPACE_DIR))
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
from not_configured_handler import make_not_configured_handler
def _build_app(reason: str | None) -> TestClient:
handler = make_not_configured_handler(reason)
app = Starlette(routes=[Route("/", handler, methods=["POST"])])
return TestClient(app)
def test_returns_503_with_jsonrpc_error_envelope():
"""Status 503; body is a valid JSON-RPC 2.0 error envelope."""
client = _build_app("MINIMAX_API_KEY not set")
resp = client.post("/", json={"jsonrpc": "2.0", "id": 7, "method": "message/send"})
assert resp.status_code == 503
body = resp.json()
assert body["jsonrpc"] == "2.0"
assert body["error"]["code"] == -32603
assert body["error"]["message"] == "Internal error: agent not configured"
def test_echoes_request_id_when_present():
"""JSON-RPC clients correlate replies via id; the handler must echo it."""
client = _build_app("reason")
resp = client.post("/", json={"jsonrpc": "2.0", "id": "abc-123", "method": "x"})
assert resp.json()["id"] == "abc-123"
def test_id_is_null_when_body_malformed():
"""Per JSON-RPC 2.0: id MUST be null when it can't be determined from
the request. Malformed bodies (non-JSON, empty, non-object) all map
to id=null."""
client = _build_app("reason")
resp = client.post("/", content=b"not json at all", headers={"content-type": "application/json"})
assert resp.status_code == 503
assert resp.json()["id"] is None
def test_reason_surfaces_in_error_data():
"""Operators read ``error.data`` to figure out what to fix. The
setup() exception string lands there verbatim."""
client = _build_app("RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set")
resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
assert resp.json()["error"]["data"] == (
"RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set"
)
def test_none_reason_falls_back_to_generic_message():
"""If the adapter raised but we couldn't capture a reason, give the
operator a hint where to look (still better than a stuck-booting
workspace with no log line)."""
client = _build_app(None)
resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
assert resp.json()["error"]["data"] == "adapter.setup() failed"
def test_array_body_does_not_crash_id_extraction():
"""JSON-RPC supports batch (array) requests. We don't currently
support batch in the runtime, but the handler shouldn't crash on a
batch body — it should just respond with id=null and the same -32603
so the client sees a clear error instead of a 500."""
client = _build_app("reason")
resp = client.post("/", json=[{"jsonrpc": "2.0", "id": 1, "method": "x"}])
assert resp.status_code == 503
assert resp.json()["id"] is None
+45 -20
View File
@@ -225,8 +225,14 @@ def test_required_env_present_passes(tmp_path, monkeypatch):
assert not any(issue.title == "Required env" for issue in report.failures)
def test_required_env_missing_fails(tmp_path, monkeypatch):
"""When a required_env var is missing, preflight fails."""
def test_required_env_missing_warns_does_not_fail(tmp_path, monkeypatch):
"""When a required_env var is missing, preflight WARNS but does not
fail the boot. Pairs with PR #2756 (molecule-core): the workspace
binds /.well-known/agent-card.json regardless of credentials and
routes JSON-RPC to a -32603 'agent not configured' handler. Hard
failing here would crash before the not-configured path even loads,
leaving the workspace invisible — that's the failure mode that bit
codex/openclaw bench 25335853189 on 2026-05-04 even after PR #2756."""
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
config = make_config(
@@ -236,10 +242,13 @@ def test_required_env_missing_fails(tmp_path, monkeypatch):
report = run_preflight(config, str(tmp_path))
assert report.ok is False
assert report.ok is True
assert any(
issue.title == "Required env" and "CLAUDE_CODE_OAUTH_TOKEN" in issue.detail
for issue in report.failures
for issue in report.warnings
)
assert not any(
issue.title == "Required env" for issue in report.failures
)
@@ -257,8 +266,11 @@ def test_required_env_multiple_all_present_passes(tmp_path, monkeypatch):
assert report.ok is True
def test_required_env_multiple_one_missing_fails(tmp_path, monkeypatch):
"""If any required_env var is missing, preflight fails with that var named."""
def test_required_env_multiple_one_missing_warns(tmp_path, monkeypatch):
"""If any required_env var is missing, preflight warns with that var
named (and does NOT fail). The eventual setup() failure is what
actually surfaces to the user via the -32603 handler — preflight is
just a logging signal for operators inspecting boot logs."""
monkeypatch.setenv("API_KEY_A", "key-a")
monkeypatch.delenv("API_KEY_B", raising=False)
@@ -268,10 +280,10 @@ def test_required_env_multiple_one_missing_fails(tmp_path, monkeypatch):
report = run_preflight(config, str(tmp_path))
assert report.ok is False
assert report.ok is True
assert any(
issue.title == "Required env" and "API_KEY_B" in issue.detail
for issue in report.failures
for issue in report.warnings
)
@@ -317,8 +329,10 @@ def test_required_env_skipped_in_smoke_mode(tmp_path, monkeypatch):
)
def test_required_env_smoke_mode_off_still_fails(tmp_path, monkeypatch):
"""Sanity: smoke bypass is OFF when MOLECULE_SMOKE_MODE is unset."""
def test_required_env_smoke_mode_off_still_warns(tmp_path, monkeypatch):
"""Sanity: smoke bypass is OFF when MOLECULE_SMOKE_MODE is unset, but
the warning still fires (and preflight no longer hard-fails — see
test_required_env_missing_warns_does_not_fail for the rationale)."""
monkeypatch.delenv("HERMES_API_KEY", raising=False)
monkeypatch.delenv("MOLECULE_SMOKE_MODE", raising=False)
@@ -328,10 +342,13 @@ def test_required_env_smoke_mode_off_still_fails(tmp_path, monkeypatch):
report = run_preflight(config, str(tmp_path))
assert report.ok is False
assert report.ok is True
assert any(
issue.title == "Required env" and "HERMES_API_KEY" in issue.detail
for issue in report.failures
for issue in report.warnings
)
assert not any(
issue.title == "Required env" for issue in report.failures
)
@@ -383,10 +400,12 @@ def test_top_level_required_env_used_when_no_models_declared(tmp_path, monkeypat
report = run_preflight(config, str(tmp_path))
assert report.ok is False
# Missing required_env is now a warning (workspace boots in
# not-configured state); see test_required_env_missing_warns_does_not_fail.
assert report.ok is True
assert any(
issue.title == "Required env" and "CLAUDE_CODE_OAUTH_TOKEN" in issue.detail
for issue in report.failures
for issue in report.warnings
)
@@ -411,10 +430,10 @@ def test_top_level_used_when_picked_model_not_in_models_list(tmp_path, monkeypat
report = run_preflight(config, str(tmp_path))
assert report.ok is False
assert report.ok is True
assert any(
issue.title == "Required env" and "CLAUDE_CODE_OAUTH_TOKEN" in issue.detail
for issue in report.failures
for issue in report.warnings
)
@@ -526,8 +545,13 @@ def test_per_model_required_env_null_treated_as_empty_no_auth(tmp_path, monkeypa
# ---------- Legacy auth_token_file backward compat ----------
def test_legacy_auth_token_file_missing_no_env_fails(tmp_path, monkeypatch):
"""Legacy: missing auth_token_file with no env var should fail."""
def test_legacy_auth_token_file_missing_no_env_warns(tmp_path, monkeypatch):
"""Legacy: missing auth_token_file with no env var emits a warning,
not a hard failure. Same reasoning as
test_required_env_missing_warns_does_not_fail — adapter.setup() is
the authoritative auth check, preflight just surfaces the issue
early in the boot log. The workspace still binds /agent-card and
routes to the not-configured -32603 handler."""
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
config = make_config(
@@ -536,8 +560,9 @@ def test_legacy_auth_token_file_missing_no_env_fails(tmp_path, monkeypatch):
report = run_preflight(config, str(tmp_path))
assert report.ok is False
assert any(issue.title == "Auth token" for issue in report.failures)
assert report.ok is True
assert any(issue.title == "Auth token" for issue in report.warnings)
assert not any(issue.title == "Auth token" for issue in report.failures)
def test_legacy_auth_token_file_missing_but_auth_token_env_passes(tmp_path, monkeypatch):