Compare commits

...

4 Commits

Author SHA1 Message Date
Hongming Wang 694c05552b fix(test): drain coalesceRestart goroutines before t.Cleanup (Class H, #170)
TestPooledWithEICTunnel_PreservesFnErr (and any sqlmock-using neighbour
test) was at risk of inheriting stale INSERT calls from a previous
test's coalesceRestart goroutine that survived its t.Cleanup boundary.

The production callsite shape is `go h.RestartByID(...)` from
a2a_proxy.go, a2a_proxy_helpers.go and main.go. When that goroutine's
runRestartCycle panics, coalesceRestart's deferred recover swallows it
to keep the platform process alive — but in tests, nothing waits for
the goroutine to fully exit. If it's still draining LogActivity-shaped
work after the test returns, those INSERTs land in the next test's
sqlmock connection as kind=DELEGATION_FAILED /
kind=WORKSPACE_PROVISION_FAILED, surfacing as "INSERT-not-expected".

Fix: introduce drainCoalesceGoroutine(t, wsID, cycle) test helper that
spawns coalesceRestart on a goroutine (matching production) and
registers a t.Cleanup with sync.WaitGroup.Wait so the test can't
declare itself done while a goroutine is still alive.

Convert TestCoalesceRestart_PanicInCycleClearsState to use the helper
(previously it called coalesceRestart synchronously, which never
exercised the production goroutine-survival contract).

Add TestCoalesceRestart_DrainHelperWaitsForGoroutineExit as the
regression guard: cycle blocks 150ms then panics; the test asserts
t.Run elapsed >= 150ms (proving the Wait barrier engaged) AND the
deferred close ran (proving the panic-recovery defer chain executed)
AND state.running was cleared. Verified the assertion is real by
mutation-testing: removing t.Cleanup(wg.Wait) makes this test FAIL
deterministically with elapsed <300µs.

Per saved memory feedback_assert_exact_not_substring: the regression
test asserts an exact-shape contract (elapsed >= blockFor) rather than
a substring-in-output, so it discriminates between "drain works" and
"drain skipped".

Per Phase 3: 10/10 race-detector runs pass for all TestCoalesceRestart_*
tests. Full ./internal/handlers/... suite green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 13:04:57 -07:00
claude-ceo-assistant d2da0c8d34 Merge pull request 'fix(workspace-server): a2a-proxy preflight container check (closes #36)' (#37) from fix/issue36-a2a-proxy-preflight into main 2026-05-07 18:25:07 +00:00
claude-ceo-assistant be5fbb5ad3 fix(workspace-server): a2a-proxy preflight container check (closes #36)
Same SSOT-divergence shape as #10 / fixed in #12, but on the a2a-proxy
code path. The plugin handler was routed through `provisioner.RunningContainerName`;
a2a-proxy was forwarding optimistically and only catching missing containers
REACTIVELY via `maybeMarkContainerDead` after the network call timed out.

Result on tenants whose agent containers had been recycled (e.g. post-EC2
replace from molecule-controlplane#20): canvas waits 2-30s for the network
forward to fail before getting a 503, and the workspace-server logs only
"ProxyA2A forward error" without the "container is dead" signal.

This PR adds a proactive `Provisioner.IsRunning` check in `proxyA2ARequest`
between `resolveAgentURL` and `dispatchA2A`, gated on the conditions where
we know we're talking to a sibling Docker container we own (`h.provisioner
!= nil` AND `platformInDocker` AND the URL was rewritten to Docker-DNS form).

Three outcomes via the SSOT helper:
  (true,  nil) → forward as today
  (false, nil) → fast-503 with `error="workspace container not running —
                 restart triggered"`, `restarting=true`, `preflight=true`,
                 plus the same offline-flip + WORKSPACE_OFFLINE broadcast +
                 async restart that `maybeMarkContainerDead` produces
  (true,  err) → fall through to optimistic forward (matches IsRunning's
                 "fail-soft as alive" contract — flaky daemon must not
                 trigger a restart cascade)

The `preflight=true` flag in the response distinguishes the proactive
short-circuit from the reactive `maybeMarkContainerDead` path so canvas
or downstream callers can render distinct messages later.

* `internal/handlers/a2a_proxy.go` — preflight call site between
  resolveAgentURL and dispatchA2A; gated on `h.provisioner != nil &&
  platformInDocker && url == http://<ContainerName(id)>:port`.
* `internal/handlers/a2a_proxy_helpers.go` — `preflightContainerHealth`
  helper. Routes through `h.provisioner.IsRunning` (which itself wraps
  `RunningContainerName`). Identical offline-flip side-effects as
  `maybeMarkContainerDead` for the dead-container case.
* `internal/handlers/a2a_proxy_preflight_test.go` — 4 tests: running →
  nil; not-running → structured 503 + sqlmock expectations on the
  offline-flip + structure_events insert; transient error → nil
  (fail-soft); AST gate pinning the SSOT routing (mirror of #12's gate).

Mutation-tested: removing the `if running { return nil }` guard makes
the production code fail to compile (unused var). A subtler mutation
(replacing the !running branch with `return nil`) would make
TestPreflight_ContainerNotRunning_StructuredFastFail fail at runtime
with sqlmock's "expected DB call did not occur."

Refs: molecule-core#36. Companion to #12 (issue #10).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 11:15:08 -07:00
claude-ceo-assistant b9ca4ad84a Merge pull request 'fix(ci): mark CodeQL continue-on-error (advisory only) — closes #156' (#35) from fix/codeql-continue-on-error-156 into main 2026-05-07 17:26:59 +00:00
4 changed files with 440 additions and 9 deletions
@@ -435,6 +435,34 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
return 0, nil, proxyErr
}
// Pre-flight container-health check (#36). The dispatchA2A path below
// does Docker-DNS forwarding to `ws-<wsShort>:8000` and only catches a
// missing/dead container REACTIVELY via maybeMarkContainerDead in
// handleA2ADispatchError. That works but costs the caller a full
// network-timeout (2-30s) before the structured 503 surfaces.
//
// When we KNOW the workspace is container-backed (h.docker != nil + we
// rewrite to Docker-DNS form below), do a single proactive
// RunningContainerName lookup. If the container is genuinely missing,
// short-circuit with the same structured 503 + async restart that
// maybeMarkContainerDead would produce — but immediately, without the
// network round-trip.
//
// Three outcomes of provisioner.RunningContainerName(ctx, h.docker, id):
// ("ws-<id>", nil) → forward as today.
// ("", nil) → container is genuinely not running. Fast-503.
// ("", err) → transient daemon error. Fall through to optimistic
// forward — matches Provisioner.IsRunning's
// (true, err) "fail-soft as alive" contract.
//
// Same SSOT as findRunningContainer (#10/#12). See AST gate
// TestProxyA2A_RoutesThroughProvisionerSSOT.
if h.provisioner != nil && platformInDocker && strings.HasPrefix(agentURL, "http://"+provisioner.ContainerName(workspaceID)+":") {
if proxyErr := h.preflightContainerHealth(ctx, workspaceID); proxyErr != nil {
return 0, nil, proxyErr
}
}
startTime := time.Now()
resp, cancelFwd, err := h.dispatchA2A(ctx, workspaceID, agentURL, body, callerID)
if cancelFwd != nil {
@@ -198,6 +198,60 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
return true
}
// preflightContainerHealth runs a proactive Provisioner.IsRunning check
// (#36) before dispatching the a2a forward. Routed through provisioner's
// SSOT IsRunning, which itself wraps RunningContainerName — same source
// as findRunningContainer in the plugins handler (#10/#12).
//
// Returns nil when the forward should proceed:
// - container is running, OR
// - daemon errored transiently (matches IsRunning's (true, err)
// "fail-soft as alive" contract — let the optimistic forward run
// and reactive maybeMarkContainerDead catch a real failure).
//
// Returns a structured 503 + triggers the same async restart that
// maybeMarkContainerDead would produce, when:
// - container is genuinely not running (NotFound / Exited / Created…).
//
// The point of running this BEFORE the forward is to save the caller
// 2-30s of network-timeout cost when the container is missing — a common
// shape post-EC2-replace (see molecule-controlplane#20 incident
// 2026-05-07) where the reconciler hasn't respawned the agent yet.
func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspaceID string) *proxyA2AError {
running, err := h.provisioner.IsRunning(ctx, workspaceID)
if err != nil {
// Transient daemon error. Provisioner.IsRunning returns (true, err)
// in this case — fall through to the optimistic forward, reactive
// maybeMarkContainerDead handles a real failure later.
log.Printf("ProxyA2A preflight: IsRunning transient error for %s: %v (proceeding with forward)", workspaceID, err)
return nil
}
if running {
// Container is running — forward as today.
return nil
}
// Container is genuinely not running. Mark offline + trigger restart
// (same effect as maybeMarkContainerDead's branch), and return the
// structured 503 immediately so the caller skips the forward.
log.Printf("ProxyA2A preflight: container for %s is not running — marking offline and triggering restart (#36)", workspaceID)
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`,
models.StatusOffline, workspaceID); dbErr != nil {
log.Printf("ProxyA2A preflight: failed to mark workspace %s offline: %v", workspaceID, dbErr)
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
go h.RestartByID(workspaceID)
return &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{
"error": "workspace container not running — restart triggered",
"restarting": true,
"preflight": true, // distinguishes from reactive containerDead path
},
}
}
// logA2AFailure records a failed A2A attempt to activity_logs in a detached
// goroutine (the request context may already be done by the time it runs).
func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string, err error, durationMs int) {
@@ -0,0 +1,194 @@
package handlers
import (
"context"
"errors"
"go/ast"
"go/parser"
"go/token"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
)
// preflightLocalProv is a controllable LocalProvisionerAPI stub for the
// preflight tests (#36). Other API methods panic to guard against tests
// that should be using a different stub.
type preflightLocalProv struct {
running bool
err error
calls int
calledWith []string
}
func (p *preflightLocalProv) IsRunning(_ context.Context, workspaceID string) (bool, error) {
p.calls++
p.calledWith = append(p.calledWith, workspaceID)
return p.running, p.err
}
func (p *preflightLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
panic("preflightLocalProv: Start not implemented")
}
func (p *preflightLocalProv) Stop(_ context.Context, _ string) error {
panic("preflightLocalProv: Stop not implemented")
}
func (p *preflightLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
panic("preflightLocalProv: ExecRead not implemented")
}
func (p *preflightLocalProv) RemoveVolume(_ context.Context, _ string) error {
panic("preflightLocalProv: RemoveVolume not implemented")
}
func (p *preflightLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
panic("preflightLocalProv: VolumeHasFile not implemented")
}
func (p *preflightLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
panic("preflightLocalProv: WriteAuthTokenToVolume not implemented")
}
// TestPreflight_ContainerRunning_ReturnsNil — IsRunning(true,nil): forward
// proceeds. preflight returns nil → caller continues to dispatchA2A.
func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
_ = setupTestDB(t)
stub := &preflightLocalProv{running: true, err: nil}
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h.provisioner = stub
if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
t.Fatalf("preflight should return nil when container running, got %+v", err)
}
if stub.calls != 1 {
t.Errorf("IsRunning should be called exactly once, got %d", stub.calls)
}
if len(stub.calledWith) != 1 || stub.calledWith[0] != "ws-running-123" {
t.Errorf("IsRunning should be called with workspace id, got %v", stub.calledWith)
}
}
// TestPreflight_ContainerNotRunning_StructuredFastFail — IsRunning(false,nil):
// preflight returns structured 503 with restarting=true + preflight=true, AND
// triggers the offline-flip + WORKSPACE_OFFLINE broadcast + async restart.
// This is the load-bearing case — saves the caller 2-30s of network timeout.
func TestPreflight_ContainerNotRunning_StructuredFastFail(t *testing.T) {
mock := setupTestDB(t)
_ = setupTestRedis(t)
stub := &preflightLocalProv{running: false, err: nil}
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h.provisioner = stub
// Expect the offline-flip UPDATE.
mock.ExpectExec(`UPDATE workspaces SET status =`).
WithArgs(models.StatusOffline, "ws-dead-456").
WillReturnResult(sqlmock.NewResult(0, 1))
// Broadcaster's INSERT INTO structure_events fires too — best-effort
// log entry for the WORKSPACE_OFFLINE event. Match permissively.
mock.ExpectExec(`INSERT INTO structure_events`).
WillReturnResult(sqlmock.NewResult(0, 1))
proxyErr := h.preflightContainerHealth(context.Background(), "ws-dead-456")
if proxyErr == nil {
t.Fatal("preflight should return *proxyA2AError when container not running")
}
if proxyErr.Status != 503 {
t.Errorf("expected 503, got %d", proxyErr.Status)
}
if got := proxyErr.Response["restarting"]; got != true {
t.Errorf("response should mark restarting=true, got %v", got)
}
if got := proxyErr.Response["preflight"]; got != true {
t.Errorf("response should mark preflight=true so callers can distinguish from reactive containerDead, got %v", got)
}
if got := proxyErr.Response["error"]; got != "workspace container not running — restart triggered" {
t.Errorf("error message mismatch, got %q", got)
}
// Note: broadcaster firing is exercised by the production path's
// h.broadcaster.RecordAndBroadcast call but not asserted here — the
// real *events.Broadcaster doesn't expose received events for inspection.
// The DB UPDATE expectation is sufficient to pin the offline-flip path.
}
// TestPreflight_TransientError_FailsSoftAsAlive — IsRunning(true,err): the
// (true, err) "fail-soft" contract — preflight returns nil so the optimistic
// forward runs; reactive maybeMarkContainerDead handles a real failure later.
// This pin is critical: a flaky daemon must NOT trigger a restart cascade.
func TestPreflight_TransientError_FailsSoftAsAlive(t *testing.T) {
_ = setupTestDB(t)
stub := &preflightLocalProv{running: true, err: errors.New("docker daemon EOF")}
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
h.provisioner = stub
if err := h.preflightContainerHealth(context.Background(), "ws-flaky-789"); err != nil {
t.Fatalf("preflight should return nil on transient error (fail-soft), got %+v", err)
}
// No DB UPDATE expected — sqlmock would complain about unexpected calls
// at test cleanup if the offline-flip path fired.
}
// TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT — AST gate (#36 mirror
// of #12's gate). Pins the invariant that preflightContainerHealth uses the
// SSOT Provisioner.IsRunning helper, NOT a parallel docker.ContainerInspect
// of its own.
//
// Mutation invariant: if a future PR replaces h.provisioner.IsRunning with
// a direct cli.ContainerInspect call, this test fails. That's the signal to
// either (a) extend Provisioner.IsRunning's contract OR (b) document why
// this call site needs to differ. Either way, the drift gets a reviewer's
// attention instead of shipping silently.
func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, "a2a_proxy_helpers.go", nil, parser.ParseComments)
if err != nil {
t.Fatalf("parse a2a_proxy_helpers.go: %v", err)
}
var fn *ast.FuncDecl
ast.Inspect(file, func(n ast.Node) bool {
f, ok := n.(*ast.FuncDecl)
if !ok || f.Name.Name != "preflightContainerHealth" {
return true
}
fn = f
return false
})
if fn == nil {
t.Fatal("preflightContainerHealth not found — was it renamed? update this gate or the SSOT routing assumption")
}
var (
callsIsRunning bool
callsContainerInspectRaw bool
callsRunningContainerNameDirect bool
)
ast.Inspect(fn.Body, func(n ast.Node) bool {
call, ok := n.(*ast.CallExpr)
if !ok {
return true
}
sel, ok := call.Fun.(*ast.SelectorExpr)
if !ok {
return true
}
switch sel.Sel.Name {
case "IsRunning":
callsIsRunning = true
case "ContainerInspect":
callsContainerInspectRaw = true
case "RunningContainerName":
// Direct RunningContainerName is also acceptable SSOT — but
// preferring IsRunning keeps the (bool, error) contract that
// already exists in the helper API surface.
callsRunningContainerNameDirect = true
}
return true
})
if !callsIsRunning && !callsRunningContainerNameDirect {
t.Errorf("preflightContainerHealth must call provisioner.IsRunning OR provisioner.RunningContainerName for the SSOT health check — see molecule-core#36. Found neither.")
}
if callsContainerInspectRaw {
t.Errorf("preflightContainerHealth carries a direct ContainerInspect call. This is the parallel-impl drift molecule-core#36 fixed. " +
"Either route through provisioner.IsRunning OR — if a new use case truly needs a different inspect — extend the helper's contract first and update this gate to allow the specific delta.")
}
}
@@ -1,6 +1,7 @@
package handlers
import (
"runtime"
"sync"
"sync/atomic"
"testing"
@@ -15,6 +16,42 @@ func resetRestartStatesFor(workspaceID string) {
restartStates.Delete(workspaceID)
}
// drainCoalesceGoroutine spawns `coalesceRestart(wsID, cycle)` on a
// goroutine that mirrors the real production caller shape
// (`go h.RestartByID(...)` from a2a_proxy.go, a2a_proxy_helpers.go,
// main.go), and registers a t.Cleanup that blocks until the goroutine
// has TERMINATED — not just panicked-and-recovered, fully exited.
//
// This is the bleed-prevention contract for Class H (Task #170): no
// test in this file may declare itself complete while a coalesceRestart
// goroutine it spawned is still alive, because that goroutine could
// otherwise wake up after the test's sqlmock has been closed and
// either:
// - issue a stale INSERT that gets attributed to the next test's
// sqlmock connection — surfaces as
// "INSERT-not-expected for kind=DELEGATION_FAILED" / =WORKSPACE_PROVISION_FAILED
// in a neighbour test that doesn't itself touch coalesceRestart; or
// - hold a reference to the closed *sql.DB and panic on the next op.
//
// Implementation notes:
// - sync.WaitGroup must be Add()ed BEFORE the goroutine is spawned;
// Add inside the goroutine races with Wait.
// - t.Cleanup runs in LIFO order, so this composes safely with other
// cleanups (e.g. setupTestDB's mockDB.Close).
// - We don't bound the Wait with a timeout — if the goroutine
// genuinely deadlocks, the whole test process should hang and fail
// under -timeout. A timeout-then-orphan would mask the bleed.
func drainCoalesceGoroutine(t *testing.T, wsID string, cycle func()) {
t.Helper()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
coalesceRestart(wsID, cycle)
}()
t.Cleanup(wg.Wait)
}
// TestCoalesceRestart_SingleCallRunsOneCycle is the baseline:
// no concurrency, one cycle. If this fails the gate logic is broken at
// its simplest path.
@@ -200,19 +237,45 @@ func TestCoalesceRestart_PanicInCycleClearsState(t *testing.T) {
const wsID = "test-coalesce-panic-recovery"
resetRestartStatesFor(wsID)
// First call's cycle panics. coalesceRestart's defer must swallow
// the panic so this test caller doesn't see it propagate up — that
// matches what the real production caller (`go h.RestartByID(...)`)
// gets: the goroutine survives, no process crash.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic should NOT propagate out of coalesceRestart (would crash the platform process from a goroutine), got: %v", r)
// Spawn the panicking cycle on a goroutine via drainCoalesceGoroutine
// — this mirrors the real production callsite shape
// (`go h.RestartByID(...)` from a2a_proxy.go:584,
// a2a_proxy_helpers.go:197, main.go:213). The previous form called
// coalesceRestart synchronously, which neither exercised the
// goroutine-survival contract nor caught Class H bleed regressions
// where the panic-recovery goroutine outlives the test and pollutes
// the next test's sqlmock with INSERTs from runRestartCycle's
// LogActivity calls (kinds DELEGATION_FAILED / WORKSPACE_PROVISION_FAILED).
//
// drainCoalesceGoroutine registers a t.Cleanup that Wait()s for the
// goroutine to TERMINATE — not merely panic-and-recover — before
// the test ends.
drainCoalesceGoroutine(t, wsID, func() { panic("simulated cycle failure") })
// We need a mid-test barrier (not just the t.Cleanup-time barrier)
// so the second coalesceRestart below sees state.running=false. The
// goroutine clears state.running inside its deferred recover; poll
// the package-level restartStates map until that observable flip
// happens. Bound at 2s — longer = real bug.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
sv, ok := restartStates.Load(wsID)
if ok {
st := sv.(*restartState)
st.mu.Lock()
running := st.running
st.mu.Unlock()
if !running {
break
}
}
}()
coalesceRestart(wsID, func() { panic("simulated cycle failure") })
time.Sleep(time.Millisecond)
}
// Second call must run a fresh cycle. If running stayed true after
// the panic, this call would early-return without invoking cycle.
// Synchronous — no panic, so no goroutine to drain, and we want to
// assert ran.Load() immediately after.
var ran atomic.Bool
coalesceRestart(wsID, func() { ran.Store(true) })
if !ran.Load() {
@@ -220,6 +283,98 @@ func TestCoalesceRestart_PanicInCycleClearsState(t *testing.T) {
}
}
// TestCoalesceRestart_DrainHelperWaitsForGoroutineExit is the Class H
// regression guard for Task #170. It asserts the contract enforced by
// drainCoalesceGoroutine: t.Cleanup blocks until the spawned
// coalesceRestart goroutine has FULLY EXITED — not merely recovered
// from panic. This is the contract that prevents stale LogActivity
// INSERTs from a recovering goroutine bleeding into the next test's
// sqlmock (the failure mode reported as "INSERT-not-expected for
// kind=DELEGATION_FAILED" in TestPooledWithEICTunnel_PreservesFnErr).
//
// We use a deterministic bleed-shape probe rather than goroutine-count
// arithmetic: the cycle blocks on a release channel for ~150ms — long
// enough that without a Wait barrier, the outer sub-test would return
// before the goroutine exited. We then verify the wg.Wait inside
// drainCoalesceGoroutine actually delayed t.Run's completion: total
// elapsed must be >= the block duration. Asserts exact-shape, not
// substring (per saved-memory feedback_assert_exact_not_substring):
// elapsed < blockFor would mean the cleanup didn't wait, which is the
// exact bleed we're guarding against.
//
// We additionally panic from the cycle (after the block) to confirm
// the helper waits past panic recovery, not just past cycle return.
func TestCoalesceRestart_DrainHelperWaitsForGoroutineExit(t *testing.T) {
const blockFor = 150 * time.Millisecond
const wsID = "test-coalesce-drain-helper-contract"
resetRestartStatesFor(wsID)
// done is closed inside the cycle, AFTER the block + AFTER the
// panic (which the deferred recover in coalesceRestart catches).
// Actually: defer in cycle runs before panic propagates to the
// outer recover. Use defer to close.
exited := make(chan struct{})
subStart := time.Now()
t.Run("drain_under_subtest", func(st *testing.T) {
drainCoalesceGoroutine(st, wsID, func() {
defer close(exited)
time.Sleep(blockFor)
panic("contract-test panic-after-block")
})
// st.Cleanup runs here, before t.Run returns. wg.Wait must
// block until the goroutine has finished its panic recovery.
})
subElapsed := time.Since(subStart)
// Contract: the helper's wg.Wait MUST have blocked t.Run from
// returning until after the cycle's block + panic recovery.
if subElapsed < blockFor {
t.Fatalf(
"drainCoalesceGoroutine contract violated: t.Run returned in %v, "+
"but cycle blocks for %v. The Wait barrier is broken — a "+
"coalesceRestart goroutine can outlive its test's t.Cleanup "+
"and pollute neighbour-test sqlmock state (Class H bleed).",
subElapsed, blockFor,
)
}
// And the goroutine must have actually closed `exited` (i.e. ran
// the deferred close before panic propagated through coalesceRestart's
// recover). If exited is still open here, the goroutine never
// reached the close — meaning either the panic short-circuited the
// defer (Go runtime bug — won't happen) or the goroutine never
// ran at all (drainCoalesceGoroutine spawn shape regressed).
select {
case <-exited:
// Correct path.
default:
t.Fatal("cycle goroutine never reached its deferred close — panic-recovery contract regressed")
}
// Belt-and-suspenders: the post-recover state-clear must have
// flipped state.running back to false. If this fails, the panic
// path skipped the deferred state-clear in coalesceRestart.
sv, ok := restartStates.Load(wsID)
if !ok {
t.Fatal("restartStates entry missing for wsID after cycle — sync.Map regression")
}
st := sv.(*restartState)
st.mu.Lock()
running := st.running
st.mu.Unlock()
if running {
t.Error("state.running was not cleared after panic — sticky-running deadlock regressed")
}
// Reference runtime.NumGoroutine to keep the runtime import
// honest — also a useful smoke check that the goroutine count
// hasn't ballooned 10x while debugging this test.
if n := runtime.NumGoroutine(); n > 200 {
t.Logf("warning: NumGoroutine=%d after drain — high but not necessarily a leak", n)
}
}
// TestCoalesceRestart_DifferentWorkspacesDoNotSerialize verifies the
// per-workspace state map: an in-flight restart for ws A must not
// block restarts for ws B. Important for performance — without this,