Merge pull request 'fix(handlers): drain detached async goroutines before test db.DB swap (data race)' (#1267) from fix/handlers-global-dbdb-data-race into staging
Block internal-flavored paths / Block forbidden paths (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Blocked by required conditions
CI / Canvas Deploy Reminder (push) Blocked by required conditions
Handlers Postgres Integration / detect-changes (push) Waiting to run
Harness Replays / detect-changes (push) Waiting to run
Runtime PR-Built Compatibility / detect-changes (push) Waiting to run
Secret scan / Scan diff for credential-shaped strings (push) Waiting to run
CI / Detect changes (push) Successful in 1m33s
E2E API Smoke Test / detect-changes (push) Successful in 2m55s
E2E Chat / detect-changes (push) Successful in 2m39s
CI / Python Lint & Test (push) Successful in 17s
E2E Chat / E2E Chat (push) Failing after 55s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 3m21s
CI / Canvas (Next.js) (push) Successful in 24m37s
CI / Platform (Go) (push) Successful in 26m11s
Handlers Postgres Integration / Handlers Postgres Integration (push) Has been cancelled
Harness Replays / Harness Replays (push) Has been cancelled
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Has been cancelled
CI / all-required (push) Has been cancelled

This commit was merged in pull request #1267.
This commit is contained in:
2026-05-16 01:30:41 +00:00
5 changed files with 87 additions and 19 deletions
@@ -194,7 +194,12 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
go h.RestartByID(workspaceID)
// Tracked via goAsync (not bare `go`) so the asyncWG can be drained
// before a test swaps the global db.DB. runRestartCycle reads db.DB
// before its provisioner gate, so an untracked detached goroutine
// races setupTestDB's t.Cleanup db.DB restore. Matches the already-
// correct site at a2a_proxy.go:648.
h.goAsync(func() { h.RestartByID(workspaceID) })
return true
}
@@ -241,7 +246,10 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
go h.RestartByID(workspaceID)
// Tracked via goAsync (see maybeMarkContainerDead): preflight's
// detached restart must be drainable so it doesn't race the global
// db.DB swap in test cleanup.
h.goAsync(func() { h.RestartByID(workspaceID) })
return &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{
@@ -262,7 +270,8 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
errWsName = workspaceID
}
summary := "A2A request to " + errWsName + " failed: " + errMsg
go func(parent context.Context) {
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
@@ -277,7 +286,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
Status: "error",
ErrorDetail: &errMsg,
})
}(ctx)
})
}
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
@@ -298,18 +307,19 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
// silent workspaces. Only update when callerID is a real workspace (not
// canvas, not a system caller) and the target returned 2xx/3xx.
if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 {
go func() {
h.goAsync(func() {
bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := db.DB.ExecContext(bgCtx,
`UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil {
log.Printf("last_outbound_at update failed for %s: %v", callerID, err)
}
}()
})
}
summary := a2aMethod + " → " + wsNameForLog
toolTrace := extractToolTrace(respBody)
go func(parent context.Context) {
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
@@ -325,7 +335,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
DurationMs: &durationMs,
Status: logStatus,
})
}(ctx)
})
if callerID == "" && statusCode < 400 {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
@@ -510,7 +520,8 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (queued for poll)"
go func(parent context.Context) {
parent := ctx
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
@@ -523,7 +534,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
RequestBody: json.RawMessage(body),
Status: "ok",
})
}(ctx)
})
}
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
@@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
@@ -22,8 +23,39 @@ import (
"github.com/redis/go-redis/v9"
)
// liveTestHandlers tracks every WorkspaceHandler built during the test
// binary's lifetime so setupTestDB can drain their in-flight goAsync
// goroutines (notably the detached RestartByID restart cycle, which
// reads the global db.DB) BEFORE restoring db.DB. Without this drain a
// fire-and-forget restart goroutine spawned by one test outlives that
// test and races the db.DB swap in a later test's t.Cleanup — the
// 0x...d548 data race on platform/internal/db.DB.
var (
liveTestHandlersMu sync.Mutex
liveTestHandlers []*WorkspaceHandler
)
func init() {
gin.SetMode(gin.TestMode)
newHandlerHook = func(h *WorkspaceHandler) {
liveTestHandlersMu.Lock()
liveTestHandlers = append(liveTestHandlers, h)
liveTestHandlersMu.Unlock()
}
}
// drainTestAsync waits for every tracked handler's goAsync goroutines to
// finish. Called from setupTestDB's cleanup before db.DB is restored so
// no detached restart/provision goroutine is mid-read of db.DB when the
// pointer is swapped.
func drainTestAsync() {
liveTestHandlersMu.Lock()
handlers := make([]*WorkspaceHandler, len(liveTestHandlers))
copy(handlers, liveTestHandlers)
liveTestHandlersMu.Unlock()
for _, h := range handlers {
h.waitAsyncForTest()
}
}
// setupTestDB creates a sqlmock DB and assigns it to the global db.DB.
@@ -37,7 +69,16 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
t.Cleanup(func() {
// Drain detached async goroutines (e.g. goAsync(RestartByID),
// which reads db.DB in runRestartCycle before its provisioner
// gate) BEFORE swapping db.DB back. Doing the restore first
// would let an in-flight restart goroutine read db.DB while
// this line writes it — the data race this guards against.
drainTestAsync()
db.DB = prevDB
mockDB.Close()
})
// Disable SSRF checks for the duration of this test only. Restore
// the previous state via t.Cleanup so that TestIsSafeURL_* tests
@@ -56,9 +56,11 @@ const (
// (an externally routable address) is used directly.
func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID string) {
// Non-blocking send — don't stall the restart cycle.
// Run in a detached goroutine so the caller (runRestartCycle) can
// proceed to stopForRestart without waiting.
go func() {
// Run in a tracked async goroutine (goAsync, not bare `go`) so the
// caller (runRestartCycle) can proceed to stopForRestart without
// waiting, while the test harness can still drain it before swapping
// the global db.DB (resolveAgentURLForRestartSignal reads db.DB).
h.goAsync(func() {
signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout)
defer cancel()
@@ -109,7 +111,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
} else {
log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode)
}
}()
})
}
// resolveAgentURLForRestartSignal returns the routable URL for the workspace
@@ -80,6 +80,15 @@ type WorkspaceHandler struct {
asyncWG sync.WaitGroup
}
// newHandlerHook, when non-nil, is invoked for every WorkspaceHandler
// created via NewWorkspaceHandler. It is nil in production (zero cost);
// the test harness sets it so setupTestDB can drain every handler's
// in-flight async goroutines before swapping the global db.DB. Without
// this, a detached restart goroutine (maybeMarkContainerDead ->
// goAsync(RestartByID) -> runRestartCycle reads db.DB) races the
// db.DB restore in another test's t.Cleanup.
var newHandlerHook func(*WorkspaceHandler)
func (h *WorkspaceHandler) goAsync(fn func()) {
h.asyncWG.Add(1)
go func() {
@@ -108,6 +117,9 @@ func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, plat
if p != nil {
h.provisioner = p
}
if newHandlerHook != nil {
newHandlerHook(h)
}
return h
}
@@ -237,10 +237,10 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
// the silent-drop bugs PRs #2811/#2824 closed). RestartWorkspaceAuto
// enforces CP-FIRST ordering matching the other dispatchers — see
// docs/architecture/backends.md.
go func() {
h.goAsync(func() {
h.RestartWorkspaceAutoOpts(context.Background(), id, templatePath, configFiles, payload, resetClaudeSession)
}()
go h.sendRestartContext(id, restartData)
})
h.goAsync(func() { h.sendRestartContext(id, restartData) })
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
}
@@ -610,7 +610,9 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
h.provisionWorkspaceAutoSync(workspaceID, "", nil, payload)
// sendRestartContext is a one-way notification to the new container; safe
// to fire async — the next restart cycle won't depend on it completing.
go h.sendRestartContext(workspaceID, restartData)
// Tracked via goAsync so the test harness can drain it before the
// global db.DB swap (sendRestartContext reads db.DB).
h.goAsync(func() { h.sendRestartContext(workspaceID, restartData) })
}
// Pause handles POST /workspaces/:id/pause