Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b9d2786f45 |
@@ -14,42 +14,12 @@ name: Handlers Postgres Integration
|
||||
# self-review caught it took 2 minutes to set up and would have caught
|
||||
# the bug at PR-time.
|
||||
#
|
||||
# Why this workflow does NOT use `services: postgres:` (Class B fix)
|
||||
# ------------------------------------------------------------------
|
||||
# Our act_runner config has `container.network: host` (operator host
|
||||
# /opt/molecule/runners/config.yaml), which act_runner applies to BOTH
|
||||
# the job container AND every service container. With host-net, two
|
||||
# concurrent runs of this workflow both try to bind 0.0.0.0:5432 — the
|
||||
# second postgres FATALs with `could not create any TCP/IP sockets:
|
||||
# Address in use`, and Docker auto-removes it (act_runner sets
|
||||
# AutoRemove:true on service containers). By the time the migrations
|
||||
# step runs `psql`, the postgres container is gone, hence
|
||||
# `Connection refused` then `failed to remove container: No such
|
||||
# container` at cleanup time.
|
||||
# This job spins a Postgres service container, applies the migration,
|
||||
# and runs `go test -tags=integration` against a live DB. Required
|
||||
# check on staging branch protection — backend handler PRs cannot
|
||||
# merge without a real-DB regression gate.
|
||||
#
|
||||
# Per-job `container.network` override is silently ignored by
|
||||
# act_runner — `--network and --net in the options will be ignored.`
|
||||
# appears in the runner log. Documented constraint.
|
||||
#
|
||||
# So we sidestep `services:` entirely. The job container still uses
|
||||
# host-net (inherited from runner config; required for cache server
|
||||
# discovery on the bridge IP 172.18.0.17:42631). We launch a sibling
|
||||
# postgres on the existing `molecule-monorepo-net` bridge with a
|
||||
# UNIQUE name per run — `pg-handlers-${RUN_ID}-${RUN_ATTEMPT}` — and
|
||||
# read its bridge IP via `docker inspect`. A host-net job container
|
||||
# can reach a bridge-net container directly via the bridge IP (verified
|
||||
# manually on operator host 2026-05-08).
|
||||
#
|
||||
# Trade-offs vs. the original `services:` shape:
|
||||
# + No host-port collision; N parallel runs share the bridge cleanly
|
||||
# + `if: always()` cleanup runs even on test-step failure
|
||||
# - One more step in the workflow (+~3 lines)
|
||||
# - Requires `molecule-monorepo-net` to exist on the operator host
|
||||
# (it does; declared in docker-compose.yml + docker-compose.infra.yml)
|
||||
#
|
||||
# Class B Hongming-owned CICD red sweep, 2026-05-08.
|
||||
#
|
||||
# Cost: ~30s job (postgres pull from cache + go build + 4 tests).
|
||||
# Cost: ~30s job (postgres pull from GH cache + go build + 4 tests).
|
||||
|
||||
on:
|
||||
push:
|
||||
@@ -89,14 +59,20 @@ jobs:
|
||||
name: Handlers Postgres Integration
|
||||
needs: detect-changes
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
# Unique name per run so concurrent jobs don't collide on the
|
||||
# bridge network. ${RUN_ID}-${RUN_ATTEMPT} is unique even across
|
||||
# workflow_dispatch reruns of the same run_id.
|
||||
PG_NAME: pg-handlers-${{ github.run_id }}-${{ github.run_attempt }}
|
||||
# Bridge network already exists on the operator host (declared
|
||||
# in docker-compose.yml + docker-compose.infra.yml).
|
||||
PG_NETWORK: molecule-monorepo-net
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:15-alpine
|
||||
env:
|
||||
POSTGRES_PASSWORD: test
|
||||
POSTGRES_DB: molecule
|
||||
ports:
|
||||
- 5432:5432
|
||||
# GHA spins this with --health-cmd built in for postgres images.
|
||||
options: >-
|
||||
--health-cmd pg_isready
|
||||
--health-interval 5s
|
||||
--health-timeout 5s
|
||||
--health-retries 10
|
||||
defaults:
|
||||
run:
|
||||
working-directory: workspace-server
|
||||
@@ -113,57 +89,16 @@ jobs:
|
||||
with:
|
||||
go-version: 'stable'
|
||||
|
||||
- if: needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Start sibling Postgres on bridge network
|
||||
working-directory: .
|
||||
run: |
|
||||
# Sanity: the bridge network must exist on the operator host.
|
||||
# Hard-fail loud if it doesn't — easier to spot than a silent
|
||||
# auto-create that diverges from the rest of the stack.
|
||||
if ! docker network inspect "${PG_NETWORK}" >/dev/null 2>&1; then
|
||||
echo "::error::Bridge network '${PG_NETWORK}' missing on operator host. Re-run docker-compose.infra.yml or check ops handbook."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# If a stale container with the same name exists (rerun on
|
||||
# the same run_id), wipe it first.
|
||||
docker rm -f "${PG_NAME}" >/dev/null 2>&1 || true
|
||||
|
||||
docker run -d \
|
||||
--name "${PG_NAME}" \
|
||||
--network "${PG_NETWORK}" \
|
||||
--health-cmd "pg_isready -U postgres" \
|
||||
--health-interval 5s \
|
||||
--health-timeout 5s \
|
||||
--health-retries 10 \
|
||||
-e POSTGRES_PASSWORD=test \
|
||||
-e POSTGRES_DB=molecule \
|
||||
postgres:15-alpine >/dev/null
|
||||
|
||||
# Read back the bridge IP. Always present immediately after
|
||||
# `docker run -d` for bridge networks.
|
||||
PG_HOST=$(docker inspect "${PG_NAME}" \
|
||||
--format "{{(index .NetworkSettings.Networks \"${PG_NETWORK}\").IPAddress}}")
|
||||
if [ -z "${PG_HOST}" ]; then
|
||||
echo "::error::Could not resolve PG_HOST for ${PG_NAME} on ${PG_NETWORK}"
|
||||
docker logs "${PG_NAME}" || true
|
||||
exit 1
|
||||
fi
|
||||
echo "PG_HOST=${PG_HOST}" >> "$GITHUB_ENV"
|
||||
echo "INTEGRATION_DB_URL=postgres://postgres:test@${PG_HOST}:5432/molecule?sslmode=disable" >> "$GITHUB_ENV"
|
||||
echo "Started ${PG_NAME} at ${PG_HOST}:5432"
|
||||
|
||||
- if: needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Apply migrations to Postgres service
|
||||
env:
|
||||
PGPASSWORD: test
|
||||
run: |
|
||||
# Wait for postgres to actually accept connections. Docker's
|
||||
# health-cmd handles container-side readiness, but the wire
|
||||
# to the bridge IP is best-tested with pg_isready directly.
|
||||
# Wait for postgres to actually accept connections (the
|
||||
# GHA --health-cmd is best-effort but psql can still race).
|
||||
for i in {1..15}; do
|
||||
if pg_isready -h "${PG_HOST}" -p 5432 -U postgres -q; then break; fi
|
||||
echo "waiting for postgres at ${PG_HOST}:5432..."; sleep 2
|
||||
if pg_isready -h localhost -p 5432 -U postgres -q; then break; fi
|
||||
echo "waiting for postgres..."; sleep 2
|
||||
done
|
||||
|
||||
# Apply every .up.sql in lexicographic order with
|
||||
@@ -196,7 +131,7 @@ jobs:
|
||||
# not fine once a cross-table atomicity test came in.
|
||||
set +e
|
||||
for migration in $(ls migrations/*.sql 2>/dev/null | grep -v '\.down\.sql$' | sort); do
|
||||
if psql -h "${PG_HOST}" -U postgres -d molecule -v ON_ERROR_STOP=1 \
|
||||
if psql -h localhost -U postgres -d molecule -v ON_ERROR_STOP=1 \
|
||||
-f "$migration" >/dev/null 2>&1; then
|
||||
echo "✓ $(basename "$migration")"
|
||||
else
|
||||
@@ -210,7 +145,7 @@ jobs:
|
||||
# fail if any didn't land — that would be a real regression we
|
||||
# want loud.
|
||||
for tbl in delegations workspaces activity_logs pending_uploads; do
|
||||
if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
|
||||
if ! psql -h localhost -U postgres -d molecule -tA \
|
||||
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
|
||||
| grep -q 1; then
|
||||
echo "::error::$tbl table missing after migration replay — handler integration tests would be meaningless"
|
||||
@@ -221,32 +156,16 @@ jobs:
|
||||
|
||||
- if: needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Run integration tests
|
||||
env:
|
||||
INTEGRATION_DB_URL: postgres://postgres:test@localhost:5432/molecule?sslmode=disable
|
||||
run: |
|
||||
# INTEGRATION_DB_URL is exported by the start-postgres step;
|
||||
# points at the per-run bridge IP, not 127.0.0.1, so concurrent
|
||||
# workflow runs don't fight over a host-net 5432 port.
|
||||
go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"
|
||||
|
||||
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
|
||||
- if: needs.detect-changes.outputs.handlers == 'true' && failure()
|
||||
name: Diagnostic dump on failure
|
||||
env:
|
||||
PGPASSWORD: test
|
||||
run: |
|
||||
echo "::group::postgres container status"
|
||||
docker ps -a --filter "name=${PG_NAME}" --format '{{.Status}} {{.Names}}' || true
|
||||
docker logs "${PG_NAME}" 2>&1 | tail -50 || true
|
||||
echo "::endgroup::"
|
||||
echo "::group::delegations table state"
|
||||
psql -h "${PG_HOST}" -U postgres -d molecule -c "SELECT * FROM delegations LIMIT 50;" || true
|
||||
psql -h localhost -U postgres -d molecule -c "SELECT * FROM delegations LIMIT 50;" || true
|
||||
echo "::endgroup::"
|
||||
|
||||
- if: always() && needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Stop sibling Postgres
|
||||
working-directory: .
|
||||
run: |
|
||||
# always() so containers don't leak when migrations or tests
|
||||
# fail. The cleanup is best-effort: if the container is
|
||||
# already gone (e.g. concurrent rerun race), don't fail the job.
|
||||
docker rm -f "${PG_NAME}" >/dev/null 2>&1 || true
|
||||
echo "Cleaned up ${PG_NAME}"
|
||||
|
||||
|
||||
@@ -7,32 +7,6 @@ export default defineConfig({
|
||||
test: {
|
||||
environment: 'node',
|
||||
exclude: ['e2e/**', 'node_modules/**', '**/dist/**'],
|
||||
// CI-conditional test timeout (issue #96).
|
||||
//
|
||||
// Vitest's 5000ms default is too tight for the first test in any
|
||||
// file under our CI shape: `npx vitest run --coverage` on the
|
||||
// self-hosted Gitea Actions Docker runner. The cold-start cost
|
||||
// (v8 coverage instrumentation init + JSDOM bootstrap + module-
|
||||
// graph import for @/components/* and @/lib/* + first React
|
||||
// render) consistently consumes 5-7 seconds for the first
|
||||
// synchronous test in heavyweight component files
|
||||
// (ActivityTab.test.tsx, CreateWorkspaceDialog.test.tsx,
|
||||
// ConfigTab.provider.test.tsx) — even though every subsequent
|
||||
// test in the same file completes in 100-1500ms.
|
||||
//
|
||||
// Empirically the worst observed first-test was 6453ms in a
|
||||
// single file (CreateWorkspaceDialog). 30000ms gives ~5x
|
||||
// headroom over that on CI; we still keep 5000ms locally so
|
||||
// genuine waitFor races / hung promises stay sensitive in dev.
|
||||
//
|
||||
// Same vitest pattern documented at:
|
||||
// https://vitest.dev/config/testtimeout
|
||||
// https://vitest.dev/guide/coverage#profiling-test-performance
|
||||
//
|
||||
// Per-test duration is still emitted to the CI log; if a test
|
||||
// ever silently approaches 25-30s under this raised ceiling that
|
||||
// will surface as a duration regression and we revisit.
|
||||
testTimeout: process.env.CI ? 30000 : 5000,
|
||||
// Coverage is instrumented but NOT yet a CI gate — first land
|
||||
// observability so we can see the baseline, then dial in
|
||||
// thresholds + a hard gate in a follow-up PR (#1815). Today's
|
||||
|
||||
@@ -1,137 +0,0 @@
|
||||
# Runbook — Handlers Postgres Integration port-collision substrate
|
||||
|
||||
**Status:** Resolved 2026-05-08 (PR for class B Hongming-owned CICD red sweep).
|
||||
|
||||
## Symptom
|
||||
|
||||
`Handlers Postgres Integration` workflow fails on staging push and PRs.
|
||||
Step `Apply migrations to Postgres service` shows:
|
||||
|
||||
```
|
||||
psql: error: connection to server at "127.0.0.1", port 5432 failed: Connection refused
|
||||
```
|
||||
|
||||
Job-cleanup step further down logs:
|
||||
|
||||
```
|
||||
Cleaning up services for job Handlers Postgres Integration
|
||||
failed to remove container: Error response from daemon: No such container: <id>
|
||||
```
|
||||
|
||||
…confirming the postgres service container was already gone before
|
||||
cleanup ran.
|
||||
|
||||
## Root cause
|
||||
|
||||
Our Gitea act_runner (operator host `5.78.80.188`,
|
||||
`/opt/molecule/runners/config.yaml`) sets:
|
||||
|
||||
```yaml
|
||||
container:
|
||||
network: host
|
||||
```
|
||||
|
||||
…which act_runner applies to BOTH the job container AND every
|
||||
`services:` container in a workflow. Multiple workflow instances
|
||||
running concurrently across the 16 parallel runners each try to bind
|
||||
postgres on `0.0.0.0:5432`. The first wins; subsequent instances exit
|
||||
immediately with:
|
||||
|
||||
```
|
||||
LOG: could not bind IPv4 address "0.0.0.0": Address in use
|
||||
HINT: Is another postmaster already running on port 5432?
|
||||
FATAL: could not create any TCP/IP sockets
|
||||
```
|
||||
|
||||
act_runner sets `AutoRemove:true` on service containers, so Docker
|
||||
garbage-collects them as soon as they exit. By the time the migrations
|
||||
step runs `pg_isready` / `psql`, the container is gone and connection
|
||||
refused.
|
||||
|
||||
Reproduction (operator host):
|
||||
|
||||
```bash
|
||||
docker run --rm -d --name pg-A --network host \
|
||||
-e POSTGRES_PASSWORD=test postgres:15-alpine
|
||||
docker run -d --name pg-B --network host \
|
||||
-e POSTGRES_PASSWORD=test postgres:15-alpine
|
||||
docker logs pg-B # FATAL: could not create any TCP/IP sockets
|
||||
```
|
||||
|
||||
## Why per-job override doesn't work
|
||||
|
||||
The natural fix — per-job `container.network` override — is silently
|
||||
ignored by act_runner. The runner log emits:
|
||||
|
||||
```
|
||||
--network and --net in the options will be ignored.
|
||||
```
|
||||
|
||||
This is a documented act_runner constraint: container network is a
|
||||
runner-wide setting, not per-job. Source: gitea/act_runner config docs
|
||||
+ vegardit/docker-gitea-act-runner issue #7.
|
||||
|
||||
Flipping the global `container.network` to `bridge` would break every
|
||||
other workflow in the repo (cache server discovery,
|
||||
`molecule-monorepo-net` peer access during integration tests, etc.) —
|
||||
unacceptable blast radius for a per-test bug.
|
||||
|
||||
## Fix shape
|
||||
|
||||
`handlers-postgres-integration.yml` no longer uses `services: postgres:`.
|
||||
It launches a sibling postgres container manually on the existing
|
||||
`molecule-monorepo-net` bridge network with a per-run unique name:
|
||||
|
||||
```yaml
|
||||
env:
|
||||
PG_NAME: pg-handlers-${{ github.run_id }}-${{ github.run_attempt }}
|
||||
PG_NETWORK: molecule-monorepo-net
|
||||
|
||||
steps:
|
||||
- name: Start sibling Postgres on bridge network
|
||||
run: |
|
||||
docker run -d --name "${PG_NAME}" --network "${PG_NETWORK}" \
|
||||
...
|
||||
postgres:15-alpine
|
||||
PG_HOST=$(docker inspect "${PG_NAME}" \
|
||||
--format "{{(index .NetworkSettings.Networks \"${PG_NETWORK}\").IPAddress}}")
|
||||
echo "PG_HOST=${PG_HOST}" >> "$GITHUB_ENV"
|
||||
|
||||
# … migrations + tests use ${PG_HOST}, not 127.0.0.1 …
|
||||
|
||||
- if: always() && …
|
||||
name: Stop sibling Postgres
|
||||
run: docker rm -f "${PG_NAME}" || true
|
||||
```
|
||||
|
||||
The host-net job container can reach a bridge-net container via the
|
||||
bridge IP directly (verified manually, 2026-05-08). Two parallel runs
|
||||
use different names + different bridge IPs — no collision.
|
||||
|
||||
## Future-proofing
|
||||
|
||||
Other workflows that hit the same shape (any `services:` with a
|
||||
fixed-port image) will exhibit the same failure mode under
|
||||
host-network runner config. Translate using this same pattern:
|
||||
|
||||
1. Drop the `services:` block.
|
||||
2. Use `${{ github.run_id }}-${{ github.run_attempt }}` for unique
|
||||
container name.
|
||||
3. Launch on `molecule-monorepo-net` (already trusted bridge in
|
||||
`docker-compose.infra.yml`).
|
||||
4. Read back the bridge IP via `docker inspect` and export as a step env.
|
||||
5. `if: always()` cleanup step at the end.
|
||||
|
||||
If the count of such workflows grows, factor into a composite action
|
||||
(`./.github/actions/sibling-postgres`) so the substrate logic lives
|
||||
in one place.
|
||||
|
||||
## Related
|
||||
|
||||
- Issue #88 (closed by #92): localhost → 127.0.0.1 fix that unmasked
|
||||
this collision; the IPv6 fix is correct, port collision is the new
|
||||
layer.
|
||||
- Issue #94 created `molecule-monorepo-net` + `alpine:latest` as
|
||||
prereqs.
|
||||
- Saved memory `feedback_act_runner_github_server_url` documents
|
||||
another act_runner-vs-GHA divergence (server URL).
|
||||
@@ -1,457 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// eic_tunnel_pool.go — refcounted pool for EIC SSH tunnels keyed on
|
||||
// instanceID. Reuses one tunnel across N file ops, amortising the
|
||||
// ssh-keygen + SendSSHPublicKey + open-tunnel + waitForPort cost
|
||||
// (~3-5s) over multiple cats/finds (~50-200ms each).
|
||||
//
|
||||
// Origin: core#11 — canvas detail-panel config + filesystem load
|
||||
// took ~20s. ConfigTab fans out 4 GETs serially; the slowest is
|
||||
// /files/config.yaml which dispatches to readFileViaEIC. Without a
|
||||
// pool, every readFileViaEIC + listFilesViaEIC + writeFileViaEIC +
|
||||
// deleteFileViaEIC pays the full setup cost even when fired
|
||||
// back-to-back on the same workspace EC2.
|
||||
//
|
||||
// The pool keeps one eicSSHSession alive per instanceID for up to
|
||||
// poolTTL. SendSSHPublicKey grants a 60s key validity, so poolTTL
|
||||
// must stay strictly below that to avoid serving requests on a
|
||||
// just-expired key. We default to 50s with a 10s safety margin.
|
||||
//
|
||||
// Concurrency model:
|
||||
//
|
||||
// - Single mutex guards the entries map.
|
||||
// - Slow path (tunnel setup) runs OUTSIDE the lock, gated by an
|
||||
// "intent" placeholder so concurrent acquires for the same
|
||||
// instanceID don't both build a tunnel — the loser drops its
|
||||
// setup and uses the winner's.
|
||||
// - Refcount on each entry; eviction blocked while refcount > 0.
|
||||
// - Janitor goroutine sweeps every poolJanitorInterval, drops
|
||||
// entries where refcount == 0 && expiresAt < now.
|
||||
//
|
||||
// Test injection:
|
||||
//
|
||||
// - poolSetupTunnel is a package-level var so tests can swap the
|
||||
// slow path for a counting stub. Production wires it to
|
||||
// realWithEICTunnel-style setup.
|
||||
// - withEICTunnel (the public, single-shot API) is also a var
|
||||
// (already, see template_files_eic.go). It's rebound here to
|
||||
// pooledWithEICTunnel which routes through globalEICTunnelPool.
|
||||
// - Tests that need single-shot behaviour can set poolTTL = 0,
|
||||
// which makes pooledWithEICTunnel fall through to the underlying
|
||||
// setup directly (no pool entry kept).
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// poolTTL is the maximum age of a pooled tunnel. Must be strictly
|
||||
// less than the SendSSHPublicKey grant window (60s) so we never
|
||||
// serve a request through a key that's about to expire mid-op.
|
||||
//
|
||||
// Configurable via init-time wiring (see initEICTunnelPool); not a
|
||||
// const so tests can pin TTL=0 (disable pooling) or TTL=50ms (drive
|
||||
// eviction tests).
|
||||
var poolTTL = 50 * time.Second
|
||||
|
||||
// poolJanitorInterval is how often the janitor goroutine sweeps for
|
||||
// expired idle entries. Tighter than poolTTL so eviction is timely;
|
||||
// loose enough that the goroutine doesn't burn CPU.
|
||||
var poolJanitorInterval = 10 * time.Second
|
||||
|
||||
// poolMaxEntries caps simultaneous instanceIDs the pool tracks.
|
||||
// Beyond this, new acquires evict the LRU entry. Defends against a
|
||||
// pathological caller (e.g. a sweep over hundreds of workspace
|
||||
// EC2s) from leaking unbounded tunnel processes. 32 is a generous
|
||||
// ceiling for the canvas use case (one human navigates ≤ ~5
|
||||
// workspaces at a time).
|
||||
var poolMaxEntries = 32
|
||||
|
||||
// poolSetupTunnel is the slow-path tunnel constructor. Wrapped in a
|
||||
// var so tests can inject a counter stub. Returns a session and a
|
||||
// cleanup function (closes the open-tunnel subprocess + scrubs the
|
||||
// ephemeral keydir). nil session + non-nil err means setup failed
|
||||
// and there is nothing to clean up.
|
||||
//
|
||||
// Production wiring lives in eic_tunnel_pool_setup.go (a thin shim
|
||||
// over the existing realWithEICTunnel logic).
|
||||
var poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||
sess eicSSHSession, cleanup func(), err error) {
|
||||
return setupRealEICTunnel(ctx, instanceID)
|
||||
}
|
||||
|
||||
// pooledTunnel is one entry in the pool. session is shared by N
|
||||
// concurrent fn calls; cleanup runs once when refcount returns to
|
||||
// zero AND the entry is past expiresAt or evicted.
|
||||
//
|
||||
// lastUsed tracks the most recent acquire time for LRU bookkeeping
|
||||
// (overflow eviction). expiresAt is set at construction and not
|
||||
// extended on use — a tunnel cannot live past poolTTL even if it's
|
||||
// hot, because the underlying SendSSHPublicKey grant expires.
|
||||
type pooledTunnel struct {
|
||||
session eicSSHSession
|
||||
cleanup func()
|
||||
expiresAt time.Time
|
||||
lastUsed time.Time
|
||||
refcount int
|
||||
poisoned bool // true if a fn returned a tunnel-fatal error; do not reuse
|
||||
}
|
||||
|
||||
// eicTunnelPool is the package-level pool. Single instance lives
|
||||
// in globalEICTunnelPool; constructor runs lazily on first acquire.
|
||||
type eicTunnelPool struct {
|
||||
mu sync.Mutex
|
||||
entries map[string]*pooledTunnel
|
||||
// pendingSetups guards concurrent setup for the same instanceID.
|
||||
// First acquirer takes the slot; later ones wait on the channel.
|
||||
pendingSetups map[string]chan struct{}
|
||||
stopJanitor chan struct{}
|
||||
// janitorInterval is captured at pool construction from the
|
||||
// package-level poolJanitorInterval var. Captured (not re-read on
|
||||
// every tick) so a test that swaps the package var via t.Cleanup
|
||||
// after a global pool's janitor is already running can't race
|
||||
// with that goroutine's ticker read. The global pool is created
|
||||
// lazily once per process via sync.Once; before this capture
|
||||
// landed, every test that touched poolJanitorInterval after the
|
||||
// global pool's first-touch raced the janitor (caught by -race
|
||||
// on staging tip 249dbc6a — TestPooledWithEICTunnel_PanicPoisonsEntry).
|
||||
// Tests still get the new value on a freshPool() because they
|
||||
// set the package var BEFORE calling newEICTunnelPool().
|
||||
janitorInterval time.Duration
|
||||
}
|
||||
|
||||
var (
|
||||
globalEICTunnelPool *eicTunnelPool
|
||||
globalEICTunnelPoolOnce sync.Once
|
||||
)
|
||||
|
||||
// getEICTunnelPool returns the singleton pool, lazy-initialising on
|
||||
// first call. Idempotent.
|
||||
func getEICTunnelPool() *eicTunnelPool {
|
||||
globalEICTunnelPoolOnce.Do(func() {
|
||||
globalEICTunnelPool = newEICTunnelPool()
|
||||
go globalEICTunnelPool.janitor()
|
||||
})
|
||||
return globalEICTunnelPool
|
||||
}
|
||||
|
||||
// newEICTunnelPool constructs an empty pool. Exported so tests can
|
||||
// build isolated pools without sharing the singleton.
|
||||
//
|
||||
// Captures poolJanitorInterval at construction time so the janitor
|
||||
// goroutine doesn't race with t.Cleanup-driven swaps of the package
|
||||
// var. See the janitorInterval field comment for the failure mode.
|
||||
func newEICTunnelPool() *eicTunnelPool {
|
||||
return &eicTunnelPool{
|
||||
entries: map[string]*pooledTunnel{},
|
||||
pendingSetups: map[string]chan struct{}{},
|
||||
stopJanitor: make(chan struct{}),
|
||||
janitorInterval: poolJanitorInterval,
|
||||
}
|
||||
}
|
||||
|
||||
// acquire returns a usable session for instanceID. If a healthy entry
|
||||
// exists, refcount++ and return it. If a setup is in flight for the
|
||||
// same instanceID, wait for it. Otherwise build one (slow path).
|
||||
//
|
||||
// done() must be called by the caller when the op finishes. It
|
||||
// decrements refcount and triggers cleanup if the entry is past
|
||||
// TTL or poisoned and refcount==0.
|
||||
//
|
||||
// Errors from the slow path propagate; pool state is not modified
|
||||
// for failed setups (no poisoned entry created — that's only for
|
||||
// fn-returned errors on a previously-good session).
|
||||
func (p *eicTunnelPool) acquire(ctx context.Context, instanceID string) (
|
||||
sess eicSSHSession, done func(poisoned bool), err error) {
|
||||
|
||||
if poolTTL <= 0 {
|
||||
// Pool disabled (TTL=0 mode for tests / opt-out). Fall
|
||||
// through to a direct setup with caller-driven cleanup.
|
||||
s, cleanup, err := poolSetupTunnel(ctx, instanceID)
|
||||
if err != nil {
|
||||
return eicSSHSession{}, nil, err
|
||||
}
|
||||
return s, func(_ bool) { cleanup() }, nil
|
||||
}
|
||||
|
||||
for {
|
||||
p.mu.Lock()
|
||||
if pt, ok := p.entries[instanceID]; ok && !pt.poisoned && pt.expiresAt.After(time.Now()) {
|
||||
pt.refcount++
|
||||
pt.lastUsed = time.Now()
|
||||
p.mu.Unlock()
|
||||
return pt.session, p.releaser(instanceID, pt), nil
|
||||
}
|
||||
// Either no entry, expired entry, or poisoned entry. If a
|
||||
// setup is already in flight, wait and retry.
|
||||
if pending, ok := p.pendingSetups[instanceID]; ok {
|
||||
p.mu.Unlock()
|
||||
select {
|
||||
case <-pending:
|
||||
continue // re-check the entries map
|
||||
case <-ctx.Done():
|
||||
return eicSSHSession{}, nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
// Drop expired/poisoned entry now (we'll cleanup outside
|
||||
// the lock — the entry is unreferenced or we'd not be here).
|
||||
var oldCleanup func()
|
||||
if pt, ok := p.entries[instanceID]; ok {
|
||||
if pt.refcount == 0 {
|
||||
oldCleanup = pt.cleanup
|
||||
delete(p.entries, instanceID)
|
||||
}
|
||||
}
|
||||
// Reserve the setup slot.
|
||||
signal := make(chan struct{})
|
||||
p.pendingSetups[instanceID] = signal
|
||||
p.mu.Unlock()
|
||||
|
||||
if oldCleanup != nil {
|
||||
go oldCleanup()
|
||||
}
|
||||
|
||||
// Slow path: build a new tunnel. Anything that goes wrong
|
||||
// here cleans up the pendingSetups slot and propagates to
|
||||
// the caller without leaving the pool in a state where the
|
||||
// next acquire blocks waiting on a signal that never fires.
|
||||
newSess, cleanup, setupErr := poolSetupTunnel(ctx, instanceID)
|
||||
|
||||
p.mu.Lock()
|
||||
delete(p.pendingSetups, instanceID)
|
||||
close(signal)
|
||||
|
||||
if setupErr != nil {
|
||||
p.mu.Unlock()
|
||||
return eicSSHSession{}, nil, fmt.Errorf("eic tunnel setup: %w", setupErr)
|
||||
}
|
||||
|
||||
// Enforce LRU bound BEFORE inserting so we don't briefly
|
||||
// exceed the cap even by one entry.
|
||||
p.evictLRUIfFullLocked(instanceID)
|
||||
|
||||
pt := &pooledTunnel{
|
||||
session: newSess,
|
||||
cleanup: cleanup,
|
||||
expiresAt: time.Now().Add(poolTTL),
|
||||
lastUsed: time.Now(),
|
||||
refcount: 1,
|
||||
}
|
||||
p.entries[instanceID] = pt
|
||||
p.mu.Unlock()
|
||||
return pt.session, p.releaser(instanceID, pt), nil
|
||||
}
|
||||
}
|
||||
|
||||
// releaser returns a closure that decrements refcount and triggers
|
||||
// cleanup if (a) the entry is past TTL or (b) the caller signalled
|
||||
// poison. Idempotent against double-release (decrements once via the
|
||||
// captured pt; pool entry may have been replaced by then).
|
||||
func (p *eicTunnelPool) releaser(instanceID string, pt *pooledTunnel) func(poisoned bool) {
|
||||
released := false
|
||||
return func(poisoned bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if released {
|
||||
return
|
||||
}
|
||||
released = true
|
||||
pt.refcount--
|
||||
if poisoned {
|
||||
pt.poisoned = true
|
||||
}
|
||||
// Evict immediately if poisoned-and-idle OR expired-and-idle.
|
||||
// Hot entries (refcount > 0) defer eviction to the last release.
|
||||
if pt.refcount == 0 && (pt.poisoned || pt.expiresAt.Before(time.Now())) {
|
||||
// If the entry in the map is still us, remove it.
|
||||
if cur, ok := p.entries[instanceID]; ok && cur == pt {
|
||||
delete(p.entries, instanceID)
|
||||
}
|
||||
go pt.cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// evictLRUIfFullLocked drops the least-recently-used IDLE entry
|
||||
// when the pool is at capacity. Caller must hold p.mu. The new
|
||||
// instanceID about to be inserted is excluded so we don't evict
|
||||
// ourselves. If no idle entries exist, no eviction happens — the
|
||||
// new entry will push us above the soft cap until something releases.
|
||||
func (p *eicTunnelPool) evictLRUIfFullLocked(skipInstance string) {
|
||||
if len(p.entries) < poolMaxEntries {
|
||||
return
|
||||
}
|
||||
var oldestKey string
|
||||
var oldest *pooledTunnel
|
||||
for k, pt := range p.entries {
|
||||
if k == skipInstance {
|
||||
continue
|
||||
}
|
||||
if pt.refcount > 0 {
|
||||
continue
|
||||
}
|
||||
if oldest == nil || pt.lastUsed.Before(oldest.lastUsed) {
|
||||
oldestKey = k
|
||||
oldest = pt
|
||||
}
|
||||
}
|
||||
if oldest == nil {
|
||||
return // every entry is in use; no eviction possible
|
||||
}
|
||||
delete(p.entries, oldestKey)
|
||||
go oldest.cleanup()
|
||||
}
|
||||
|
||||
// janitor periodically scans for entries that are idle AND expired,
|
||||
// closing their tunnels. Runs forever (per pool lifetime); cancelled
|
||||
// by close(p.stopJanitor) for tests that build short-lived pools.
|
||||
//
|
||||
// Reads p.janitorInterval (captured at construction) instead of the
|
||||
// package-level poolJanitorInterval — see janitorInterval field comment.
|
||||
func (p *eicTunnelPool) janitor() {
|
||||
t := time.NewTicker(p.janitorInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
p.sweep()
|
||||
case <-p.stopJanitor:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sweep is one janitor pass. Drops idle expired entries.
|
||||
func (p *eicTunnelPool) sweep() {
|
||||
p.mu.Lock()
|
||||
now := time.Now()
|
||||
var toClose []func()
|
||||
for k, pt := range p.entries {
|
||||
if pt.refcount == 0 && pt.expiresAt.Before(now) {
|
||||
toClose = append(toClose, pt.cleanup)
|
||||
delete(p.entries, k)
|
||||
}
|
||||
}
|
||||
p.mu.Unlock()
|
||||
for _, c := range toClose {
|
||||
go c()
|
||||
}
|
||||
}
|
||||
|
||||
// stop terminates the janitor and closes all idle entries. Hot
|
||||
// (refcount > 0) entries are NOT force-closed — callers running
|
||||
// against them would see a use-after-free. In practice stop is only
|
||||
// called by tests that have already drained their callers.
|
||||
func (p *eicTunnelPool) stop() {
|
||||
close(p.stopJanitor)
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
for k, pt := range p.entries {
|
||||
if pt.refcount == 0 {
|
||||
go pt.cleanup()
|
||||
delete(p.entries, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pooledWithEICTunnel is the pool-backed replacement for
|
||||
// realWithEICTunnel. The signature matches `var withEICTunnel`
|
||||
// exactly so the rebind (in initEICTunnelPool) is a drop-in.
|
||||
//
|
||||
// Errors from `fn` itself are forwarded to the caller AND mark the
|
||||
// pool entry as poisoned, so the next acquire builds a fresh
|
||||
// tunnel. This catches the case where the workspace EC2 was
|
||||
// restarted out-of-band (tunnel still appears alive locally but
|
||||
// every cat/find errors out).
|
||||
func pooledWithEICTunnel(ctx context.Context, instanceID string,
|
||||
fn func(s eicSSHSession) error) error {
|
||||
pool := getEICTunnelPool()
|
||||
sess, done, err := pool.acquire(ctx, instanceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// poisoned defaults to true so a panic from fn poisons the
|
||||
// entry on the way through the deferred release. Without the
|
||||
// defer, a panicking fn would leak refcount=1 forever and
|
||||
// permanently block eviction of this entry. The fn-error path
|
||||
// resets poisoned to its real classification before return.
|
||||
poisoned := true
|
||||
defer func() { done(poisoned) }()
|
||||
fnErr := fn(sess)
|
||||
poisoned = fnErrIndicatesTunnelFault(fnErr)
|
||||
return fnErr
|
||||
}
|
||||
|
||||
// fnErrIndicatesTunnelFault returns true for fn errors whose nature
|
||||
// suggests the underlying tunnel is no longer reusable (auth gone,
|
||||
// network gone, ssh process dead). Returning true poisons the pool
|
||||
// entry so the next acquire builds fresh.
|
||||
//
|
||||
// Conservative: only marks tunnel-faulty for clearly tunnel-level
|
||||
// failures (connection refused, broken pipe, ssh exit-status from
|
||||
// fatal-channel signals). A `cat` returning os.ErrNotExist on a
|
||||
// missing file is NOT a tunnel fault — that's the file path being
|
||||
// wrong, the tunnel is fine.
|
||||
func fnErrIndicatesTunnelFault(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
msg := err.Error()
|
||||
// stderr substrings produced by ssh when the tunnel is broken.
|
||||
for _, marker := range []string{
|
||||
"connection refused",
|
||||
"connection closed",
|
||||
"broken pipe",
|
||||
"Connection reset by peer",
|
||||
"kex_exchange_identification",
|
||||
"port forwarding failed",
|
||||
"Permission denied",
|
||||
"Authentication failed",
|
||||
} {
|
||||
if containsCaseInsensitive(msg, marker) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// containsCaseInsensitive avoids importing strings just for this
|
||||
// (the file already needs ssh stderr matching elsewhere — this
|
||||
// keeps the helper local to avoid a cross-file dependency).
|
||||
func containsCaseInsensitive(s, substr string) bool {
|
||||
if len(substr) > len(s) {
|
||||
return false
|
||||
}
|
||||
// Manual lowercase compare loop; ssh error markers are ASCII so
|
||||
// no need for unicode-aware folding.
|
||||
low := func(b byte) byte {
|
||||
if b >= 'A' && b <= 'Z' {
|
||||
return b + 32
|
||||
}
|
||||
return b
|
||||
}
|
||||
for i := 0; i+len(substr) <= len(s); i++ {
|
||||
match := true
|
||||
for j := 0; j < len(substr); j++ {
|
||||
if low(s[i+j]) != low(substr[j]) {
|
||||
match = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if match {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// initEICTunnelPool rebinds the package-level withEICTunnel var to
|
||||
// the pooled implementation. Called once at package init via the
|
||||
// init() in eic_tunnel_pool_setup.go (split file so the rebind
|
||||
// itself is testable without dragging in the production setup
|
||||
// shim's exec/aws dependencies).
|
||||
func initEICTunnelPool() {
|
||||
withEICTunnel = pooledWithEICTunnel
|
||||
}
|
||||
@@ -1,467 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// eic_tunnel_pool_test.go — tests for the refcounted EIC tunnel pool
|
||||
// added in core#11. Stubs poolSetupTunnel with a counter so the
|
||||
// tests don't fork ssh-keygen / aws subprocesses.
|
||||
//
|
||||
// Per memory feedback_assert_exact_not_substring: each test pins
|
||||
// exact expected counts (not "at least N") so a regression that
|
||||
// silently double-sets-up surfaces here.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// withPoolSetupStub swaps poolSetupTunnel for a counting fake that
|
||||
// returns a sentinel session and a cleanup func that records its
|
||||
// invocation. Restores on test cleanup.
|
||||
//
|
||||
// setupSignal blocks each setup until released — for concurrent-
|
||||
// acquire tests where we want to gate setup completion.
|
||||
func withPoolSetupStub(t *testing.T) (
|
||||
setupCount *int64, cleanupCount *int64, restore func(), unblock func()) {
|
||||
t.Helper()
|
||||
prev := poolSetupTunnel
|
||||
prevTTL := poolTTL
|
||||
prevJanitor := poolJanitorInterval
|
||||
|
||||
var sc, cc int64
|
||||
setupCount, cleanupCount = &sc, &cc
|
||||
|
||||
gate := make(chan struct{}, 1)
|
||||
gate <- struct{}{} // allow the first setup through immediately
|
||||
unblock = func() { gate <- struct{}{} }
|
||||
|
||||
poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||
eicSSHSession, func(), error) {
|
||||
select {
|
||||
case <-gate:
|
||||
case <-ctx.Done():
|
||||
return eicSSHSession{}, nil, ctx.Err()
|
||||
}
|
||||
atomic.AddInt64(&sc, 1)
|
||||
sess := eicSSHSession{
|
||||
instanceID: instanceID,
|
||||
osUser: "ubuntu",
|
||||
localPort: 10000 + int(atomic.LoadInt64(&sc)),
|
||||
keyPath: "/tmp/molecule-eic-test-" + instanceID,
|
||||
}
|
||||
cleanup := func() { atomic.AddInt64(&cc, 1) }
|
||||
return sess, cleanup, nil
|
||||
}
|
||||
|
||||
restore = func() {
|
||||
poolSetupTunnel = prev
|
||||
poolTTL = prevTTL
|
||||
poolJanitorInterval = prevJanitor
|
||||
}
|
||||
t.Cleanup(restore)
|
||||
return
|
||||
}
|
||||
|
||||
// freshPool returns an isolated pool (NOT the global) so tests run
|
||||
// independently. Stops the janitor on cleanup.
|
||||
func freshPool(t *testing.T) *eicTunnelPool {
|
||||
t.Helper()
|
||||
p := newEICTunnelPool()
|
||||
t.Cleanup(p.stop)
|
||||
return p
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_FourOpsAmortise pins the core invariant: four
|
||||
// sequential acquire/release cycles on the same instanceID share
|
||||
// ONE underlying tunnel setup. Mutation: delete the cache hit branch
|
||||
// in acquire() → setupCount goes 1 → 4 → test fails.
|
||||
func TestEICTunnelPool_FourOpsAmortise(t *testing.T) {
|
||||
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
|
||||
// Refill gate after each setup so concurrent stubs aren't blocked
|
||||
// (we want every test to be able to set up if it needs to).
|
||||
t.Cleanup(func() { /* no-op; defer is enough */ })
|
||||
poolTTL = 50 * time.Second
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for i := 0; i < 4; i++ {
|
||||
sess, done, err := pool.acquire(ctx, "i-test-1")
|
||||
if err != nil {
|
||||
t.Fatalf("op %d: acquire: %v", i, err)
|
||||
}
|
||||
if sess.instanceID != "i-test-1" {
|
||||
t.Fatalf("op %d: session has wrong instanceID: %q", i, sess.instanceID)
|
||||
}
|
||||
done(false)
|
||||
}
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 1 {
|
||||
t.Errorf("expected exactly 1 tunnel setup across 4 ops, got %d", got)
|
||||
}
|
||||
if got := atomic.LoadInt64(cleanupCount); got != 0 {
|
||||
t.Errorf("expected 0 cleanups while entry is hot (TTL=50s), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_DifferentInstancesDoNotShare pins that two
|
||||
// different instanceIDs each get their own tunnel — the pool is
|
||||
// keyed on instanceID, not a single global slot.
|
||||
func TestEICTunnelPool_DifferentInstancesDoNotShare(t *testing.T) {
|
||||
setupCount, _, _, unblock := withPoolSetupStub(t)
|
||||
poolTTL = 50 * time.Second
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// First instance setup uses the initial gate slot.
|
||||
_, doneA, err := pool.acquire(ctx, "i-a")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire A: %v", err)
|
||||
}
|
||||
doneA(false)
|
||||
|
||||
// Second instance needs a new slot through the gate.
|
||||
unblock()
|
||||
_, doneB, err := pool.acquire(ctx, "i-b")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire B: %v", err)
|
||||
}
|
||||
doneB(false)
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 2 {
|
||||
t.Errorf("expected 2 setups (one per instance), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_TTLEviction: a short TTL forces the second op
|
||||
// to build a fresh tunnel after the first expires.
|
||||
func TestEICTunnelPool_TTLEviction(t *testing.T) {
|
||||
setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
|
||||
poolTTL = 50 * time.Millisecond
|
||||
poolJanitorInterval = 1 * time.Second // keep janitor away
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, done, err := pool.acquire(ctx, "i-ttl")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 1: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
time.Sleep(80 * time.Millisecond) // past TTL
|
||||
|
||||
unblock() // allow next setup
|
||||
_, done, err = pool.acquire(ctx, "i-ttl")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 2: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 2 {
|
||||
t.Errorf("expected 2 setups (TTL eviction between), got %d", got)
|
||||
}
|
||||
// First entry should have been cleaned up when the second
|
||||
// acquire evicted it on the slow path. Cleanup runs in a
|
||||
// goroutine; poll briefly for it to land.
|
||||
deadline := time.Now().Add(500 * time.Millisecond)
|
||||
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
if got := atomic.LoadInt64(cleanupCount); got < 1 {
|
||||
t.Errorf("expected ≥1 cleanup (first entry evicted), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_FailureInvalidates pins the poison-on-fault
|
||||
// behavior — fn returning a tunnel-fatal error marks the entry
|
||||
// unusable so the next acquire builds fresh.
|
||||
func TestEICTunnelPool_FailureInvalidates(t *testing.T) {
|
||||
setupCount, _, _, unblock := withPoolSetupStub(t)
|
||||
poolTTL = 50 * time.Second
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, done, err := pool.acquire(ctx, "i-fault")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 1: %v", err)
|
||||
}
|
||||
done(true) // signal poison
|
||||
|
||||
unblock() // let the next setup through
|
||||
_, done, err = pool.acquire(ctx, "i-fault")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 2: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 2 {
|
||||
t.Errorf("expected 2 setups (poison forced rebuild), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_ConcurrentAcquireSingleSetup pins that N
|
||||
// concurrent acquires for the same instanceID before any release
|
||||
// only trigger ONE tunnel setup — the rest wait via pendingSetups.
|
||||
//
|
||||
// Without this guard each concurrent acquire would spawn its own
|
||||
// tunnel and the loser-cleanup would still leak refcount. Mutation:
|
||||
// delete the pendingSetups gate → setupCount goes 1 → N → fails.
|
||||
func TestEICTunnelPool_ConcurrentAcquireSingleSetup(t *testing.T) {
|
||||
setupCount, _, _, _ := withPoolSetupStub(t)
|
||||
// Pause setup so all goroutines pile into the pending slot.
|
||||
prev := poolSetupTunnel
|
||||
gate := make(chan struct{})
|
||||
poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||
eicSSHSession, func(), error) {
|
||||
<-gate
|
||||
atomic.AddInt64(setupCount, 1)
|
||||
return eicSSHSession{instanceID: instanceID}, func() {}, nil
|
||||
}
|
||||
t.Cleanup(func() { poolSetupTunnel = prev })
|
||||
|
||||
poolTTL = 50 * time.Second
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
const N = 8
|
||||
type result struct {
|
||||
done func(bool)
|
||||
err error
|
||||
}
|
||||
results := make(chan result, N)
|
||||
var startWg sync.WaitGroup
|
||||
startWg.Add(N)
|
||||
for i := 0; i < N; i++ {
|
||||
go func() {
|
||||
startWg.Done()
|
||||
_, done, err := pool.acquire(ctx, "i-concurrent")
|
||||
results <- result{done, err}
|
||||
}()
|
||||
}
|
||||
startWg.Wait()
|
||||
// give all N goroutines time to enter pool.acquire
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
close(gate)
|
||||
|
||||
for i := 0; i < N; i++ {
|
||||
r := <-results
|
||||
if r.err != nil {
|
||||
t.Fatalf("acquire %d: %v", i, r.err)
|
||||
}
|
||||
r.done(false)
|
||||
}
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 1 {
|
||||
t.Errorf("expected 1 setup across %d concurrent acquires, got %d", N, got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_TTLZeroDisablesPooling pins the escape hatch:
|
||||
// poolTTL=0 means every acquire goes straight through to setup +
|
||||
// cleanup, no entry kept. Useful for tests / opt-out.
|
||||
func TestEICTunnelPool_TTLZeroDisablesPooling(t *testing.T) {
|
||||
setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
|
||||
poolTTL = 0
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, done, err := pool.acquire(ctx, "i-ttlzero")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 1: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
unblock()
|
||||
_, done, err = pool.acquire(ctx, "i-ttlzero")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire 2: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 2 {
|
||||
t.Errorf("expected 2 setups with TTL=0 (pool disabled), got %d", got)
|
||||
}
|
||||
if got := atomic.LoadInt64(cleanupCount); got != 2 {
|
||||
t.Errorf("expected 2 cleanups with TTL=0 (each release closes), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_LRUEvictionAtCap pins the LRU defence: when the
|
||||
// pool reaches poolMaxEntries, a new acquire for an unseen
|
||||
// instanceID evicts the LRU idle entry instead of growing unbounded.
|
||||
func TestEICTunnelPool_LRUEvictionAtCap(t *testing.T) {
|
||||
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
|
||||
prev := poolMaxEntries
|
||||
poolMaxEntries = 2
|
||||
t.Cleanup(func() { poolMaxEntries = prev })
|
||||
poolTTL = 50 * time.Second
|
||||
|
||||
// Replace stub with one that doesn't gate so we can fill quickly.
|
||||
poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||
eicSSHSession, func(), error) {
|
||||
atomic.AddInt64(setupCount, 1)
|
||||
return eicSSHSession{instanceID: instanceID}, func() {
|
||||
atomic.AddInt64(cleanupCount, 1)
|
||||
}, nil
|
||||
}
|
||||
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
for _, id := range []string{"i-1", "i-2"} {
|
||||
_, done, err := pool.acquire(ctx, id)
|
||||
if err != nil {
|
||||
t.Fatalf("acquire %s: %v", id, err)
|
||||
}
|
||||
done(false)
|
||||
}
|
||||
// Both entries idle, pool at cap.
|
||||
_, done, err := pool.acquire(ctx, "i-3")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire i-3: %v", err)
|
||||
}
|
||||
done(false)
|
||||
|
||||
// Wait for the goroutine'd cleanup of the evicted entry.
|
||||
deadline := time.Now().Add(500 * time.Millisecond)
|
||||
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if got := atomic.LoadInt64(setupCount); got != 3 {
|
||||
t.Errorf("expected 3 setups (one per unique instance), got %d", got)
|
||||
}
|
||||
if got := atomic.LoadInt64(cleanupCount); got < 1 {
|
||||
t.Errorf("expected ≥1 cleanup (LRU eviction), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_PoisonedClassification pins the heuristic that
|
||||
// distinguishes tunnel-fatal errors (poison the entry) from
|
||||
// app-level errors (file not found, validation) that should NOT
|
||||
// invalidate the tunnel.
|
||||
func TestEICTunnelPool_PoisonedClassification(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
err error
|
||||
want bool
|
||||
}{
|
||||
{"nil", nil, false},
|
||||
{"file not found", errors.New("os: file does not exist"), false},
|
||||
{"validation", errors.New("invalid path: must be relative"), false},
|
||||
{"connection refused", errors.New("ssh: connect to host: connection refused"), true},
|
||||
{"connection refused upper", errors.New("Connection Refused"), true},
|
||||
{"broken pipe", errors.New("write tunnel: broken pipe"), true},
|
||||
{"permission denied", errors.New("Permission denied (publickey)"), true},
|
||||
{"auth failed", errors.New("Authentication failed"), true},
|
||||
{"connection reset", errors.New("Connection reset by peer"), true},
|
||||
{"port forward", errors.New("port forwarding failed"), true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
got := fnErrIndicatesTunnelFault(tc.err)
|
||||
if got != tc.want {
|
||||
t.Errorf("fnErrIndicatesTunnelFault(%v) = %v, want %v",
|
||||
tc.err, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestEICTunnelPool_RefcountBlocksEviction pins that an entry past
|
||||
// TTL is NOT evicted while a caller still holds it — preventing
|
||||
// use-after-free in the holder.
|
||||
func TestEICTunnelPool_RefcountBlocksEviction(t *testing.T) {
|
||||
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
|
||||
poolTTL = 30 * time.Millisecond
|
||||
poolJanitorInterval = 5 * time.Millisecond
|
||||
pool := freshPool(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, done, err := pool.acquire(ctx, "i-hold")
|
||||
if err != nil {
|
||||
t.Fatalf("acquire: %v", err)
|
||||
}
|
||||
|
||||
// Sleep past TTL while holding the session. Janitor sweeps
|
||||
// every 5ms but must skip our entry because refcount=1.
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
|
||||
if got := atomic.LoadInt64(cleanupCount); got != 0 {
|
||||
t.Errorf("expected 0 cleanups while holder is active, got %d", got)
|
||||
}
|
||||
|
||||
done(false)
|
||||
// Now refcount=0 and entry is past TTL; releaser triggers cleanup.
|
||||
deadline := time.Now().Add(200 * time.Millisecond)
|
||||
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
}
|
||||
if got := atomic.LoadInt64(cleanupCount); got != 1 {
|
||||
t.Errorf("expected 1 cleanup after release of expired entry, got %d", got)
|
||||
}
|
||||
if got := atomic.LoadInt64(setupCount); got != 1 {
|
||||
t.Errorf("setupCount tracking: got %d, want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPooledWithEICTunnel_PanicPoisonsEntry pins that a panic
|
||||
// from fn poisons the pool entry on the way out — refcount goes
|
||||
// back to zero (no leak) and the entry is marked unusable so the
|
||||
// next acquire builds fresh. Without the defer-release pattern, a
|
||||
// panic would leave refcount=1 forever and the entry would never
|
||||
// evict.
|
||||
func TestPooledWithEICTunnel_PanicPoisonsEntry(t *testing.T) {
|
||||
setupCount, _, _, _ := withPoolSetupStub(t)
|
||||
poolTTL = 50 * time.Second
|
||||
globalEICTunnelPool = newEICTunnelPool()
|
||||
t.Cleanup(globalEICTunnelPool.stop)
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r == nil {
|
||||
t.Errorf("expected panic to bubble up, got nil")
|
||||
}
|
||||
}()
|
||||
_ = pooledWithEICTunnel(context.Background(), "i-panic",
|
||||
func(s eicSSHSession) error { panic("boom") })
|
||||
}()
|
||||
|
||||
// Replenish the gate so the next setup can run.
|
||||
prev := poolSetupTunnel
|
||||
poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||
eicSSHSession, func(), error) {
|
||||
atomic.AddInt64(setupCount, 1)
|
||||
return eicSSHSession{instanceID: instanceID}, func() {}, nil
|
||||
}
|
||||
t.Cleanup(func() { poolSetupTunnel = prev })
|
||||
|
||||
// Next acquire must build fresh — entry was poisoned by panic.
|
||||
if err := pooledWithEICTunnel(context.Background(), "i-panic",
|
||||
func(s eicSSHSession) error { return nil }); err != nil {
|
||||
t.Fatalf("post-panic acquire: %v", err)
|
||||
}
|
||||
if got := atomic.LoadInt64(setupCount); got != 2 {
|
||||
t.Errorf("expected 2 setups (panic poisoned, rebuild), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPooledWithEICTunnel_PreservesFnErr pins that errors from the
|
||||
// inner fn pass through to the caller verbatim — pool wrapping
|
||||
// should not swallow or transform error semantics for app code.
|
||||
func TestPooledWithEICTunnel_PreservesFnErr(t *testing.T) {
|
||||
withPoolSetupStub(t)
|
||||
poolTTL = 50 * time.Second
|
||||
|
||||
// Reset the global pool so this test is isolated from any prior
|
||||
// test that may have populated it.
|
||||
globalEICTunnelPool = newEICTunnelPool()
|
||||
|
||||
want := errors.New("file does not exist")
|
||||
got := pooledWithEICTunnel(context.Background(), "i-fn-err",
|
||||
func(s eicSSHSession) error { return want })
|
||||
if !errors.Is(got, want) {
|
||||
t.Errorf("pooledWithEICTunnel returned %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user