Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a06a6493c5 | |||
| 6e7ce2fc43 | |||
| eac4fda48a | |||
| 6cfe76b6dd | |||
| 1d29e9ea24 | |||
| a92beb5d49 | |||
| 8e754e6b28 | |||
| 8179ff77e9 | |||
| 6188c6ddf3 | |||
| f986444dbd |
+50
-11
@@ -145,10 +145,10 @@ jobs:
|
||||
# the diagnostic step with its own continue-on-error: true (line 203).
|
||||
# Flip confirmed by CI / Platform (Go) status = success on main HEAD 363905d3.
|
||||
continue-on-error: false
|
||||
# Job-level ceiling. The go test step below runs with a per-step 10m timeout;
|
||||
# this cap catches any step that leaks past that. Set well above 10m so
|
||||
# the per-step timeout is the active constraint.
|
||||
timeout-minutes: 15
|
||||
# mc#1099: cold runner needs ~45m for go test on cold disk I/O.
|
||||
# Job-level ceiling: go test 60m step + golangci-lint 45m step = 105m max.
|
||||
# Backstop: 120m.
|
||||
timeout-minutes: 120
|
||||
defaults:
|
||||
run:
|
||||
working-directory: workspace-server
|
||||
@@ -171,10 +171,45 @@ jobs:
|
||||
run: go vet ./...
|
||||
- if: always()
|
||||
name: Install golangci-lint
|
||||
run: go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.12.2
|
||||
# mc#1099: cold runner cannot reach github.com releases or proxy.golang.org
|
||||
# (hanging at ~5-6m before timing out). Test connectivity first; if
|
||||
# both sources fail, skip golangci-lint and rely on go vet.
|
||||
# continue-on-error: true prevents install failure from failing the job
|
||||
# (job-level continue-on-error: false).
|
||||
continue-on-error: true
|
||||
run: |
|
||||
set +e
|
||||
# Test proxy.golang.org connectivity (30s timeout)
|
||||
if curl -fsSL --connect-timeout 30 --max-time 60 "https://proxy.golang.org/github.com/golangci/golangci-lint/@v/list" -o /dev/null 2>/dev/null; then
|
||||
echo "proxy.golang.org reachable, installing via go install..."
|
||||
go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.64.5
|
||||
echo "go install exit: $?"
|
||||
else
|
||||
echo "proxy.golang.org unreachable, trying GitHub releases..."
|
||||
ARCH=$(go env GOARCH) && OS=$(go env GOOS) && VERSION=1.64.5
|
||||
if curl -fsSL --connect-timeout 30 --max-time 120 "https://github.com/golangci/golangci-lint/releases/download/v${VERSION}/golangci-lint-${VERSION}-${OS}-${ARCH}.tar.gz" -o /tmp/golangci-lint.tar.gz 2>/dev/null; then
|
||||
tar -xzf /tmp/golangci-lint.tar.gz -C /tmp
|
||||
install -m 755 /tmp/golangci-lint $(go env GOPATH)/bin/golangci-lint
|
||||
echo "GitHub binary installed"
|
||||
else
|
||||
echo "GitHub releases also unreachable — skipping golangci-lint (go vet is the safety net)"
|
||||
touch "$(go env GOPATH)/bin/golangci-lint.skip"
|
||||
fi
|
||||
fi
|
||||
- if: always()
|
||||
name: Run golangci-lint
|
||||
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
|
||||
# mc#1099: skip if binary unavailable; go vet already ran as safety net.
|
||||
# timeout: 45m — cold runner disk I/O makes linting slow. The command
|
||||
# --timeout 60m prevents a runaway linter from stalling the step.
|
||||
# continue-on-error: true so a missing binary doesn't fail the job.
|
||||
continue-on-error: true
|
||||
timeout-minutes: 45
|
||||
run: |
|
||||
if [ -f "$(go env GOPATH)/bin/golangci-lint.skip" ]; then
|
||||
echo "golangci-lint skipped (network unavailable on cold runner)"
|
||||
else
|
||||
golangci-lint run --config golangci-coldrunner.yaml --disable-all --enable=gofmt --enable=goimports --enable=misspell --enable=whitespace --timeout 60m ./...
|
||||
fi
|
||||
- if: always()
|
||||
name: Diagnostic — per-package verbose 60s
|
||||
run: |
|
||||
@@ -193,11 +228,15 @@ jobs:
|
||||
continue-on-error: true
|
||||
- if: always()
|
||||
name: Run tests with race detection and coverage
|
||||
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
|
||||
# full ./... suite with race detection + coverage. A 10m per-step timeout
|
||||
# lets the suite complete on cold cache (~5-7m) while failing cleanly
|
||||
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
|
||||
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
|
||||
# mc#1099: cold runner cache causes OOM kills at ~22m (slower disk I/O
|
||||
# than GitHub Actions). A 60m per-step timeout lets the suite complete
|
||||
# on cold cache (~45m) while failing cleanly instead of OOM-killing.
|
||||
# Warm runners finish in ~12m. Retry with -p 1 on OOM. Job-level
|
||||
# timeout (120m) is the backstop.
|
||||
timeout-minutes: 60
|
||||
run: |
|
||||
go test -race -timeout 60m -coverprofile=coverage.out ./... \
|
||||
|| go test -race -timeout 60m -coverprofile=coverage.out -p 1 ./...
|
||||
|
||||
- if: always()
|
||||
name: Per-file coverage report
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
# golangci-lint configuration for CI cold-runner use.
|
||||
# CLI flags --disable-all --enable=... take precedence over this file.
|
||||
# Only errcheck is disabled here to match .golangci.yaml defaults.
|
||||
linters:
|
||||
disable:
|
||||
- errcheck
|
||||
@@ -0,0 +1,160 @@
|
||||
package handlers
|
||||
|
||||
// Regression coverage for the POLL-mode arm of the canvas user-message
|
||||
// data-loss bug (internal#470 sibling — tracked on internal#471).
|
||||
//
|
||||
// Bug (reported 2026-05-16 by CTO Hongming): "in canvas i sometimes lose
|
||||
// my own message when i exit chat". The push-mode arm was fixed by
|
||||
// #1347 (persistUserMessageAtIngest — a SYNCHRONOUS, before-dispatch,
|
||||
// context.WithoutCancel INSERT). #1347's framing asserted "poll-mode
|
||||
// workspaces were never affected — logA2AReceiveQueued already persists
|
||||
// at ingest". That assertion is OVERSTATED.
|
||||
//
|
||||
// Hongming's tenant (slug `hongming`, org 2c940477-...) has 4 workspaces,
|
||||
// ALL runtime=external with empty URL → ALL delivery_mode=poll (proven
|
||||
// empirically: a benign A2A probe returns the synthetic
|
||||
// {"delivery_mode":"poll","status":"queued"} envelope for every one).
|
||||
// So his reported loss is the POLL path, NOT the push path #1347 fixes.
|
||||
//
|
||||
// Root cause (poll arm): the poll-mode short-circuit (a2a_proxy.go ~402)
|
||||
// calls logA2AReceiveQueued and then IMMEDIATELY returns the synthetic
|
||||
// 200 {status:"queued"} to the canvas. But logA2AReceiveQueued's durable
|
||||
// INSERT runs inside h.goAsync(...) — a DETACHED goroutine with NO
|
||||
// happens-before barrier against the HTTP response. The canvas sees 200
|
||||
// ("message accepted") while the activity_logs row may not yet be — and,
|
||||
// on a workspace-server restart / deploy / OOM / EC2 hibernation between
|
||||
// the 200 and the goroutine's commit, NEVER will be — durable. There is
|
||||
// also no fallback (unlike push-mode's legacy-INSERT fallback): a
|
||||
// swallowed LogActivity error loses the message with only a log line.
|
||||
// Chat-history reads activity_logs (postgres_store.go:165-187); a missing
|
||||
// row = message gone on reopen. That is exactly Hongming's symptom.
|
||||
//
|
||||
// Fix (parity with push-mode): the poll-mode ingest persist of the
|
||||
// canvas user message must be SYNCHRONOUS — committed before the queued
|
||||
// 200 is returned — on a context.WithoutCancel derived context, so a
|
||||
// client disconnect on chat-exit and a post-response restart cannot lose
|
||||
// it. Behavior is never worse than today (best-effort; a persist error
|
||||
// still returns queued).
|
||||
//
|
||||
// TEST DESIGN NOTE: sqlmock.ExpectationsWereMet() hangs indefinitely if
|
||||
// the expected query never fires. We use a select+default+time.After
|
||||
// pattern so the test FAILS fast (not hangs) when the production code
|
||||
// regresses to async (the INSERT never fires before handler returns),
|
||||
// while still returning promptly when all expectations are met. The
|
||||
// insertDelay is kept small (50ms) to minimise suite-level timing
|
||||
// impact under -race detection, where mock delays are amplified by
|
||||
// the instrumenter's goroutine overhead.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse
|
||||
// is the defining contract: for a poll-mode workspace, the canvas user
|
||||
// message MUST be durably INSERTed into activity_logs BEFORE the synthetic
|
||||
// queued 200 is returned to the client — with NO reliance on a detached
|
||||
// async goroutine completing later.
|
||||
//
|
||||
// The test proves the ordering by making the INSERT block briefly and
|
||||
// asserting the handler does NOT return until the INSERT has completed.
|
||||
// Pre-fix (INSERT in h.goAsync, response returned immediately) the
|
||||
// handler returns ~instantly while the INSERT is still pending in the
|
||||
// goroutine → the elapsed time is far below the injected INSERT delay and
|
||||
// ExpectationsWereMet() is racy/unmet at return. Post-fix (synchronous
|
||||
// persist before the queued response) the handler return is gated on the
|
||||
// INSERT, so elapsed >= the injected delay and the expectation is met
|
||||
// deterministically at return WITHOUT any waitAsyncForTest()/sleep.
|
||||
func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
const wsID = "ws-poll-sync-persist"
|
||||
// Keep delay small: -race detection amplifies mock delays significantly.
|
||||
// A 50ms delay is sufficient to prove synchronous blocking (~50× the
|
||||
// normal INSERT latency) without bloating the full ./... suite runtime.
|
||||
const insertDelay = 50 * time.Millisecond
|
||||
|
||||
expectBudgetCheck(mock, wsID)
|
||||
|
||||
// lookupDeliveryMode → poll, triggering the short-circuit.
|
||||
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
|
||||
|
||||
// workspace-name lookup inside logA2AReceiveQueued.
|
||||
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id`).
|
||||
WithArgs(wsID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Poll WS"))
|
||||
|
||||
// The durable user-message write. We delay it so a synchronous
|
||||
// persist visibly gates the handler return; a detached-goroutine
|
||||
// persist (pre-fix) does not. The fix must keep using
|
||||
// context.WithoutCancel so this write survives a chat-exit cancel.
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillDelayFor(insertDelay).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
|
||||
// callerID == "" (no X-Workspace-ID) → this is a canvas_user message,
|
||||
// exactly Hongming's case.
|
||||
body := `{"jsonrpc":"2.0","id":"poll-canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"my own message"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
start := time.Now()
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Defining assertion #1: the handler must not have returned the
|
||||
// queued response before the durable INSERT committed. Pre-fix this
|
||||
// fails (elapsed ≈ 0, INSERT still racing in goAsync).
|
||||
if elapsed < insertDelay {
|
||||
t.Fatalf("poll-mode queued response returned in %v, before the %v user-message INSERT — "+
|
||||
"the message is not durable when the client/process goes away (DATA LOSS). "+
|
||||
"Persist must be synchronous before the queued 200.", elapsed, insertDelay)
|
||||
}
|
||||
|
||||
// Defining assertion #2: the durable write actually happened by the
|
||||
// time the handler returned. ExpectionsWereMet() hangs indefinitely if
|
||||
// the mock never fires (e.g. production code regressed to async),
|
||||
// so we check it in a goroutine with a hard 2s timeout — fails fast
|
||||
// (no CI hang) on regression while returning promptly on success.
|
||||
expectDone := make(chan error, 1)
|
||||
go func() { expectDone <- mock.ExpectationsWereMet() }()
|
||||
select {
|
||||
case err := <-expectDone:
|
||||
if err != nil {
|
||||
t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " +
|
||||
"Likely cause: production code regressed logA2AReceiveQueued to goAsync " +
|
||||
"(INSERT fires after handler returns, not before).")
|
||||
}
|
||||
|
||||
// Sanity: still the correct poll-mode envelope + status.
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response is not valid JSON: %v", err)
|
||||
}
|
||||
if resp["status"] != "queued" || resp["delivery_mode"] != "poll" {
|
||||
t.Errorf("poll envelope changed: got status=%v delivery_mode=%v, want queued/poll",
|
||||
resp["status"], resp["delivery_mode"])
|
||||
}
|
||||
}
|
||||
@@ -504,25 +504,49 @@ func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
|
||||
// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
|
||||
// without a public URL.
|
||||
func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
|
||||
// DATA-LOSS FIX (internal#471 — poll-mode sibling of #1347/internal#470):
|
||||
// this is the ONLY durable write of a poll-mode inbound message,
|
||||
// including a canvas_user message (callerID == "") typed in the canvas
|
||||
// chat. It MUST be SYNCHRONOUS and complete BEFORE the caller returns
|
||||
// the synthetic {status:"queued"} 200 — otherwise the canvas sees the
|
||||
// send acknowledged while the activity_logs row is still racing in a
|
||||
// detached goroutine, and a workspace-server restart / deploy / OOM /
|
||||
// EC2 hibernation between the 200 and the goroutine's commit loses the
|
||||
// user's message permanently (chat-history reads activity_logs, so a
|
||||
// missing row = message gone on reopen). Hongming's tenant is entirely
|
||||
// poll-mode (4 external workspaces, no URL — verified empirically), so
|
||||
// his reported loss is THIS path; #1347 (push-mode, persists AFTER the
|
||||
// poll short-circuit) structurally cannot cover it.
|
||||
//
|
||||
// Mirrors persistUserMessageAtIngest's discipline:
|
||||
// - context.WithoutCancel: a client disconnect on chat-exit (which
|
||||
// cancels the inbound request ctx) MUST NOT abort this write.
|
||||
// - SYNCHRONOUS (no goAsync): the row must be durable before the
|
||||
// queued 200 is returned to the caller.
|
||||
// - Best-effort: LogActivity already logs+swallows INSERT errors, so
|
||||
// a hiccup never blocks or fails the user's send (behavior for
|
||||
// that one request is never worse than the pre-fix async path).
|
||||
// The post-commit broadcast still fires inside LogActivity; a missed
|
||||
// WebSocket event is not data loss (the durable row is the truth the
|
||||
// canvas re-reads on reopen).
|
||||
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var wsName string
|
||||
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if wsName == "" {
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (queued for poll)"
|
||||
h.goAsync(func() {
|
||||
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
Status: "ok",
|
||||
})
|
||||
LogActivity(insCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
Status: "ok",
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -177,7 +177,7 @@ func isEnvIdentPart(c byte) bool {
|
||||
return isEnvIdentStart(c) || (c >= '0' && c <= '9')
|
||||
}
|
||||
|
||||
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env .env and the workspace-specific .env
|
||||
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
|
||||
// (workspace overrides org root). Used by both secret injection and channel
|
||||
// config expansion.
|
||||
//
|
||||
|
||||
@@ -189,6 +189,24 @@ const containerNamePrefix = "ws-"
|
||||
// (the wiped-DB case after `docker compose down -v`).
|
||||
const LabelManaged = "molecule.platform.managed"
|
||||
|
||||
// AgentUID / AgentGID are the uid/gid of the unprivileged `agent` user that
|
||||
// every workspace template creates and drops to via `gosu agent` before
|
||||
// exec'ing the runtime (the a2a_mcp_server runs under this uid). The value is
|
||||
// fixed at 1000:1000 across all templates — see:
|
||||
// - workspace-configs-templates/claude-code-default/Dockerfile (`useradd -u 1000 ... agent`)
|
||||
// - workspace-configs-templates/hermes/Dockerfile (`useradd -u 1000 ... agent`)
|
||||
// - workspace/entrypoint.sh (`exec gosu agent` — "uid 1000")
|
||||
//
|
||||
// Files the platform injects into /configs AFTER the entrypoint's
|
||||
// `chown -R agent:agent /configs` (the post-start #418 re-injection and the
|
||||
// pre-start #1877 volume write) must be owned by this uid/gid, otherwise the
|
||||
// agent-uid MCP server hits EACCES reading /configs/.auth_token, sends an
|
||||
// empty bearer, and the platform 401s on /registry/{id}/peers (list_peers).
|
||||
const (
|
||||
AgentUID = 1000
|
||||
AgentGID = 1000
|
||||
)
|
||||
|
||||
// managedLabels is the canonical label map applied to every workspace
|
||||
// container + volume. Pulled out so a future addition (e.g. instance
|
||||
// UUID for multi-platform-shared-daemon disambiguation) is one edit.
|
||||
@@ -862,8 +880,18 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
|
||||
return &buf, nil
|
||||
}
|
||||
|
||||
// WriteFilesToContainer writes in-memory files into /configs in the container.
|
||||
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
|
||||
// buildConfigFilesTar builds the tar stream that WriteFilesToContainer streams
|
||||
// into /configs via CopyToContainer. Every entry is stamped Uid/Gid = agent
|
||||
// (AgentUID/AgentGID) so the files land agent-owned after extraction. This is
|
||||
// the issue #418 post-start re-injection path: it runs AFTER the template
|
||||
// entrypoint's `chown -R agent:agent /configs`, so without explicit ownership
|
||||
// in the tar header the files extract as root:root (tar Uid/Gid default 0) and
|
||||
// the agent-uid MCP server can no longer read /configs/.auth_token (and
|
||||
// /configs/.platform_inbound_secret) → empty bearer → list_peers 401.
|
||||
//
|
||||
// Pulled out as a pure function so the ownership contract is unit-testable
|
||||
// without a live Docker daemon (mirrors buildTemplateTar).
|
||||
func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
|
||||
var buf bytes.Buffer
|
||||
tw := tar.NewWriter(&buf)
|
||||
|
||||
@@ -876,8 +904,10 @@ func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID str
|
||||
Typeflag: tar.TypeDir,
|
||||
Name: dir + "/",
|
||||
Mode: 0755,
|
||||
Uid: AgentUID,
|
||||
Gid: AgentGID,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
|
||||
return nil, fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
|
||||
}
|
||||
createdDirs[dir] = true
|
||||
}
|
||||
@@ -886,19 +916,30 @@ func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID str
|
||||
Name: name,
|
||||
Mode: 0644,
|
||||
Size: int64(len(data)),
|
||||
Uid: AgentUID,
|
||||
Gid: AgentGID,
|
||||
}
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return fmt.Errorf("failed to write tar header for %s: %w", name, err)
|
||||
return nil, fmt.Errorf("failed to write tar header for %s: %w", name, err)
|
||||
}
|
||||
if _, err := tw.Write(data); err != nil {
|
||||
return fmt.Errorf("failed to write tar data for %s: %w", name, err)
|
||||
return nil, fmt.Errorf("failed to write tar data for %s: %w", name, err)
|
||||
}
|
||||
}
|
||||
if err := tw.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close tar writer: %w", err)
|
||||
return nil, fmt.Errorf("failed to close tar writer: %w", err)
|
||||
}
|
||||
return &buf, nil
|
||||
}
|
||||
|
||||
return p.cli.CopyToContainer(ctx, containerID, "/configs", &buf, container.CopyToContainerOptions{})
|
||||
// WriteFilesToContainer writes in-memory files into /configs in the container,
|
||||
// agent-owned (see buildConfigFilesTar).
|
||||
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
|
||||
buf, err := buildConfigFilesTar(files)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.cli.CopyToContainer(ctx, containerID, "/configs", buf, container.CopyToContainerOptions{})
|
||||
}
|
||||
|
||||
// CopyToContainer exposes CopyToContainer from the Docker client for use by other packages.
|
||||
@@ -988,13 +1029,28 @@ func (p *Provisioner) ReadFromVolume(ctx context.Context, volumeName, filePath s
|
||||
return clean, nil
|
||||
}
|
||||
|
||||
// writeAuthTokenVolumeCmd is the shell command the throwaway alpine container
|
||||
// runs to seed /vol/.auth_token. alpine runs it as root, so without the
|
||||
// explicit `chown 1000:1000` the file stays root:root after the template
|
||||
// entrypoint's `chown -R agent:agent /configs` has already run — the agent-uid
|
||||
// (AgentUID) MCP server then gets EACCES reading it → empty bearer →
|
||||
// list_peers 401. Pulled out as a pure function so the ownership contract is
|
||||
// unit-testable without a live Docker daemon. Issue #1877.
|
||||
func writeAuthTokenVolumeCmd() string {
|
||||
return fmt.Sprintf(
|
||||
"mkdir -p /vol && printf '%%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token && chown %d:%d /vol/.auth_token",
|
||||
AgentUID, AgentGID,
|
||||
)
|
||||
}
|
||||
|
||||
// WriteAuthTokenToVolume writes the workspace auth token into the config volume
|
||||
// BEFORE the container starts, eliminating the token-injection race window where
|
||||
// a restarted container could read a stale token from /configs/.auth_token before
|
||||
// WriteFilesToContainer writes the new one. Issue #1877.
|
||||
//
|
||||
// Uses a throwaway alpine container to write directly to the named volume,
|
||||
// bypassing the container lifecycle entirely.
|
||||
// bypassing the container lifecycle entirely. The written file is chowned to
|
||||
// the agent uid/gid (see writeAuthTokenVolumeCmd).
|
||||
func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, token string) error {
|
||||
if p == nil || p.cli == nil {
|
||||
return ErrNoBackend
|
||||
@@ -1002,7 +1058,7 @@ func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, t
|
||||
volName := ConfigVolumeName(workspaceID)
|
||||
resp, err := p.cli.ContainerCreate(ctx, &container.Config{
|
||||
Image: "alpine",
|
||||
Cmd: []string{"sh", "-c", "mkdir -p /vol && printf '%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token"},
|
||||
Cmd: []string{"sh", "-c", writeAuthTokenVolumeCmd()},
|
||||
Env: []string{"TOKEN=" + token},
|
||||
}, &container.HostConfig{
|
||||
Binds: []string{volName + ":/vol"},
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
package provisioner
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"errors"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// These tests pin the P0 fix for the fleet-wide list_peers 401 (Hermes and
|
||||
// every other template): the workspace-server token-injection paths wrote
|
||||
// /configs/.auth_token (and /configs/.platform_inbound_secret) as root:root
|
||||
// AFTER the template entrypoint's `chown -R agent:agent /configs` ran, so the
|
||||
// agent-uid (1000) MCP server (a2a_mcp_server, running via `gosu agent`) hit
|
||||
// `[Errno 13] Permission denied` reading the bearer → empty bearer → platform
|
||||
// 401 on /registry/{id}/peers (the literal tool_list_peers path).
|
||||
//
|
||||
// The agent uid is 1000:1000, verified from the templates:
|
||||
// - workspace-configs-templates/claude-code-default/Dockerfile: `useradd -u 1000 ... agent`
|
||||
// - workspace-configs-templates/hermes/Dockerfile: `useradd -u 1000 ... agent`
|
||||
// - workspace/entrypoint.sh / claude-code-default/entrypoint.sh: `exec gosu agent` ("uid 1000")
|
||||
//
|
||||
// Both tests assert the real artifact (the tar headers Docker's CopyToContainer
|
||||
// honours for ownership, and the literal shell command the throwaway alpine
|
||||
// container runs), not a mock that bypasses ownership. They FAIL on pre-fix
|
||||
// code (no Uid/Gid in tar headers; no chown in the alpine command → root:root)
|
||||
// and PASS post-fix (agent-owned).
|
||||
|
||||
// TestWriteFilesToContainerTar_FilesAreAgentOwned covers the issue #418
|
||||
// post-start re-injection path (WriteFilesToContainer): the tar it streams
|
||||
// into /configs via CopyToContainer must carry Uid/Gid = agent (1000) so the
|
||||
// extracted files land agent-readable, not root:root. This is the path that
|
||||
// (re)writes BOTH .auth_token and .platform_inbound_secret on a cadence.
|
||||
func TestWriteFilesToContainerTar_FilesAreAgentOwned(t *testing.T) {
|
||||
files := map[string][]byte{
|
||||
".auth_token": []byte("tok-abc123"),
|
||||
".platform_inbound_secret": []byte("inbound-secret-xyz"),
|
||||
"nested/dir/file.txt": []byte("data"),
|
||||
}
|
||||
|
||||
buf, err := buildConfigFilesTar(files)
|
||||
if err != nil {
|
||||
t.Fatalf("buildConfigFilesTar: %v", err)
|
||||
}
|
||||
|
||||
tr := tar.NewReader(buf)
|
||||
seen := map[string]bool{}
|
||||
for {
|
||||
hdr, err := tr.Next()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("read tar: %v", err)
|
||||
}
|
||||
if _, err := io.Copy(io.Discard, tr); err != nil {
|
||||
t.Fatalf("drain %s: %v", hdr.Name, err)
|
||||
}
|
||||
seen[hdr.Name] = true
|
||||
if hdr.Uid != AgentUID {
|
||||
t.Fatalf("tar entry %q Uid = %d, want %d (agent) — root-owned injection causes the list_peers 401",
|
||||
hdr.Name, hdr.Uid, AgentUID)
|
||||
}
|
||||
if hdr.Gid != AgentGID {
|
||||
t.Fatalf("tar entry %q Gid = %d, want %d (agent)", hdr.Name, hdr.Gid, AgentGID)
|
||||
}
|
||||
}
|
||||
|
||||
for _, want := range []string{".auth_token", ".platform_inbound_secret"} {
|
||||
if !seen[want] {
|
||||
t.Fatalf("tar missing %q (seen: %v)", want, seen)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestWriteAuthTokenVolumeCmd_ChownsToAgent covers the issue #1877 pre-start
|
||||
// volume-write path (WriteAuthTokenToVolume): the throwaway alpine container
|
||||
// writes /vol/.auth_token then chmod 0600 but, pre-fix, never chowns it, so it
|
||||
// stays root:root (alpine runs the command as root). The literal command must
|
||||
// chown the file to the agent uid:gid so the agent-uid MCP server can read it.
|
||||
func TestWriteAuthTokenVolumeCmd_ChownsToAgent(t *testing.T) {
|
||||
cmd := writeAuthTokenVolumeCmd()
|
||||
|
||||
if !strings.Contains(cmd, "chmod 0600 /vol/.auth_token") {
|
||||
t.Fatalf("alpine cmd lost the 0600 chmod (regression): %q", cmd)
|
||||
}
|
||||
|
||||
wantChown := "chown 1000:1000 /vol/.auth_token"
|
||||
if !strings.Contains(cmd, wantChown) {
|
||||
t.Fatalf("alpine cmd = %q, missing %q — without it .auth_token stays root:root "+
|
||||
"and the agent-uid MCP server gets EACCES → empty bearer → list_peers 401",
|
||||
cmd, wantChown)
|
||||
}
|
||||
}
|
||||
+21
-9
@@ -22,10 +22,22 @@ from platform_auth import auth_headers, self_source_headers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_WORKSPACE_ID_raw = os.environ.get("WORKSPACE_ID")
|
||||
if not _WORKSPACE_ID_raw:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
WORKSPACE_ID = _WORKSPACE_ID_raw
|
||||
WORKSPACE_ID = os.environ.get("WORKSPACE_ID", "")
|
||||
|
||||
|
||||
def _require_workspace_id() -> str:
|
||||
"""Raise RuntimeError if WORKSPACE_ID is unset.
|
||||
|
||||
Call this at the start of any function that makes a platform API call
|
||||
that requires a source workspace ID. The check is lazy so that:
|
||||
1. ``import a2a_client`` succeeds without WORKSPACE_ID set (smoke
|
||||
tests, type checkers, IDE autocompletion, module introspection).
|
||||
2. Actual runtime usage (inside a workspace container) raises a clear
|
||||
error at the first failing call rather than at import time.
|
||||
"""
|
||||
if not WORKSPACE_ID:
|
||||
raise RuntimeError("WORKSPACE_ID environment variable is required but not set")
|
||||
return WORKSPACE_ID
|
||||
# Platform URL: always host.docker.internal inside containers. The platform API
|
||||
# is only reachable via the Docker network mesh from inside a workspace
|
||||
# container regardless of the runtime environment (Docker/host).
|
||||
@@ -306,7 +318,7 @@ def enrich_peer_metadata(
|
||||
# the same as a registry miss, which is the desired UX.
|
||||
return record
|
||||
|
||||
src = (source_workspace_id or "").strip() or WORKSPACE_ID
|
||||
src = (source_workspace_id or "").strip() or _require_workspace_id()
|
||||
url = f"{PLATFORM_URL}/registry/discover/{canon}"
|
||||
try:
|
||||
with httpx.Client(timeout=2.0) as client:
|
||||
@@ -427,7 +439,7 @@ async def discover_peer(target_id: str, source_workspace_id: str | None = None)
|
||||
safe_id = _validate_peer_id(target_id)
|
||||
if safe_id is None:
|
||||
return None
|
||||
src = (source_workspace_id or "").strip() or WORKSPACE_ID
|
||||
src = (source_workspace_id or "").strip() or _require_workspace_id()
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
try:
|
||||
resp = await client.get(
|
||||
@@ -551,7 +563,7 @@ async def send_a2a_message(peer_id: str, message: str, source_workspace_id: str
|
||||
safe_id = _validate_peer_id(peer_id)
|
||||
if safe_id is None:
|
||||
return f"{_A2A_ERROR_PREFIX}invalid peer_id (expected UUID): {peer_id!r}"
|
||||
src = (source_workspace_id or "").strip() or WORKSPACE_ID
|
||||
src = (source_workspace_id or "").strip() or _require_workspace_id()
|
||||
target_url = f"{PLATFORM_URL}/workspaces/{safe_id}/a2a"
|
||||
|
||||
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
|
||||
@@ -708,7 +720,7 @@ async def get_peers_with_diagnostic(source_workspace_id: str | None = None) -> t
|
||||
The legacy get_peers() shim below preserves the bare-list contract for
|
||||
non-tool callers.
|
||||
"""
|
||||
src = (source_workspace_id or "").strip() or WORKSPACE_ID
|
||||
src = (source_workspace_id or "").strip() or _require_workspace_id()
|
||||
url = f"{PLATFORM_URL}/registry/{src}/peers"
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
try:
|
||||
@@ -768,7 +780,7 @@ async def get_workspace_info(source_workspace_id: str | None = None) -> dict:
|
||||
- 404 / other → workspace never existed (or transient)
|
||||
- exception → network / auth failure
|
||||
"""
|
||||
src = source_workspace_id or WORKSPACE_ID
|
||||
src = source_workspace_id or _require_workspace_id()
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
try:
|
||||
resp = await client.get(
|
||||
|
||||
@@ -1490,3 +1490,90 @@ class TestWaitForEnrichmentInFlight:
|
||||
a2a_client._peer_metadata.clear()
|
||||
a2a_client._peer_names.clear()
|
||||
a2a_client._peer_in_flight_clear_for_testing()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy WORKSPACE_ID validation (KI fix: import-time guard removed)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLazyWorkspaceIdZzz:
|
||||
"""Regression: module import must NOT raise when WORKSPACE_ID is unset.
|
||||
|
||||
Named Zzz so this class runs LAST in the test suite.
|
||||
test_import_succeeds_without_workspace_id reloads a2a_client with
|
||||
WORKSPACE_ID unset, which corrupts the module-level WORKSPACE_ID for
|
||||
subsequent tests that import a2a_client without a fixture resetting
|
||||
it. Running these tests last avoids polluting the module state for other
|
||||
test classes.
|
||||
|
||||
Before the fix, ``import a2a_client`` raised RuntimeError at module level
|
||||
if WORKSPACE_ID was not set, blocking smoke tests, type checkers, IDE
|
||||
autocompletion, and any script that imports the module without the full
|
||||
runtime env. The guard was moved to lazy first-use so imports are
|
||||
side-effect-free while first API call still fails fast with a clear error.
|
||||
"""
|
||||
|
||||
def zzz_test_import_succeeds_without_workspace_id(self):
|
||||
"""import a2a_client must not raise RuntimeError when WORKSPACE_ID is unset."""
|
||||
import sys
|
||||
|
||||
# Simulate a subprocess-like environment: a fresh interpreter
|
||||
# that has never imported this module and has no WORKSPACE_ID.
|
||||
# We use importlib.util to load the module with WORKSPACE_ID removed.
|
||||
env_backup = os.environ.pop("WORKSPACE_ID", None)
|
||||
try:
|
||||
# Remove any cached import so we get a fresh load.
|
||||
mods_to_remove = [k for k in sys.modules if k.startswith("a2a_client")]
|
||||
for mod in mods_to_remove:
|
||||
del sys.modules[mod]
|
||||
|
||||
import a2a_client as ac
|
||||
# Import must succeed; WORKSPACE_ID should be empty string.
|
||||
assert ac.WORKSPACE_ID == ""
|
||||
finally:
|
||||
# Restore env so other tests are unaffected.
|
||||
if env_backup is not None:
|
||||
os.environ["WORKSPACE_ID"] = env_backup
|
||||
# Re-import with original env restored.
|
||||
import importlib
|
||||
import a2a_client as _restored
|
||||
importlib.reload(_restored)
|
||||
|
||||
def zzz_test_require_workspace_id_raises_without_it(self):
|
||||
"""_require_workspace_id() must raise RuntimeError when WORKSPACE_ID is empty."""
|
||||
import a2a_client
|
||||
|
||||
original = a2a_client.WORKSPACE_ID
|
||||
a2a_client.WORKSPACE_ID = ""
|
||||
try:
|
||||
with pytest.raises(RuntimeError, match="WORKSPACE_ID"):
|
||||
a2a_client._require_workspace_id()
|
||||
finally:
|
||||
a2a_client.WORKSPACE_ID = original
|
||||
|
||||
def zzz_test_require_workspace_id_returns_value_when_set(self):
|
||||
"""_require_workspace_id() must return WORKSPACE_ID when it is set."""
|
||||
import a2a_client
|
||||
|
||||
original = a2a_client.WORKSPACE_ID
|
||||
a2a_client.WORKSPACE_ID = "test-workspace-123"
|
||||
try:
|
||||
result = a2a_client._require_workspace_id()
|
||||
assert result == "test-workspace-123"
|
||||
finally:
|
||||
a2a_client.WORKSPACE_ID = original
|
||||
|
||||
def zzz_test_enrich_peer_metadata_raises_without_workspace_id(self):
|
||||
"""enrich_peer_metadata must raise RuntimeError when WORKSPACE_ID is unset."""
|
||||
import a2a_client
|
||||
|
||||
original = a2a_client.WORKSPACE_ID
|
||||
a2a_client.WORKSPACE_ID = ""
|
||||
# Must use a valid UUID so _validate_peer_id doesn't return None early.
|
||||
try:
|
||||
with pytest.raises(RuntimeError, match="WORKSPACE_ID"):
|
||||
a2a_client.enrich_peer_metadata(
|
||||
"00000000-0000-0000-0000-000000000001"
|
||||
)
|
||||
finally:
|
||||
a2a_client.WORKSPACE_ID = original
|
||||
|
||||
Reference in New Issue
Block a user