Compare commits

..

17 Commits

Author SHA1 Message Date
Hongming Wang 314277769e Merge pull request #2758 from Molecule-AI/staging
staging → main: auto-promote 4f9e3fe
2026-05-04 10:53:03 -07:00
hongming e0b567e992 Merge pull request #2757 from Molecule-AI/fix/memory-v2-wiring-real-tests
Memory v2 wiring: replace decorative tests with real integration
2026-05-04 17:43:09 +00:00
Hongming Wang 707e4d7342 Memory v2 wiring: replace decorative tests with real integration
Self-review of #2755 found two tests that didn't actually exercise the
production code path:

- TestNamespaceCleanupFn_NamespaceFormat asserted
  "workspace:" + "abc-123" == "workspace:abc-123" — a compile-time
  invariant, not runtime behavior. Provided no protection if the closure
  in Bundle.NamespaceCleanupFn ever stopped using that prefix.

- TestNamespaceCleanupFn_FailureLogsButReturns built a *parallel*
  cleanup closure inline with errors.New, then invoked the parallel
  closure. The production closure was never exercised. A regression
  in NamespaceCleanupFn (e.g. forgetting the deferred recover, calling
  the plugin without nil-check) would still pass this test.

Replaced both with real integration:

- TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace spins up
  httptest.Server, points MEMORY_PLUGIN_URL at it, calls Build(),
  invokes the production closure, and asserts the server actually
  saw DELETE /v1/namespaces/workspace:abc-123.

- TestNamespaceCleanupFn_PluginErrorDoesNotPanic exercises the
  failure path for real: server returns 500 on DELETE, closure must
  log and return without propagating. defer-recover is belt-and-
  suspenders since production calls this from a for-loop in
  workspace_crud.go that has no recover.

Couldn't ship with #2755 because the merge queue locks the branch
once enqueued. Following up now that #2755 is merged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:38:59 -07:00
Hongming Wang 4f9e3feece Merge pull request #2756 from Molecule-AI/fix/agent-card-decouple-from-setup
fix(runtime): decouple agent-card readiness from adapter.setup()
2026-05-04 17:32:02 +00:00
Hongming Wang 10752fe330 Merge pull request #2755 from Molecule-AI/fix/memory-v2-main-wiring
Memory v2 fixup CRITICAL: wire plugin from main.go (was fully dormant)
2026-05-04 17:31:01 +00:00
Hongming Wang 8f7122a9b6 Merge branch 'staging' into fix/agent-card-decouple-from-setup 2026-05-04 10:24:41 -07:00
Hongming Wang b3982035b3 Merge branch 'staging' into fix/memory-v2-main-wiring 2026-05-04 10:24:31 -07:00
Hongming Wang d1122f8d28 fix(build): register not_configured_handler in TOP_LEVEL_MODULES
The wheel-build drift gate caught the new module added in this PR —
without registering it, the published wheel would ship `import
not_configured_handler` un-rewritten, which would `ModuleNotFoundError`
at runtime under `molecule_runtime.main`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:24:02 -07:00
Hongming Wang 4b35d25d86 fix(runtime): decouple agent-card readiness from adapter.setup()
Today, if `adapter.setup()` raises (most often: an LLM credential is
missing/rotated), main.py crashes before the agent-card route is mounted.
start.sh restart-loops, /.well-known/agent-card.json never returns 200,
and the workspace is invisible to the bench/canvas — operators see
"stuck booting forever" with no clear error to act on.

The agent-card is a static capability advertisement (name, version,
skills, supported protocols). It doesn't need a working LLM. Coupling
its mount to setup() conflates *availability* ("am I up?") with
*configuration* ("can I actually answer?"). They're different concerns.

This change:
- Builds AgentCard from `config.skills` (static names from config.yaml)
  BEFORE adapter.setup(), so the route mounts independent of setup state.
- Wraps setup() + create_executor in try/except. On success, mounts
  the real DefaultRequestHandler with rich loaded_skills metadata
  swapped into the card in-place. On failure, mounts a JSON-RPC
  handler that returns -32603 "agent not configured" with the
  setup() exception in error.data.
- Heartbeat keeps running on misconfigured boots so the platform
  marks the workspace as reachable-but-misconfigured rather than
  crash-looping. Operators redeploy with corrected env without
  chasing a restart loop.
- initial_prompt and idle_loop are skipped on misconfigured boots —
  they self-fire to /, which would land in -32603 anyway, and the
  marker would consume on the first useless attempt.

Bench impact (RFC #388 strict <120s): codex/openclaw bench-time-outs
were the agent-card-never-returns-200 symptom. With this fix those
runtimes serve the card immediately on EC2 boot, so the bench
measures infrastructure cold-start (claude-code class: ~50–80s)
instead of credential-coupled boot.

Adds workspace/not_configured_handler.py (factory + module-level so
behavior is unit-testable; main.py is `# pragma: no cover`) and
workspace/tests/test_not_configured_handler.py (6 tests covering
status code, JSON-RPC envelope shape, id-echo, malformed-body
fallback, reason surfacing, batch-body safety).

All 1665 existing workspace tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:22:31 -07:00
Hongming Wang 46731729d4 Memory v2 fixup Critical: wire plugin from main.go (was fully dormant)
Caught during continued review: the entire v2 plugin system shipped
in PRs #2729-#2742 + #2744-#2751 was never actually invoked because
main.go and router.go don't construct the plugin client/resolver or
attach the WithMemoryV2 / WithNamespaceCleanup hooks.

Operators setting MEMORY_PLUGIN_URL=... saw zero behavior change
because nothing read it. Every fixup we shipped (idempotency, verify
mode, expires_at validation, audit JSON, namespace cleanup, O(N)
export, boot E2E) was also dormant for the same reason.

Root cause: when a multi-handler feature lands across many PRs, none
of them are individually responsible for wiring main.go — and the
master-task-tracking issue didn't gate-check that the wiring landed.
Add main.go integration to every multi-handler RFC checklist.

What ships:

  * internal/memory/wiring/wiring.go: new package that constructs the
    plugin client + resolver from MEMORY_PLUGIN_URL once. Returns nil
    when unset (preserves zero-config legacy behavior). Probes
    /v1/health at boot but doesn't fail-closed — the MCP layer's
    circuit breaker handles ongoing unavailability.

  * internal/memory/wiring/wiring_test.go: 6 tests covering the
    nil/non-nil bundle paths + the namespace-cleanup closure
    contract (nil-safe, format-stable, failure-tolerant).

  * cmd/server/main.go: imports memwiring, calls Build(db.DB) once
    after WorkspaceHandler creation, attaches WithNamespaceCleanup,
    threads the bundle through router.Setup.

  * internal/router/router.go: Setup signature gains *memwiring.Bundle
    param. Inside, attaches WithMemoryV2 to AdminMemoriesHandler and
    MCPHandler when the bundle is non-nil.

After this, the v2 plugin is reachable end-to-end:

  Operator sets MEMORY_PLUGIN_URL → main.Build instantiates client +
  resolver → WorkspaceHandler gets cleanup hook → router wires
  AdminMemoriesHandler + MCPHandler with WithMemoryV2 → MCP tool
  calls (commit_memory_v2, search_memory, etc.) actually do
  something → admin export/import respects MEMORY_V2_CUTOVER.

Prerequisite for #292 (staging verification) — without this, the
operator runbook's step 2 (set MEMORY_PLUGIN_URL, observe behavior)
silently no-ops.

Verified: all 9 affected test packages still green
(memory/{client,contract,e2e,namespace,pgplugin,wiring}, handlers,
router, plus the build).
2026-05-04 10:22:30 -07:00
Hongming Wang 6dc2d907a2 Merge pull request #2754 from Molecule-AI/auto-sync/main-849bc973
chore: sync main → staging (auto, ff to 849bc973)
2026-05-04 17:19:03 +00:00
molecule-ai[bot] 849bc97349 Merge pull request #2753 from Molecule-AI/staging
staging → main: auto-promote e13dcab
2026-05-04 17:08:11 +00:00
Hongming Wang e13dcab5e0 Merge pull request #2749 from Molecule-AI/fix/memory-v2-i3-export-on
Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
2026-05-04 16:49:43 +00:00
Hongming Wang 721010307c Merge pull request #2752 from Molecule-AI/auto-sync/main-73a949bb
chore: sync main → staging (auto, ff to 73a949bb)
2026-05-04 16:49:23 +00:00
Hongming Wang 9f47ecf86e Merge branch 'staging' into fix/memory-v2-i3-export-on 2026-05-04 09:44:37 -07:00
Hongming Wang ebc20794f3 fix(admin-memories): include each member's private namespace in export
ReadableNamespaces(rootID) returns {workspace:rootID, team:rootID,
org:rootID} — the workspace: namespace it surfaces is the root's only.
The I3 batching change resolved namespaces once per root which silently
dropped every child workspace's private memories from admin export
(workspace:childID never reached the plugin search).

Keep the per-root batching win for team:/org:/custom: namespaces;
inject each member's workspace:<id> + owner mapping explicitly so
coverage matches the legacy per-workspace iteration.

Cost stays at 1 SQL + N_roots resolver + 1 plugin search.

Test changes:
- New TestExport_IncludesEveryMembersPrivateNamespace uses a
  per-workspace resolver stub (mirrors real behaviour) and asserts
  every member's workspace:<id> reaches the plugin search AND that
  children's private memories appear in the response with correct
  owner attribution. Verified to FAIL on the pre-fix code.
- TestExport_BatchesPluginCallsByRoot updated to expect 5 namespaces
  (3 workspace + team + org) instead of 3 — it had pinned the buggy
  3-namespace behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 09:44:06 -07:00
Hongming Wang 9a64aeaa2c Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
Self-review #289. The previous exportViaPlugin ran one resolver CTE
walk + one plugin search PER WORKSPACE. For a 1000-workspace tenant
that's 1000× of each, mostly redundant — workspaces sharing a
team/org root see identical readable namespaces.

New strategy:
  1. Single SQL pass returns each workspace + its computed root_id
     via a recursive CTE (loadWorkspacesWithRoots).
  2. Group by root → unique tree count is typically << workspace
     count.
  3. Resolver runs ONCE per root (any member sees the same readable
     list).
  4. Build the union of all root namespaces; single plugin.Search
     call.
  5. Map each memory back to a workspace_name via pickOwnerForNamespace
     (workspace:<id> → matching member; team:* / org:* / custom:* →
     canonical first member of root group).

Net call cost: 1 SQL + N_roots resolver + 1 plugin call (vs
N_workspaces × resolver + N_workspaces × plugin in the old code).

Tests:
  * TestExport_BatchesPluginCallsByRoot pins the new behavior
    explicitly: 3 workspaces under 1 root → exactly 1 plugin search
    (was 3 with the old code).
  * TestPickOwnerForNamespace covers all five attribution cases:
    workspace:<id> match, workspace:<id> no-match-fallback, team:*,
    org:*, custom:* → first-member-of-root-group; plus empty-members
    fallback.
  * All 9 existing TestExport_* / TestImport_* / TestPickOwner /
    TestNamespaceKindFromLegacyScope / TestSkipImport / etc. tests
    remain green (verified with -run "Export").

The legacy DB path (when MEMORY_V2_CUTOVER unset) is unchanged.
2026-05-04 09:17:30 -07:00
10 changed files with 930 additions and 163 deletions
+1
View File
@@ -73,6 +73,7 @@ TOP_LEVEL_MODULES = {
"main",
"mcp_cli",
"molecule_ai_status",
"not_configured_handler",
"platform_auth",
"platform_inbound_auth",
"plugins",
+12 -1
View File
@@ -18,6 +18,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/imagewatch"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
@@ -166,6 +167,16 @@ func main() {
wh.SetCPProvisioner(cpProv)
}
// Memory v2 plugin (RFC #2728): build the dependency bundle once
// here so all three handlers (MCPHandler, AdminMemoriesHandler,
// WorkspaceHandler) get the same plugin/resolver pair. memBundle
// is nil when MEMORY_PLUGIN_URL is unset — every consumer
// nil-checks before using.
memBundle := memwiring.Build(db.DB)
if memBundle != nil {
wh.WithNamespaceCleanup(memBundle.NamespaceCleanupFn())
}
// External-plugin env mutators — each plugin contributes 0+ mutators
// onto a shared registry. Order matters: gh-identity populates
// MOLECULE_AGENT_ROLE-derived attribution env vars that downstream
@@ -306,7 +317,7 @@ func main() {
cronSched.SetChannels(channelMgr)
// Router
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr)
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle)
// HTTP server with graceful shutdown
srv := &http.Server{
@@ -2,6 +2,7 @@ package handlers
import (
"context"
"database/sql"
"log"
"net/http"
"os"
@@ -255,68 +256,185 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
// the legacy memoryExportEntry shape so existing tooling that consumes
// the export keeps working.
//
// Strategy: enumerate workspaces, ask the resolver for each one's
// readable namespaces, search each namespace once. Deduplicate by
// memory id (a single memory in team:X is visible to every workspace
// under root X — we want one row per memory, not N).
// Optimization (#289 fix): the previous implementation was O(workspaces)
// in BOTH resolver CTE walks AND plugin search calls. For a 1000-tenant
// org, that's 1000 × resolver + 1000 × HTTP, where most are redundant
// because workspaces sharing a team/org root see identical namespaces.
//
// New strategy:
// 1. Single SQL pass walks parent_id chains, returning each
// workspace's root_id alongside its name.
// 2. Group workspaces by root → unique tree count is typically <<
// workspace count.
// 3. Resolve namespaces ONCE per root (any workspace under that
// root produces the same readable list).
// 4. Build a UNION of namespaces across all roots; single plugin
// search call.
// 5. Map each memory back to a workspace_name via a namespace→ws
// lookup table built up from step 3.
//
// Net cost: 1 SQL + N_roots resolver calls + 1 plugin call (vs
// N_workspaces resolver + N_workspaces plugin in the old code).
func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) {
rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`)
// 1. One SQL pass: every workspace + its root id.
wsRows, err := loadWorkspacesWithRoots(ctx, db.DB)
if err != nil {
log.Printf("admin/memories/export (cutover): workspaces query: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"})
return
}
defer rows.Close()
type wsRow struct{ ID, Name string }
var workspaces []wsRow
for rows.Next() {
var w wsRow
if err := rows.Scan(&w.ID, &w.Name); err != nil {
continue
}
workspaces = append(workspaces, w)
// 2. Group by root → list of workspaces.
rootToWorkspaces := make(map[string][]workspaceRow, len(wsRows))
for _, w := range wsRows {
rootToWorkspaces[w.RootID] = append(rootToWorkspaces[w.RootID], w)
}
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0)
for _, w := range workspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, w.ID)
// 3. Resolve team/org namespaces once per root, then add each
// member's private workspace:<id> namespace explicitly.
//
// IMPORTANT: ReadableNamespaces(rootID) returns
// {workspace:rootID, team:rootID, org:rootID}. Calling it once
// per root is enough for team:/org:/custom: (those are shared by
// every member of the root group), but the workspace: namespace
// it returns is rootID's only — child members' private
// workspace:<childID> namespaces would be silently dropped from
// the export. Inject each member's workspace:<id> below to keep
// coverage parity with the legacy per-workspace iteration.
nsToOwner := make(map[string]string) // namespace → workspace_name (first matching wins)
allNamespaces := make(map[string]struct{}) // union for plugin search
for rootID, members := range rootToWorkspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, rootID)
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err)
log.Printf("admin/memories/export (cutover) root=%s: resolve: %v", rootID, err)
continue
}
nsList := make([]string, len(readable))
for i, ns := range readable {
nsList[i] = ns.Name
}
if len(nsList) == 0 {
continue
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err)
continue
}
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
// Collect non-workspace namespaces (team:/org:/custom:/...) from
// the root view; these are identical across every member.
for _, ns := range readable {
if strings.HasPrefix(ns.Name, "workspace:") {
continue
}
seen[m.ID] = struct{}{}
redacted, _ := redactSecrets(w.Name, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: w.Name,
})
allNamespaces[ns.Name] = struct{}{}
if _, alreadyMapped := nsToOwner[ns.Name]; alreadyMapped {
continue
}
if owner := pickOwnerForNamespace(ns.Name, members); owner != "" {
nsToOwner[ns.Name] = owner
}
}
// Inject each member's private workspace:<id> namespace + its
// owner. Children's private memories live in workspace:<childID>
// which the root-only resolve doesn't surface.
for _, m := range members {
ns := "workspace:" + m.ID
allNamespaces[ns] = struct{}{}
nsToOwner[ns] = m.Name
}
}
if len(allNamespaces) == 0 {
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
// 4. Single plugin search across the union.
nsList := make([]string, 0, len(allNamespaces))
for ns := range allNamespaces {
nsList = append(nsList, ns)
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover): plugin search: %v", err)
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
// 5. Map each memory to a workspace_name, redact, emit.
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0, len(resp.Memories))
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
continue
}
seen[m.ID] = struct{}{}
owner := nsToOwner[m.Namespace]
redacted, _ := redactSecrets(owner, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: owner,
})
}
c.JSON(http.StatusOK, memories)
}
// workspaceRow bundles the per-workspace fields the optimized export
// needs (id + name + root for grouping).
type workspaceRow struct {
ID string
Name string
RootID string
}
// loadWorkspacesWithRoots returns one row per workspace with its root
// id computed via a recursive CTE. Single SQL pass — replaces the
// previous N×ReadableNamespaces pattern that walked each tree
// independently.
func loadWorkspacesWithRoots(ctx context.Context, conn *sql.DB) ([]workspaceRow, error) {
rows, err := conn.QueryContext(ctx, `
WITH RECURSIVE chain AS (
SELECT id, parent_id, name, id AS root_id, 0 AS depth
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, w.name, c.root_id, c.depth + 1
FROM workspaces w
JOIN chain c ON w.parent_id = c.id
WHERE c.depth < 50
)
SELECT id::text, name, root_id::text FROM chain ORDER BY name
`)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]workspaceRow, 0)
for rows.Next() {
var w workspaceRow
if err := rows.Scan(&w.ID, &w.Name, &w.RootID); err != nil {
return nil, err
}
out = append(out, w)
}
return out, rows.Err()
}
// pickOwnerForNamespace returns the workspace_name to attribute a
// namespace to in the export. workspace:<id> namespaces map to the
// matching member; team:* / org:* / custom:* fall back to the first
// member of the root group (canonical owner).
func pickOwnerForNamespace(ns string, members []workspaceRow) string {
if strings.HasPrefix(ns, "workspace:") {
wantID := strings.TrimPrefix(ns, "workspace:")
for _, m := range members {
if m.ID == wantID {
return m.Name
}
}
}
// Non-workspace namespaces: attribute to first member of the root
// group. Stable because loadWorkspacesWithRoots returns ORDER BY
// name, so the same root group always picks the same owner.
if len(members) > 0 {
return members[0].Name
}
return ""
}
// importViaPlugin writes the entries through the plugin instead of
// directly to agent_memories. Workspaces are resolved by name like
// the legacy path. Scope→namespace mapping mirrors the PR-6 shim.
@@ -151,9 +151,9 @@ func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -196,10 +196,10 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
mock := installMockDB(t)
// Two workspaces, both will see the same team-shared memory.
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha").
AddRow("ws-2", "beta"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1").
AddRow("ws-2", "beta", "ws-2"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -225,9 +225,9 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{}
resolver := &stubAdminResolver{err: errors.New("resolver dead")}
@@ -247,9 +247,9 @@ func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -271,7 +271,7 @@ func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
func TestExport_WorkspacesQueryFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnError(errors.New("db dead"))
plugin := &stubAdminPlugin{}
@@ -290,9 +290,9 @@ func TestExport_WorkspacesQueryFails(t *testing.T) {
func TestExport_EmptyReadable(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
resolver := &stubAdminResolver{readable: []namespace.Namespace{}}
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver)
@@ -312,9 +312,9 @@ func TestExport_EmptyReadable(t *testing.T) {
func TestExport_RedactsSecretsInPluginPath(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@@ -535,6 +535,202 @@ func TestImport_SkipsWhenResolverErrors(t *testing.T) {
}
}
// TestExport_BatchesPluginCallsByRoot pins the I3 fix: previously the
// export ran one resolver + one plugin search per workspace (N+1 in
// both); now it groups by root and runs one resolver + one plugin
// search per UNIQUE root.
//
// Setup: 3 workspaces under 1 root → 1 resolver call + 1 plugin call
// (was: 3 resolver + 3 plugin in the old code). The plugin search
// receives 5 namespaces: each member's workspace:<id> + team:root-1
// + org:root-1. (Children's workspace:<id> namespaces must be
// included or admin export silently drops their private memories.)
func TestExport_BatchesPluginCallsByRoot(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("root-1", "alpha", "root-1").
AddRow("child-1", "alpha-child", "root-1").
AddRow("child-2", "alpha-grandchild", "root-1"))
pluginSearchCount := 0
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
pluginSearchCount++
if len(body.Namespaces) != 5 {
t.Errorf("plugin search call %d: namespaces len = %d, want 5 (3 workspace + team + org); got %v", pluginSearchCount, len(body.Namespaces), body.Namespaces)
}
return &contract.SearchResponse{}, nil
},
}
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
if w.Code != http.StatusOK {
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
}
if pluginSearchCount != 1 {
t.Errorf("plugin search called %d times, want 1 (was 3 with the old N+1 code)", pluginSearchCount)
}
}
// perWorkspaceResolver mimics the real resolver: ReadableNamespaces
// returns the SPECIFIC workspace's view (workspace:<that ID> +
// team:<root> + org:<root>), not a constant set. The legacy
// stubAdminResolver hides the I3 silent-drop bug by ignoring its
// workspace-id argument.
type perWorkspaceResolver map[string][]namespace.Namespace
func (r perWorkspaceResolver) ReadableNamespaces(_ context.Context, ws string) ([]namespace.Namespace, error) {
v, ok := r[ws]
if !ok {
return nil, errors.New("perWorkspaceResolver: unknown ws " + ws)
}
return v, nil
}
func (r perWorkspaceResolver) WritableNamespaces(_ context.Context, ws string) ([]namespace.Namespace, error) {
return r.ReadableNamespaces(nil, ws)
}
// TestExport_IncludesEveryMembersPrivateNamespace pins the I3 follow-up
// fix: when a root group has multiple members, the export must surface
// each member's workspace:<id> namespace, not just the root's. Before
// the fix, calling ReadableNamespaces(rootID) returned only
// workspace:rootID + team:rootID + org:rootID — every child workspace's
// private memories were silently dropped from admin export.
func TestExport_IncludesEveryMembersPrivateNamespace(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("root-1", "alpha", "root-1").
AddRow("child-1", "alpha-child", "root-1").
AddRow("child-2", "alpha-grandchild", "root-1"))
resolver := perWorkspaceResolver{
"root-1": {
{Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
"child-1": {
{Name: "workspace:child-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
"child-2": {
{Name: "workspace:child-2", Kind: contract.NamespaceKindWorkspace, Writable: true},
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
},
}
var passedNamespaces []string
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
passedNamespaces = append(passedNamespaces, body.Namespaces...)
return &contract.SearchResponse{Memories: []contract.Memory{
{ID: "m-root", Namespace: "workspace:root-1", Content: "root private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-child1", Namespace: "workspace:child-1", Content: "child-1 private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-child2", Namespace: "workspace:child-2", Content: "child-2 private", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
{ID: "m-team", Namespace: "team:root-1", Content: "shared team", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
}}, nil
},
}
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
if w.Code != http.StatusOK {
t.Fatalf("code = %d body=%s", w.Code, w.Body.String())
}
// Every member's private namespace must reach the plugin search.
want := []string{"workspace:root-1", "workspace:child-1", "workspace:child-2", "team:root-1", "org:root-1"}
got := make(map[string]bool, len(passedNamespaces))
for _, ns := range passedNamespaces {
got[ns] = true
}
for _, w := range want {
if !got[w] {
t.Errorf("plugin search missing namespace %q (got %v)", w, passedNamespaces)
}
}
if len(passedNamespaces) != 5 {
t.Errorf("plugin search namespace count = %d, want 5 (3 workspace + team + org)", len(passedNamespaces))
}
// Children's private memories must appear in the export, attributed
// to the right workspace_name.
var entries []memoryExportEntry
if err := json.Unmarshal(w.Body.Bytes(), &entries); err != nil {
t.Fatalf("unmarshal: %v", err)
}
byID := map[string]memoryExportEntry{}
for _, e := range entries {
byID[e.ID] = e
}
for _, exp := range []struct{ id, ns, owner string }{
{"m-root", "workspace:root-1", "alpha"},
{"m-child1", "workspace:child-1", "alpha-child"},
{"m-child2", "workspace:child-2", "alpha-grandchild"},
} {
e, ok := byID[exp.id]
if !ok {
t.Errorf("export missing memory %s — children's private memories silently dropped", exp.id)
continue
}
if e.Namespace != exp.ns {
t.Errorf("memory %s namespace = %q, want %q", exp.id, e.Namespace, exp.ns)
}
if e.WorkspaceName != exp.owner {
t.Errorf("memory %s owner = %q, want %q", exp.id, e.WorkspaceName, exp.owner)
}
}
}
// TestPickOwnerForNamespace covers the namespace→workspace_name
// attribution helper introduced in I3.
func TestPickOwnerForNamespace(t *testing.T) {
members := []workspaceRow{
{ID: "root-1", Name: "alpha", RootID: "root-1"},
{ID: "child-1", Name: "alpha-child", RootID: "root-1"},
}
cases := []struct {
name string
ns string
want string
}{
{"workspace ns matches member id", "workspace:child-1", "alpha-child"},
{"workspace ns no match → first", "workspace:foreign", "alpha"},
{"team ns → first member of root group", "team:root-1", "alpha"},
{"org ns → first member", "org:root-1", "alpha"},
{"custom ns → first member", "custom:foo", "alpha"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := pickOwnerForNamespace(tc.ns, members); got != tc.want {
t.Errorf("pickOwnerForNamespace(%q) = %q, want %q", tc.ns, got, tc.want)
}
})
}
if got := pickOwnerForNamespace("workspace:abc", nil); got != "" {
t.Errorf("empty members must return \"\", got %q", got)
}
}
// --- Helper functions ---
func TestLegacyScopeFromNamespace(t *testing.T) {
@@ -0,0 +1,81 @@
// Package wiring constructs the v2 memory plugin dependency bundle
// at boot time so handlers can opt into the plugin path uniformly.
//
// The bundle is nil-safe: when MEMORY_PLUGIN_URL is unset, Build
// returns (nil, nil) so callers can detect "v2 not configured" with
// a single nil check instead of plumbing a feature flag through
// every handler.
//
// This package exists because the v2 plugin client + namespace
// resolver are needed by THREE different handler types (MCPHandler,
// AdminMemoriesHandler, WorkspaceHandler) constructed in two
// different files (main.go for WorkspaceHandler, router.go for the
// other two). A central Build() avoids each construction site
// re-implementing the env-var read + plugin instantiation.
package wiring
import (
"context"
"database/sql"
"log"
"os"
"time"
mclient "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/client"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace"
)
// Bundle is the v2 dependency bundle. Pass it through Setup as a
// single param; handlers extract what they need.
//
// nil receiver = "v2 not configured" — every method on Bundle
// nil-checks itself, so callers can pass a nil Bundle through the
// hot path without conditional spread.
type Bundle struct {
Plugin *mclient.Client
Resolver *namespace.Resolver
}
// Build returns a wired Bundle if MEMORY_PLUGIN_URL is set, else nil.
//
// It probes /v1/health at boot — when the plugin is unreachable, we
// log a warning but STILL return the bundle. The MCP layer's
// circuit breaker handles ongoing unavailability; we don't want to
// block workspace-server boot just because the memory plugin is
// briefly down.
func Build(db *sql.DB) *Bundle {
if os.Getenv("MEMORY_PLUGIN_URL") == "" {
return nil
}
plugin := mclient.New(mclient.Config{})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if hr, err := plugin.Boot(ctx); err != nil {
log.Printf("memory-plugin: /v1/health probe failed (will retry per-request): %v", err)
} else {
log.Printf("memory-plugin: ok, capabilities=%v", hr.Capabilities)
}
return &Bundle{
Plugin: plugin,
Resolver: namespace.New(db),
}
}
// NamespaceCleanupFn returns a closure suitable for
// WorkspaceHandler.WithNamespaceCleanup. nil when bundle is nil so
// callers can pass it through unconditionally.
//
// The closure runs best-effort: errors are logged, never propagated.
// A misbehaving plugin must not block workspace purges.
func (b *Bundle) NamespaceCleanupFn() func(context.Context, string) {
if b == nil || b.Plugin == nil {
return nil
}
return func(ctx context.Context, workspaceID string) {
ns := "workspace:" + workspaceID
if err := b.Plugin.DeleteNamespace(ctx, ns); err != nil {
log.Printf("memory-plugin: namespace cleanup failed (workspace=%s ns=%s): %v",
workspaceID, ns, err)
}
}
}
@@ -0,0 +1,160 @@
package wiring
import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// TestBuild_NilWhenURLUnset pins the operator-friendly default: no
// MEMORY_PLUGIN_URL → nil bundle → all callers fall through to legacy
// behavior with no surprises.
func TestBuild_NilWhenURLUnset(t *testing.T) {
t.Setenv("MEMORY_PLUGIN_URL", "")
if got := Build(nil); got != nil {
t.Errorf("expected nil bundle when MEMORY_PLUGIN_URL unset, got %+v", got)
}
}
// TestBuild_NonNilWhenURLSet pins that the bundle is constructed even
// when the plugin's /v1/health probe fails — we don't want workspace-
// server boot to depend on a transiently unavailable plugin.
func TestBuild_NonNilWhenURLSet(t *testing.T) {
t.Setenv("MEMORY_PLUGIN_URL", "http://127.0.0.1:1") // bogus port = probe will fail
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
if bundle == nil {
t.Fatal("expected non-nil bundle when MEMORY_PLUGIN_URL is set")
}
if bundle.Plugin == nil {
t.Error("Plugin must be wired")
}
if bundle.Resolver == nil {
t.Error("Resolver must be wired")
}
}
// TestNamespaceCleanupFn_NilBundle pins the nil-safe path: callers
// that pass `bundle.NamespaceCleanupFn()` unconditionally don't need
// to nil-check the bundle separately.
func TestNamespaceCleanupFn_NilBundle(t *testing.T) {
var b *Bundle // nil receiver
if got := b.NamespaceCleanupFn(); got != nil {
t.Errorf("nil bundle must return nil cleanup fn, got non-nil")
}
}
// TestNamespaceCleanupFn_NilPlugin: bundle exists but plugin is nil —
// also returns nil cleanup fn (defensive in case of partial wiring).
func TestNamespaceCleanupFn_NilPlugin(t *testing.T) {
b := &Bundle{} // both fields nil
if got := b.NamespaceCleanupFn(); got != nil {
t.Errorf("bundle with nil plugin must return nil cleanup fn")
}
}
// TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace is the real
// integration gate for the closure: it spins up an httptest.Server
// that records every DELETE request, points MEMORY_PLUGIN_URL at it,
// runs Build(), then invokes the returned closure and asserts the
// server saw `DELETE /v1/namespaces/workspace:<id>`.
//
// This replaces two earlier tests that exercised parallel
// implementations rather than the production closure (caught in
// self-review).
func TestNamespaceCleanupFn_HitsPluginAtCorrectNamespace(t *testing.T) {
var (
mu sync.Mutex
gotPaths []string
gotMethods []string
)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mu.Lock()
gotPaths = append(gotPaths, r.URL.Path)
gotMethods = append(gotMethods, r.Method)
mu.Unlock()
switch r.URL.Path {
case "/v1/health":
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
default:
w.WriteHeader(http.StatusNoContent)
}
}))
t.Cleanup(srv.Close)
t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
if bundle == nil {
t.Fatal("Build returned nil with MEMORY_PLUGIN_URL set")
}
cleanup := bundle.NamespaceCleanupFn()
if cleanup == nil {
t.Fatal("NamespaceCleanupFn returned nil with non-nil Plugin")
}
cleanup(context.Background(), "abc-123")
mu.Lock()
defer mu.Unlock()
// Two requests expected: /v1/health probe at Boot + DELETE for cleanup.
foundDelete := false
for i, p := range gotPaths {
if gotMethods[i] == "DELETE" && p == "/v1/namespaces/workspace:abc-123" {
foundDelete = true
}
}
if !foundDelete {
t.Errorf("expected DELETE /v1/namespaces/workspace:abc-123, got %v",
pathsAndMethods(gotPaths, gotMethods))
}
}
// TestNamespaceCleanupFn_PluginErrorDoesNotPanic exercises the failure
// path for real: server returns 500 on DELETE; the closure must log
// and return without propagating. Replaces the parallel-implementation
// version that didn't actually test the production code.
func TestNamespaceCleanupFn_PluginErrorDoesNotPanic(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/v1/health" {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"ok","version":"1.0.0","capabilities":[]}`))
return
}
http.Error(w, "boom", http.StatusInternalServerError)
}))
t.Cleanup(srv.Close)
t.Setenv("MEMORY_PLUGIN_URL", srv.URL)
db, _, _ := sqlmock.New()
defer db.Close()
bundle := Build(db)
cleanup := bundle.NamespaceCleanupFn()
// Must not panic, must not propagate the 500. Recovering with
// defer is belt-and-suspenders — production calls this from a
// for-loop in workspace_crud.go that has no recover.
defer func() {
if r := recover(); r != nil {
t.Errorf("cleanup panicked on plugin 500: %v", r)
}
}()
cleanup(context.Background(), "ws-1")
}
func pathsAndMethods(paths, methods []string) []string {
out := make([]string, len(paths))
for i := range paths {
out[i] = methods[i] + " " + paths[i]
}
return out
}
+8 -1
View File
@@ -13,6 +13,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
@@ -23,7 +24,7 @@ import (
"github.com/gin-gonic/gin"
)
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager) *gin.Engine {
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle) *gin.Engine {
r := gin.Default()
// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
@@ -150,6 +151,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// F1084/#1131: Export applies redactSecrets before returning content.
// F1085/#1132: Import applies redactSecrets before persisting content.)
adminMemH := handlers.NewAdminMemoriesHandler()
if memBundle != nil {
adminMemH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
wsAdmin.GET("/admin/memories/export", adminMemH.Export)
wsAdmin.POST("/admin/memories/import", adminMemH.Import)
}
@@ -370,6 +374,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
if memBundle != nil {
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
mcpRl := middleware.NewMCPRateLimiter(120, time.Minute, context.Background())
wsAuth.GET("/mcp/stream", mcpRl.Middleware(), mcpH.Stream)
wsAuth.POST("/mcp", mcpRl.Middleware(), mcpH.Call)
+149 -98
View File
@@ -148,62 +148,15 @@ async def main(): # pragma: no cover
heartbeat=heartbeat,
)
# 5. Setup adapter and create executor
# If setup fails, ensure heartbeat is stopped to prevent resource leak
try:
await adapter.setup(adapter_config)
executor = await adapter.create_executor(adapter_config)
# 5a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
# is set, exercise the executor's full import tree by calling
# execute() once with stub deps + a short timeout. Skips platform
# registration + uvicorn entirely. Returns process exit code.
from smoke_mode import is_smoke_mode, run_executor_smoke
if is_smoke_mode():
exit_code = await run_executor_smoke(executor)
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception: # noqa: BLE001
pass
raise SystemExit(exit_code)
# 5b. Restore from pre-stop snapshot if one exists (GH#1391).
# The snapshot is scrubbed before being written, so secrets are
# already redacted — restore_state must not re-expose them.
from lib.pre_stop import read_snapshot
snapshot = read_snapshot()
if snapshot:
try:
adapter.restore_state(snapshot)
print(
f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
f"uptime={snapshot.get('uptime_seconds', 0)}s"
)
except Exception as restore_err:
print(f"Warning: snapshot restore failed (continuing): {restore_err}")
except Exception:
# heartbeat hasn't started yet but may have async tasks pending
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception:
pass
raise
# 5.5. Initialise Temporal durable execution wrapper (optional)
# Connects to TEMPORAL_HOST (default: localhost:7233) and starts a
# co-located Temporal worker as a background asyncio task.
# No-op with a warning log if Temporal is unreachable or temporalio
# is not installed — all tasks fall back to direct execution transparently.
from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
temporal_wrapper = _create_temporal_wrapper()
await temporal_wrapper.start()
# Get loaded skills for agent card (adapter may have populated them)
loaded_skills = getattr(adapter, "loaded_skills", [])
# 6. Build Agent Card
# 5. Build the AgentCard *before* adapter.setup() so /.well-known/agent-card.json
# is reachable as soon as uvicorn binds, regardless of whether the adapter
# has working LLM credentials. Decoupling readiness ("is the workspace up?")
# from configuration ("can it actually answer?") means a workspace with a
# missing/rotated key stays REACHABLE — canvas can render a clear
# "agent not configured" error instead of "stuck booting forever," and
# operators can deprovision/redeploy normally. Skills built from
# config.skills (static names from config.yaml) up front; richer metadata
# from the adapter's loaded_skills swaps in below if setup() succeeds.
machine_ip = os.environ.get("HOSTNAME", get_machine_ip())
workspace_url = f"http://{machine_ip}:{port}"
@@ -237,20 +190,96 @@ async def main(): # pragma: no cover
# always available and tasks/get accepts historyLength via
# apply_history_length(). Don't add this kwarg back.
),
# Static skill stubs from config.yaml; replaced with rich metadata
# below if adapter.setup() loads skills successfully.
skills=[
AgentSkill(
id=skill.metadata.id,
name=skill.metadata.name,
description=skill.metadata.description,
tags=skill.metadata.tags,
examples=skill.metadata.examples,
)
for skill in loaded_skills
AgentSkill(id=name, name=name, description=name, tags=[], examples=[])
for name in (config.skills or [])
],
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
)
# 6. Setup adapter and create executor
# On failure: log + continue. The card route stays mounted (above);
# the JSON-RPC route below returns -32603 "agent not configured" until
# the operator fixes credentials and redeploys. Heartbeat keeps running
# so the platform sees the workspace as reachable-but-misconfigured
# rather than crash-looping.
adapter_ready = False
adapter_error: str | None = None
executor = None
try:
await adapter.setup(adapter_config)
executor = await adapter.create_executor(adapter_config)
# 6a. Boot-smoke short-circuit (issue #2275): if MOLECULE_SMOKE_MODE
# is set, exercise the executor's full import tree by calling
# execute() once with stub deps + a short timeout. Skips platform
# registration + uvicorn entirely. Returns process exit code.
from smoke_mode import is_smoke_mode, run_executor_smoke
if is_smoke_mode():
exit_code = await run_executor_smoke(executor)
if hasattr(heartbeat, "stop"):
try:
await heartbeat.stop()
except Exception: # noqa: BLE001
pass
raise SystemExit(exit_code)
# 6b. Restore from pre-stop snapshot if one exists (GH#1391).
# The snapshot is scrubbed before being written, so secrets are
# already redacted — restore_state must not re-expose them.
from lib.pre_stop import read_snapshot
snapshot = read_snapshot()
if snapshot:
try:
adapter.restore_state(snapshot)
print(
f"Pre-stop snapshot restored: task={snapshot.get('current_task', '')!r}, "
f"uptime={snapshot.get('uptime_seconds', 0)}s"
)
except Exception as restore_err:
print(f"Warning: snapshot restore failed (continuing): {restore_err}")
# 6c. Swap rich skill metadata into the card now that setup() loaded
# them. In-place mutation: a2a-sdk's create_agent_card_routes serialises
# the card on each request, so the route mounted below sees the update.
loaded_skills = getattr(adapter, "loaded_skills", None)
if loaded_skills:
agent_card.skills = [
AgentSkill(
id=skill.metadata.id,
name=skill.metadata.name,
description=skill.metadata.description,
tags=skill.metadata.tags,
examples=skill.metadata.examples,
)
for skill in loaded_skills
]
adapter_ready = True
except SystemExit:
# Smoke-mode exit signal — propagate untouched.
raise
except Exception as setup_err: # noqa: BLE001
adapter_error = f"{type(setup_err).__name__}: {setup_err}"
print(
f"WARNING: adapter.setup() failed — workspace will serve agent-card "
f"but JSON-RPC will return -32603 until configuration is fixed. "
f"Reason: {adapter_error}",
flush=True,
)
# Heartbeat keeps running so the platform marks the workspace as
# reachable-but-misconfigured. Operators can then redeploy with the
# correct env vars without having to chase a crash-loop.
# 6.5. Initialise Temporal durable execution wrapper (optional). Only
# meaningful when an executor exists; skipped on misconfigured boots.
if adapter_ready:
from builtin_tools.temporal_workflow import create_wrapper as _create_temporal_wrapper
temporal_wrapper = _create_temporal_wrapper()
await temporal_wrapper.start()
# 7. Wrap in A2A.
#
# Regression fix (#204): PR #198 tried to wire push_config_store +
@@ -262,42 +291,51 @@ async def main(): # pragma: no cover
# in the AgentCard below is still advertised via AgentCapabilities so
# clients know we COULD do pushes; actually implementing them requires
# a concrete sender subclass, tracked as a Phase-H follow-up to #175.
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
# a2a-sdk 1.x added agent_card as a required positional/keyword
# argument — it's used internally for capability dispatch (e.g.
# routing tasks/get historyLength based on the card's protocol
# version). Pass the same agent_card we registered with the
# platform so the handler's capability surface matches what the
# AgentCard advertises.
agent_card=agent_card,
)
# v1: replace A2AStarletteApplication with Starlette route factory.
# rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
# Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
# what the platform's a2a_proxy.go POSTs to (it forwards to the
# workspace's URL without appending a path). Card endpoint stays at
# the well-known path /.well-known/agent-card.json (handled by
# create_agent_card_routes default).
routes = []
routes.extend(create_agent_card_routes(agent_card))
# enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
# using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
# Pydantic field names) can talk to us without re-deploying. Outbound
# JSON-RPC wire payloads MUST also use v0.3 shape — the v0.3 compat
# adapter at /usr/local/lib/python3.11/site-packages/a2a/compat/v0_3/
# validates against Pydantic Role enum (`agent`|`user`) and rejects
# the protobuf-style `ROLE_USER` enum names with JSON-RPC -32600
# (Invalid Request). Native v1.x types (a2a.types.Role.ROLE_AGENT)
# are only for code that constructs Message objects in-process and
# hands them to the SDK, which serialises them correctly for the
# outbound wire format.
routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
if adapter_ready:
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
# a2a-sdk 1.x added agent_card as a required positional/keyword
# argument — it's used internally for capability dispatch (e.g.
# routing tasks/get historyLength based on the card's protocol
# version). Pass the same agent_card we registered with the
# platform so the handler's capability surface matches what the
# AgentCard advertises.
agent_card=agent_card,
)
# v1: replace A2AStarletteApplication with Starlette route factory.
# rpc_url is required in a2a-sdk 1.x (was implicit at root in 0.x).
# Use '/' to match a2a.utils.constants.DEFAULT_RPC_URL — that's also
# what the platform's a2a_proxy.go POSTs to (it forwards to the
# workspace's URL without appending a path). Card endpoint stays at
# the well-known path /.well-known/agent-card.json (handled by
# create_agent_card_routes default).
# enable_v0_3_compat=True is the JSON-RPC wire-compat path: clients
# using v0.3-shaped payloads (`"role": "user"` lowercase + camelCase
# Pydantic field names) can talk to us without re-deploying.
routes.extend(create_jsonrpc_routes(request_handler=handler, rpc_url="/", enable_v0_3_compat=True))
else:
# Misconfigured: serve the card but reject JSON-RPC with -32603 so
# canvas surfaces a useful "agent not configured: <reason>" instead
# of letting requests time out. Handler factory is in its own module
# so the behavior is unit-testable (workspace/tests/test_not_configured_handler.py).
from starlette.routing import Route
from not_configured_handler import make_not_configured_handler
routes.append(
Route("/", make_not_configured_handler(adapter_error), methods=["POST"])
)
app = Starlette(routes=routes)
# 8. Register with platform
# When adapter.setup() failed, advertise via configuration_status so
# the platform/canvas can render "configured: false, reason: …" instead
# of a confused "ready but silent" state.
loaded_skills = getattr(adapter, "loaded_skills", None) or []
agent_card_dict = {
"name": config.name,
"description": config.description,
@@ -311,11 +349,16 @@ async def main(): # pragma: no cover
"tags": s.metadata.tags,
}
for s in loaded_skills
] if adapter_ready else [
{"id": n, "name": n, "description": n, "tags": []}
for n in (config.skills or [])
],
"capabilities": {
"streaming": config.a2a.streaming,
"pushNotifications": config.a2a.push_notifications,
},
"configuration_status": "ready" if adapter_ready else "not_configured",
**({"configuration_error": adapter_error} if adapter_error else {}),
}
async with httpx.AsyncClient(timeout=10.0) as client:
@@ -364,7 +407,9 @@ async def main(): # pragma: no cover
# 9b. Start skills hot-reload watcher (background task)
# When a skill file changes the watcher reloads the skill module and calls
# back into the adapter so the next A2A request uses the updated tools.
if config.skills:
# Skipped on misconfigured boots — adapter has no executor / tool registry
# to swap into, so reloading skills would NPE on the agent rebuild path.
if adapter_ready and config.skills:
try:
from skill_loader.watcher import SkillsWatcher
@@ -495,9 +540,13 @@ async def main(): # pragma: no cover
# 10b. Schedule initial_prompt self-message after server is ready.
# Only runs on first boot — creates a marker file to prevent re-execution on restart.
# Skipped on misconfigured boots: the self-message would route through the
# platform back to /, hit the -32603 not-configured handler, and consume
# the marker for a fire that can't actually run. Wait until the operator
# fixes credentials and the workspace redeploys with adapter_ready=True.
initial_prompt_task = None
initial_prompt_marker = resolve_initial_prompt_marker(config_path)
if config.initial_prompt and not os.path.exists(initial_prompt_marker):
if adapter_ready and config.initial_prompt and not os.path.exists(initial_prompt_marker):
# Write the marker UP FRONT (#71): if the prompt later crashes or
# times out, we do NOT replay on next boot — that created a
# ProcessError cascade where every message kept crashing. Operators
@@ -615,7 +664,9 @@ async def main(): # pragma: no cover
# workspaces upgrade opt-in — set idle_prompt in org.yaml defaults or
# per-workspace to enable.
idle_loop_task = None
if config.idle_prompt:
# Skipped on misconfigured boots — the self-fire would route to the
# -32603 handler in a tight loop and consume cycles for no useful work.
if adapter_ready and config.idle_prompt:
# Idle-fire HTTP timeout. Kept tight relative to the fire cadence so a
# hung platform doesn't accumulate dangling requests — a fire that
# takes longer than the idle interval itself is almost certainly stuck.
+55
View File
@@ -0,0 +1,55 @@
"""Build a JSON-RPC handler that returns ``-32603 "agent not configured"``.
Used by the workspace runtime when ``adapter.setup()`` fails (most often
because an LLM credential is missing or rotated). Lets ``/.well-known/agent-card.json``
keep serving 200 — the workspace stays REACHABLE for canvas/operator
introspection — while message-send requests get a clear, immediate
error instead of silently timing out.
Kept as its own module so the behavior is unit-testable without booting
the whole runtime (main.py is ``# pragma: no cover``).
"""
from __future__ import annotations
from typing import Awaitable, Callable
from starlette.requests import Request
from starlette.responses import JSONResponse
def make_not_configured_handler(
reason: str | None,
) -> Callable[[Request], Awaitable[JSONResponse]]:
"""Return a Starlette POST handler that always 503s with JSON-RPC -32603.
``reason`` is surfaced in the JSON-RPC ``error.data`` field so canvas
can render "agent not configured: <reason>" to the user. Pass the
stringified ``adapter.setup()`` exception. ``None`` falls back to a
generic "adapter.setup() failed".
The handler echoes the request's JSON-RPC ``id`` when present so a
well-behaved JSON-RPC client can correlate the error to its request.
Malformed bodies (non-JSON, missing id) get ``id: null`` per spec.
"""
fallback = reason or "adapter.setup() failed"
async def _handler(request: Request) -> JSONResponse:
try:
body = await request.json()
except Exception: # noqa: BLE001
body = {}
return JSONResponse(
{
"jsonrpc": "2.0",
"id": body.get("id") if isinstance(body, dict) else None,
"error": {
"code": -32603,
"message": "Internal error: agent not configured",
"data": fallback,
},
},
status_code=503,
)
return _handler
@@ -0,0 +1,87 @@
"""Tests for ``not_configured_handler`` — the JSON-RPC -32603 fallback the
runtime mounts when ``adapter.setup()`` fails.
Tests the behavior end-to-end via Starlette's TestClient so the JSON-RPC
wire shape (status 503, code -32603, id-echo) is exercised the same way
canvas would see it.
"""
from __future__ import annotations
import sys
from pathlib import Path
# Make workspace/ importable in test isolation — same pattern as the
# adjacent tests (test_smoke_mode.py, test_heartbeat.py).
WORKSPACE_DIR = Path(__file__).resolve().parents[1]
if str(WORKSPACE_DIR) not in sys.path:
sys.path.insert(0, str(WORKSPACE_DIR))
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.testclient import TestClient
from not_configured_handler import make_not_configured_handler
def _build_app(reason: str | None) -> TestClient:
handler = make_not_configured_handler(reason)
app = Starlette(routes=[Route("/", handler, methods=["POST"])])
return TestClient(app)
def test_returns_503_with_jsonrpc_error_envelope():
"""Status 503; body is a valid JSON-RPC 2.0 error envelope."""
client = _build_app("MINIMAX_API_KEY not set")
resp = client.post("/", json={"jsonrpc": "2.0", "id": 7, "method": "message/send"})
assert resp.status_code == 503
body = resp.json()
assert body["jsonrpc"] == "2.0"
assert body["error"]["code"] == -32603
assert body["error"]["message"] == "Internal error: agent not configured"
def test_echoes_request_id_when_present():
"""JSON-RPC clients correlate replies via id; the handler must echo it."""
client = _build_app("reason")
resp = client.post("/", json={"jsonrpc": "2.0", "id": "abc-123", "method": "x"})
assert resp.json()["id"] == "abc-123"
def test_id_is_null_when_body_malformed():
"""Per JSON-RPC 2.0: id MUST be null when it can't be determined from
the request. Malformed bodies (non-JSON, empty, non-object) all map
to id=null."""
client = _build_app("reason")
resp = client.post("/", content=b"not json at all", headers={"content-type": "application/json"})
assert resp.status_code == 503
assert resp.json()["id"] is None
def test_reason_surfaces_in_error_data():
"""Operators read ``error.data`` to figure out what to fix. The
setup() exception string lands there verbatim."""
client = _build_app("RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set")
resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
assert resp.json()["error"]["data"] == (
"RuntimeError: Neither OPENAI_API_KEY nor MINIMAX_API_KEY is set"
)
def test_none_reason_falls_back_to_generic_message():
"""If the adapter raised but we couldn't capture a reason, give the
operator a hint where to look (still better than a stuck-booting
workspace with no log line)."""
client = _build_app(None)
resp = client.post("/", json={"jsonrpc": "2.0", "id": 1, "method": "x"})
assert resp.json()["error"]["data"] == "adapter.setup() failed"
def test_array_body_does_not_crash_id_extraction():
"""JSON-RPC supports batch (array) requests. We don't currently
support batch in the runtime, but the handler shouldn't crash on a
batch body — it should just respond with id=null and the same -32603
so the client sees a clear error instead of a 500."""
client = _build_app("reason")
resp = client.post("/", json=[{"jsonrpc": "2.0", "id": 1, "method": "x"}])
assert resp.status_code == 503
assert resp.json()["id"] is None