Compare commits

..

1 Commits

Author SHA1 Message Date
Molecule AI Core Platform Lead 158dec4e71 feat(workspace): user-configurable EC2 sizing override, decoupled from tier
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 18s
Check migration collisions / Migration version collision check (pull_request) Successful in 44s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 50s
CI / Detect changes (pull_request) Successful in 52s
E2E API Smoke Test / detect-changes (pull_request) Successful in 30s
E2E Chat / detect-changes (pull_request) Successful in 31s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Successful in 18s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 31s
Harness Replays / detect-changes (pull_request) Successful in 31s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 33s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 24s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 21s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 1m8s
qa-review / approved (pull_request) Failing after 23s
gate-check-v3 / gate-check (pull_request) Successful in 31s
security-review / approved (pull_request) Failing after 25s
sop-checklist / all-items-acked (pull_request) Successful in 21s
sop-tier-check / tier-check (pull_request) Successful in 23s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m32s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 6m0s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m34s
Harness Replays / Harness Replays (pull_request) Successful in 8s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 10s
CI / Python Lint & Test (pull_request) Successful in 8m40s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 6m31s
E2E Chat / E2E Chat (pull_request) Failing after 10m7s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 9m46s
CI / Canvas (Next.js) (pull_request) Successful in 19m3s
CI / Platform (Go) (pull_request) Successful in 19m49s
CI / all-required (pull_request) Successful in 19m52s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
2026-05-16 09:57:59 +00:00
37 changed files with 926 additions and 1041 deletions
+1 -89
View File
@@ -23,7 +23,6 @@ import dataclasses
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
@@ -327,43 +326,6 @@ def update_pull(pr_number: int, *, dry_run: bool) -> None:
)
def wait_for_ci(
head_sha: str,
contexts: list[str],
*,
max_wait_seconds: int = 300,
poll_interval: int = 15,
) -> bool:
"""Poll CI statuses for head_sha until all required contexts are terminal.
Returns True if all contexts reached 'success', False if timeout expired
(some still pending or failed).
Background: after a queue-triggered PR update, CI re-runs on the new head.
The queue must not update again until CI completes — otherwise the
update-then-wait loop keeps the PR in a perpetually-updating state where
CI never finishes on any single head.
"""
deadline = time.time() + max_wait_seconds
while time.time() < deadline:
time.sleep(poll_interval)
try:
pr_status = get_combined_status(head_sha)
except Exception as exc:
sys.stderr.write(f"::warning::wait_for_ci: status fetch failed: {exc}\n")
continue
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
ok, bad = required_contexts_green(latest, contexts)
if ok:
sys.stderr.write(f"::notice::wait_for_ci: all contexts green after {int(time.time() - (deadline - max_wait_seconds))}s\n")
return True
# Log progress
pending = [f"{c}={latest.get(c, {}).get('status', 'missing')}" for c in contexts if latest.get(c, {}).get('status') != 'success']
sys.stderr.write(f"::notice::wait_for_ci: still waiting ({int(deadline - time.time())}s left): {', '.join(pending[:3])}\n")
sys.stderr.write(f"::warning::wait_for_ci: timeout after {max_wait_seconds}s; proceeding with merge check\n")
return False
def merge_pull(pr_number: int, *, dry_run: bool) -> None:
payload = {
"Do": "merge",
@@ -376,24 +338,7 @@ def merge_pull(pr_number: int, *, dry_run: bool) -> None:
print(f"::notice::merging PR #{pr_number}")
if dry_run:
return
# Gitea's merge endpoint returns HTTP 200 with an empty body on success.
# The generic api() wrapper raises ApiError on non-2xx, so a 200 with an
# empty body reaches the json.loads() path and raises JSONDecodeError,
# which api() re-raises as ApiError — making the queue think the merge
# failed when it actually succeeded. Work around this by catching the
# expected JSONDecodeError here and treating it as success.
try:
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
except ApiError as exc:
# Surface non-merge errors (5xx server errors, 403 forbidden, etc.)
if "merge" in str(exc).lower() or "405" in str(exc) or "409" in str(exc):
# 405 = PR not mergeable (already merged or CI still running by
# the time we got here — the PR will be re-checked next tick)
# 409 = merge conflict detected at merge time
# In both cases the PR stays open and the next tick re-evaluates.
sys.stderr.write(f"::warning::merge call returned: {exc}\n")
else:
raise
api("POST", f"/repos/{OWNER}/{NAME}/pulls/{pr_number}/merge", body=payload, expect_json=False)
def process_once(*, dry_run: bool = False) -> int:
@@ -445,32 +390,6 @@ def process_once(*, dry_run: bool = False) -> int:
print(f"::notice::PR #{pr_number} decision={decision.action}: {decision.reason}")
if decision.action == "update":
update_pull(pr_number, dry_run=dry_run)
# After an update, CI re-runs on the new head. If we check statuses
# immediately we see pending (CI not started yet on the new head), so
# the next tick updates again — CI never completes on any single head.
# Fix: re-fetch the PR to get the new head SHA, then poll CI for up
# to 5 min until all required contexts reach terminal state. If CI
# finishes in time, proceed to merge on the same tick.
if not dry_run:
updated_pr = get_pull(pr_number)
new_head = updated_pr.get("head", {}).get("sha", "")
if new_head and new_head != head_sha:
sys.stderr.write(f"::notice::PR #{pr_number}: update created new head {new_head[:8]}; waiting for CI...\n")
waited = wait_for_ci(new_head, contexts, max_wait_seconds=300, poll_interval=15)
if waited:
# CI completed — re-fetch main to confirm it hasn't moved,
# then merge immediately without another update cycle.
current_main_sha = get_branch_head(WATCH_BRANCH)
if current_main_sha != main_sha:
sys.stderr.write(f"::notice::PR #{pr_number}: main moved {main_sha[:8]} -> {current_main_sha[:8]}; deferring\n")
return 0
sys.stderr.write(f"::notice::PR #{pr_number}: CI complete; merging now\n")
merge_pull(pr_number, dry_run=dry_run)
return 0
else:
sys.stderr.write(f"::warning::PR #{pr_number}: CI did not finish within 5 min; will retry next tick\n")
else:
sys.stderr.write(f"::notice::PR #{pr_number}: update did not change head SHA; will retry\n")
post_comment(
pr_number,
(
@@ -481,13 +400,6 @@ def process_once(*, dry_run: bool = False) -> int:
)
return 0
if decision.ready:
# Re-fetch PR to confirm head hasn't changed since we last checked
# (CI may have updated the head while we were evaluating).
current_pr = get_pull(pr_number)
current_head = current_pr.get("head", {}).get("sha", "")
if current_head != head_sha:
print(f"::notice::PR #{pr_number} head changed {head_sha[:8]} -> {current_head[:8]}; re-evaluating")
return 0
latest_main_sha = get_branch_head(WATCH_BRANCH)
if latest_main_sha != main_sha:
print(
+25 -168
View File
@@ -68,7 +68,7 @@ import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Callable
from typing import Any
# ---------------------------------------------------------------------------
@@ -110,7 +110,7 @@ def normalize_slug(raw: str, numeric_aliases: dict[int, str] | None = None) -> s
# for /sop-revoke (RFC#351 open question 4 — reason is captured but not
# yet validated; future iteration may require a min-length).
_DIRECTIVE_RE = re.compile(
r"^[ \t]*/(sop-ack|sop-revoke|sop-n/a)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
r"^[ \t]*/(sop-ack|sop-revoke)[ \t]+([A-Za-z0-9_\- ]+?)(?:[ \t]+(.*))?[ \t]*$",
re.MULTILINE,
)
@@ -118,21 +118,19 @@ _DIRECTIVE_RE = re.compile(
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Extract /sop-ack, /sop-revoke, and /sop-n/a directives from a comment body.
) -> tuple[list[tuple[str, str, str]], list]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
Returns (directives, na_directives) where each is a list of
(kind, canonical_slug, note) tuples:
kind is "sop-ack", "sop-revoke", or "sop-n/a"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
The two lists are kept separate so call sites can unpack them
directly (e.g. directives, na_directives = parse_directives(...)).
Returns (directives, na_directives) where:
directives is a list of (kind, canonical_slug, note) tuples
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
na_directives is reserved for future N/A handling (always [] for now)
"""
directives: list[tuple[str, str, str]] = []
na_directives: list[tuple[str, str, str]] = []
out: list[tuple[str, str, str]] = []
if not comment_body:
return directives, na_directives
return out, []
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
@@ -162,12 +160,8 @@ def parse_directives(
note_from_group = (m.group(3) or "").strip()
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
entry = (kind, canonical, note_from_group)
if kind == "sop-n/a":
na_directives.append(entry)
else:
directives.append(entry)
return directives, na_directives
out.append((kind, canonical, note_from_group))
return out, []
# ---------------------------------------------------------------------------
@@ -180,8 +174,8 @@ def section_marker_present(body: str, marker: str) -> bool:
on a non-empty line (i.e. the author actually filled it in).
We require the marker substring AND non-whitespace content on the
same line OR within the next non-blank line — this prevents
trivially-empty checklists like:
same line OR within the next line — this prevents trivially-empty
checklists like:
## SOP-Checklist
- [ ] **Comprehensive testing performed**:
@@ -190,18 +184,9 @@ def section_marker_present(body: str, marker: str) -> bool:
from auto-passing the section-present check. The peer-ack is still
required, but answering with empty content is captured as a soft
finding via the section-present test alone.
NOTE: we scan forward through blank lines (the markdown-header pattern
is ## Header\\n\\ncontent) so that a header + blank-line + content
structure still satisfies the check. The backward checkbox fallback
catches inline markers without a preceding checkbox (mc#1099).
"""
if not body or not marker:
return False
# Strip trailing whitespace so the blank-line scan below can find
# content that appears on the very last line of the body (without
# being misled by a trailing \n or spaces).
body = body.rstrip()
body_lower = body.lower()
marker_lower = marker.lower()
idx = body_lower.find(marker_lower)
@@ -217,44 +202,13 @@ def section_marker_present(body: str, marker: str) -> bool:
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
# Fall through: scan forward, skipping blank-only lines, until we find
# non-empty content or run out of body. Handles:
# ## Header ← marker line (empty after marker)
# ← blank line (skipped)
# - actual content ← found
pos = line_end
while True:
# Skip the current newline and any additional newlines (blank lines).
while pos < len(body) and body[pos] == "\n":
pos += 1
if pos >= len(body):
break
line_end = body.find("\n", pos)
if line_end < 0:
line_end = len(body)
line = body[pos:line_end]
stripped = re.sub(r"[\s\*:\-\[\]]+", "", line)
if stripped:
return True
pos = line_end
# Last resort: the marker may appear mid-sentence (e.g.
# **Memory/saved-feedback consulted**: No applicable...).
# Search backward within the CURRENT LINE only (not preceding lines)
# to find a checkbox on the same line before the marker text.
# mc#1099 follow-up: memory-consulted detection was failing because
# the checkbox was on the same line before the inline marker.
_CHECKBOX_RE = re.compile(r"- \[[ x\]]|<input", re.IGNORECASE)
line_start = body.rfind("\n", 0, idx) + 1 # 0 if no newline before idx
before = body[line_start:idx]
m = _CHECKBOX_RE.search(before)
if not m:
return False
# Require meaningful content between the checkbox and the marker text
# (markdown formatting like ** or * must also be stripped).
# If only whitespace/markdown chars remain, the checkbox line is empty.
between = before[m.end() :]
stripped_between = re.sub(r"[\s\*:#\[\]_\-]+", "", between)
return bool(stripped_between)
# Fall through: check the NEXT line (multi-line answers).
next_line_end = body.find("\n", line_end + 1)
if next_line_end < 0:
next_line_end = len(body)
next_line = body[line_end + 1:next_line_end]
stripped_next = re.sub(r"[\s\*:\-\[\]]+", "", next_line)
return bool(stripped_next)
# ---------------------------------------------------------------------------
@@ -297,7 +251,8 @@ def compute_ack_state(
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases)[0]:
directives, _na = parse_directives(body, numeric_aliases)
for kind, slug, _note in directives:
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
@@ -349,63 +304,6 @@ def compute_ack_state(
}
# ---------------------------------------------------------------------------
# N/A-gate evaluation
# ---------------------------------------------------------------------------
def compute_na_state(
comments: list[dict[str, Any]],
author: str,
na_gates: dict[str, Any],
probe: Callable[[str, list[str]], list[str]],
) -> dict[str, dict[str, Any]]:
"""Evaluate which N/A gates have a valid declaration from a team member.
Returns dict[gate_name, dict] where each dict has:
declared: bool — at least one valid non-author team-member declared N/A
decl_ackers: list[str] — usernames who declared this gate N/A
rejected: dict with keys:
not_in_team: list[str] — users who tried but aren't in required teams
"""
# Build per-user latest N/A directive (most-recent wins per RFC#324).
latest_na: dict[str, tuple[str, str]] = {} # user → (gate, note)
for c in comments:
body = c.get("body", "") or ""
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, gate, note in parse_directives(body, {})[1]:
# [1] = na_directives only
if gate in na_gates:
latest_na[user] = (gate, note)
result: dict[str, dict[str, Any]] = {}
for gate, gate_cfg in na_gates.items():
result[gate] = {
"declared": False,
"decl_ackers": [],
"rejected": {"not_in_team": []},
}
decl_ackers: list[str] = []
not_in_team: list[str] = []
for user, (g, _note) in latest_na.items():
if g != gate:
continue
if user == author:
continue # authors cannot self-declare N/A
approved = probe(gate, [user])
if approved:
decl_ackers.append(user)
else:
not_in_team.append(user)
result[gate]["declared"] = bool(decl_ackers)
result[gate]["decl_ackers"] = decl_ackers
result[gate]["rejected"]["not_in_team"] = not_in_team
return result
# ---------------------------------------------------------------------------
# Gitea API client
# ---------------------------------------------------------------------------
@@ -800,7 +698,6 @@ def main(argv: list[str] | None = None) -> int:
cfg = load_config(args.config)
items: list[dict[str, Any]] = cfg["items"]
items_by_slug = {it["slug"]: it for it in items}
na_gates: dict[str, Any] = cfg.get("n/a_gates", {})
numeric_aliases = {
int(it["numeric_alias"]): it["slug"] for it in items if it.get("numeric_alias")
}
@@ -921,46 +818,6 @@ def main(argv: list[str] | None = None) -> int:
description=description, target_url=target_url,
)
print(f"::notice::status posted: {args.status_context}{state}")
# --- N/A gate status (RFC#324 §N/A follow-up) ---
# Post a separate status so review-check.sh can discover N/A declarations
# and waive the Gitea-approve requirement for that gate.
na_state: dict[str, dict[str, Any]] = {}
if na_gates:
na_state = compute_na_state(comments, author, na_gates, probe)
na_descs: list[str] = []
for gate, s in na_state.items():
if s["declared"]:
na_descs.append(gate)
decl = s["decl_ackers"]
rej = s["rejected"]["not_in_team"]
if decl:
print(f"::notice:: [N/A OK] {gate} — declared by {','.join(decl)}")
if rej:
print(
f"::notice:: [N/A REJ] {gate} — not-in-team: {','.join(rej)}",
file=sys.stderr,
)
na_desc = ", ".join(sorted(na_descs)) if na_descs else "(none)"
na_status_state = "success" if na_descs else "pending"
# review-check.sh reads the description to discover which gates are N/A.
# Include the gate names so it can grep for them.
na_description = f"N/A: {na_desc}" if na_descs else "N/A: (none)"
if not args.dry_run:
client.post_status(
args.owner, args.repo, head_sha,
state=na_status_state,
context="sop-checklist / na-declarations (pull_request)",
description=na_description,
target_url=target_url,
)
print(
f"::notice::na-declarations status → {na_status_state}: {na_description}"
)
# By default exit 0 — the POSTed status IS the gate, NOT the job
# conclusion. If the job exits 1 BP will see TWO failure signals
# (one from the job's auto-status, one from our POST), making the
@@ -551,55 +551,3 @@ class TestEndToEndAckFlow(unittest.TestCase):
if __name__ == "__main__":
unittest.main(verbosity=2)
# ---------------------------------------------------------------------------
# compute_na_state
# ---------------------------------------------------------------------------
class TestComputeNaState(unittest.TestCase):
"""Tests for /sop-n/a directive evaluation."""
def test_no_na_declarations(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = []
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda *_: [])
self.assertFalse(na_state["qa-review"]["declared"])
self.assertFalse(na_state["security-review"]["declared"])
def test_na_declared_by_authorized_user(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("bob", "/sop-n/a qa-review N/A: pure tooling change")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
self.assertTrue(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["decl_ackers"], ["bob"])
def test_na_declared_by_unauthorized_user_rejected(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("mallory", "/sop-n/a qa-review N/A: not real team")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: [])
self.assertFalse(na_state["qa-review"]["declared"])
self.assertEqual(na_state["qa-review"]["rejected"]["not_in_team"], ["mallory"])
def test_author_cannot_self_declare_na(self):
cfg = sop.load_config(CONFIG_PATH)
na_gates = cfg.get("n/a_gates", {})
comments = [_comment("alice", "/sop-n/a qa-review N/A: I am the author")]
na_state = sop.compute_na_state(comments, "alice", na_gates, lambda g, u: u)
self.assertFalse(na_state["qa-review"]["declared"])
def test_parse_directives_separates_na_from_ack(self):
directives, na_directives = sop.parse_directives(
"/sop-ack comprehensive-testing\n/sop-n/a qa-review N/A: no surface",
{},
)
self.assertEqual(len(directives), 1)
self.assertEqual(directives[0][0], "sop-ack")
self.assertEqual(len(na_directives), 1)
self.assertEqual(na_directives[0][0], "sop-n/a")
self.assertEqual(na_directives[0][1], "qa-review")
self.assertIn("no surface", na_directives[0][2])
-6
View File
@@ -32,12 +32,6 @@ on:
# iterating all open PRs when PR_NUMBER is empty.
workflow_dispatch:
# Cancel stale runs so the 8-runner pool stays available for PR jobs.
# Per-SHA group ensures push and cron runs at different SHAs don't cancel each other.
concurrency:
group: gate-check-v3-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
permissions:
# read: contents — for checkout (base ref, not PR head for security)
# read: pull-requests — for reading PR info via API
+7 -11
View File
@@ -49,17 +49,13 @@ jobs:
# bp-exempt: post-merge image publication side effect; CI / all-required gates source changes.
build-and-push:
name: Build & push canvas image
# Dedicated publish/release lane (internal#462 / #394 / #399). Ship
# path (on: push:main, canvas/**) — reserved capacity so a merged
# canvas fix's image build never FIFO-queues behind PR required-CI.
# The `publish` label resolves ONLY to the molecule-runner-publish-*
# sub-pool (config.publish.yaml). HARD DEPENDENCY: this MUST land
# AFTER the publish-lane runners are registered/advertising `publish`
# — the earlier #599 `docker` label attempt queued indefinitely with
# zero eligible runners precisely because the label was targeted
# before any runner advertised it (see #576). The lane is registered
# in this rollout (internal#462) so the precondition holds.
runs-on: publish
# REVERTED (infra/revert-docker-runner-label): `runs-on: ubuntu-latest` restored.
# The `docker` label is not registered on any act_runner. `runs-on: [ubuntu-latest, docker]`
# causes jobs to queue indefinitely with zero eligible runners — strictly worse than the
# pre-#599 coin-flip (50% success rate). Once the `docker` label is registered on
# ≥2 runners, re-apply the fix from #599 (infra/docker-runner-label).
# See issue #576 + infra-lead pulse ~00:30Z.
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
+2 -7
View File
@@ -66,10 +66,7 @@ concurrency:
jobs:
publish:
# Dedicated publish/release lane (internal#462 / #394 / #399). Ship
# path (on: push tag runtime-v*) — reserved capacity, never FIFO
# behind PR-CI. `publish` resolves only to molecule-runner-publish-*.
runs-on: publish
runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
wheel_sha256: ${{ steps.wheel_hash.outputs.wheel_sha256 }}
@@ -169,9 +166,7 @@ jobs:
cascade:
needs: publish
# Publish/release lane (internal#462) — downstream of the runtime
# publish ship job; keep it on the reserved lane too.
runs-on: publish
runs-on: ubuntu-latest
steps:
- name: Wait for PyPI to propagate the new version
env:
@@ -54,14 +54,7 @@ env:
jobs:
build-and-push:
# Dedicated publish/release lane (internal#462 / #394 / #399). This
# is a post-merge ship job (on: push:main) — it must NOT FIFO-compete
# with PR required-CI on the shared pool (PR#1350's prod image build
# was delayed ~25min this way). The `publish` label resolves ONLY to
# the reserved molecule-runner-publish-* sub-pool (config.publish.yaml,
# OUTSIDE the managed 1..20 range) so a merged fix's image build
# starts immediately while PR-CI keeps the general pool.
runs-on: publish
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -188,9 +181,7 @@ jobs:
name: Production auto-deploy
needs: build-and-push
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
# Publish/release lane (internal#462) — production deploy of a merged
# fix; reserved capacity, never queued behind PR-CI.
runs-on: publish
runs-on: ubuntu-latest
timeout-minutes: 75
env:
CP_URL: ${{ vars.PROD_CP_URL || 'https://api.moleculesai.app' }}
@@ -68,10 +68,7 @@ jobs:
# bp-exempt: production redeploy is a side-effect workflow, not a merge gate.
redeploy:
if: ${{ github.event_name == 'workflow_dispatch' }}
# Dedicated publish/release lane (internal#462 / #394 / #399).
# Production tenant redeploy — a deploy action, reserved capacity so
# it never queues behind PR-CI. `publish` -> molecule-runner-publish-*.
runs-on: publish
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
@@ -75,10 +75,7 @@ env:
jobs:
# bp-exempt: post-merge staging redeploy side effect; CI / all-required gates source changes.
redeploy:
# Dedicated publish/release lane (internal#462 / #394 / #399).
# Post-merge staging redeploy — a deploy action, reserved capacity.
# `publish` -> molecule-runner-publish-* sub-pool.
runs-on: publish
runs-on: ubuntu-latest
# Phase 3 (RFC #219 §1): surface broken workflows without blocking.
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
@@ -44,12 +44,6 @@ on:
- ".github/scripts/lint_secret_pattern_drift.py"
- ".githooks/pre-commit"
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
# Per-SHA group ensures push and scheduled runs at different SHAs don't cancel each other.
concurrency:
group: secret-pattern-drift-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
env:
GITHUB_SERVER_URL: https://git.moleculesai.app
-5
View File
@@ -22,11 +22,6 @@ on:
- cron: '17 4 * * 1' # Mondays at 04:17 UTC
workflow_dispatch:
# Cancel stale runs to keep the 8-runner pool available for PR jobs.
concurrency:
group: weekly-platform-go-${{ github.event.pull_request.head.sha || github.sha }}
cancel-in-progress: true
permissions:
contents: read
statuses: write
+142 -13
View File
@@ -178,6 +178,26 @@ export function deriveProvidersFromModels(models: ModelSpec[]): string[] {
// not this one.
const RUNTIMES_WITH_OWN_CONFIG = new Set<string>(["external", "kimi", "kimi-cli", "openclaw"]);
// Workspace EC2 sizing options. The control-plane is the authoritative
// allowlist + bounds enforcement point (controlplane
// internal/provisioner/ec2.go workspaceInstanceTypeAllowlist; disk
// clamped to [30,500]); this list MUST stay in sync — an entry here the
// CP rejects would let the user save an override the CP then silently
// falls back to the default for. "" = the platform default
// (t3.large / 50GB). Sizing is decoupled from the access tier.
const WORKSPACE_INSTANCE_TYPES: { value: string; label: string }[] = [
{ value: "", label: "Default (t3.large — 2 vCPU / 8GB)" },
{ value: "t3.medium", label: "t3.medium — 2 vCPU / 4GB (smallest)" },
{ value: "t3.large", label: "t3.large — 2 vCPU / 8GB" },
{ value: "t3.xlarge", label: "t3.xlarge — 4 vCPU / 16GB" },
{ value: "t3.2xlarge", label: "t3.2xlarge — 8 vCPU / 32GB (largest)" },
{ value: "m6i.large", label: "m6i.large — 2 vCPU / 8GB (steady CPU)" },
{ value: "m6i.xlarge", label: "m6i.xlarge — 4 vCPU / 16GB (steady CPU)" },
{ value: "c6i.xlarge", label: "c6i.xlarge — 4 vCPU / 8GB (compute)" },
];
const WORKSPACE_DISK_MIN_GB = 30;
const WORKSPACE_DISK_MAX_GB = 500;
const FALLBACK_RUNTIME_OPTIONS: RuntimeOption[] = [
{ value: "", label: "LangGraph (default)", models: [], providers: [] },
{ value: "claude-code", label: "Claude Code", models: [], providers: [] },
@@ -210,6 +230,19 @@ export function ConfigTab({ workspaceId }: Props) {
// data, written into /configs/config.yaml on next provision too).
const [provider, setProvider] = useState("");
const [originalProvider, setOriginalProvider] = useState("");
// Per-workspace EC2 sizing override (DB-backed, NOT in config.yaml).
// Separate state from `config` for the same reason as provider/model:
// these live on the workspace row, not the template YAML. Empty
// instanceType + 0 diskGB = "use the platform default" (t3.large /
// 50GB). Sizing is ORTHOGONAL to the access tier (the Tier select in
// General) — T4 = full root access only; it does not size the box.
// Resize semantics: provision-time only (AWS can't change instance
// type live / shrink EBS in place) — surfaced in the section copy.
const [instanceType, setInstanceType] = useState("");
const [originalInstanceType, setOriginalInstanceType] = useState("");
const [diskGB, setDiskGB] = useState(0);
const [originalDiskGB, setOriginalDiskGB] = useState(0);
const [sizingError, setSizingError] = useState<string | null>(null);
// Track the model the form first rendered, so handleSave can detect
// whether the user actually changed it (vs. only edited tier/skills/etc).
// Two field sources contribute:
@@ -259,8 +292,8 @@ export function ConfigTab({ workspaceId }: Props) {
// See GH #1894 for the workspace-row-as-source-of-truth rationale
// that motivated splitting from a single config.yaml read.
const [wsRes, modelRes, providerRes] = await Promise.all([
api.get<{ runtime?: string; tier?: number }>(`/workspaces/${workspaceId}`)
.catch(() => ({} as { runtime?: string; tier?: number })),
api.get<{ runtime?: string; tier?: number; instance_type?: string | null; disk_gb?: number | null }>(`/workspaces/${workspaceId}`)
.catch(() => ({} as { runtime?: string; tier?: number; instance_type?: string | null; disk_gb?: number | null })),
api.get<{ model?: string }>(`/workspaces/${workspaceId}/model`)
.catch(() => ({} as { model?: string })),
api.get<{ provider?: string }>(`/workspaces/${workspaceId}/provider`)
@@ -278,6 +311,19 @@ export function ConfigTab({ workspaceId }: Props) {
setProvider("");
setOriginalProvider("");
}
// Sizing override comes from the workspace row (GET /workspaces/:id
// returns instance_type/disk_gb, null when no override → render as
// the platform default). Snapshot originals so handleSave only
// PATCHes when the user actually changed them.
{
const loadedType = (wsRes.instance_type || "").trim();
const loadedDisk =
typeof wsRes.disk_gb === "number" ? wsRes.disk_gb : 0;
setInstanceType(loadedType);
setOriginalInstanceType(loadedType);
setDiskGB(loadedDisk);
setOriginalDiskGB(loadedDisk);
}
// originalModel is set further down once the YAML has been parsed —
// we want it to reflect what the form ACTUALLY rendered, which may
// be the YAML's runtime_config.model fallback when MODEL_PROVIDER
@@ -581,6 +627,31 @@ export function ConfigTab({ workspaceId }: Props) {
}
}
// Sizing override → PATCH /workspaces/:id. Send only the fields
// the user changed. "" / 0 clears the override (handler maps to
// NULL → CP default). Sizing is provision-time only; the handler
// returns needs_restart so the user knows it takes effect on the
// next (re)provision — we do NOT pretend it applied live.
let sizingSaveError: string | null = null;
const instanceTypeChanged = instanceType !== originalInstanceType;
const diskChanged = diskGB !== originalDiskGB;
let sizingWillNeedRestart = false;
if (instanceTypeChanged || diskChanged) {
const sizingPatch: Record<string, unknown> = {};
if (instanceTypeChanged) sizingPatch.instance_type = instanceType || null;
if (diskChanged) sizingPatch.disk_gb = diskGB || null;
try {
await api.patch(`/workspaces/${workspaceId}`, sizingPatch);
setOriginalInstanceType(instanceType);
setOriginalDiskGB(diskGB);
setSizingError(null);
sizingWillNeedRestart = true;
} catch (e) {
sizingSaveError = e instanceof Error ? e.message : "Sizing update was rejected";
setSizingError(sizingSaveError);
}
}
setOriginalYaml(content);
if (rawMode) {
const parsed = parseYaml(content);
@@ -598,18 +669,25 @@ export function ConfigTab({ workspaceId }: Props) {
if (restart && !providerWillAutoRestart) {
await useCanvasStore.getState().restartWorkspace(workspaceId);
} else if (!restart) {
useCanvasStore.getState().updateNodeData(workspaceId, { needsRestart: !providerWillAutoRestart });
// A sizing change only takes effect on the next (re)provision
// (AWS can't resize a live instance), so flag needsRestart so
// the user gets the "restart to apply" affordance instead of
// silently believing the new size is active.
const needsRestart = !providerWillAutoRestart || sizingWillNeedRestart;
useCanvasStore.getState().updateNodeData(workspaceId, { needsRestart });
}
// Aggregate partial-save errors. Both modelSaveError and
// providerSaveError describe rejected updates from independent
// endpoints — show whichever fired so the user knows which
// field reverts on next reload (otherwise they'd see "Saved" and
// be confused why Provider snapped back).
const partialError = providerSaveError
? `Other fields saved, but provider update failed: ${providerSaveError}`
: modelSaveError
? `Other fields saved, but model update failed: ${modelSaveError}`
: null;
// Aggregate partial-save errors. modelSaveError, providerSaveError
// and sizingSaveError describe rejected updates from independent
// endpoints — show whichever fired so the user knows which field
// reverts on next reload (otherwise they'd see "Saved" and be
// confused why a field snapped back).
const partialError = sizingSaveError
? `Other fields saved, but sizing update failed: ${sizingSaveError}`
: providerSaveError
? `Other fields saved, but provider update failed: ${providerSaveError}`
: modelSaveError
? `Other fields saved, but model update failed: ${modelSaveError}`
: null;
if (partialError) {
setError(partialError);
} else {
@@ -628,6 +706,8 @@ export function ConfigTab({ workspaceId }: Props) {
const descriptionId = useId();
const tierId = useId();
const runtimeId = useId();
const instanceTypeId = useId();
const diskGBId = useId();
const effortId = useId();
const taskBudgetId = useId();
const sandboxBackendId = useId();
@@ -706,6 +786,55 @@ export function ConfigTab({ workspaceId }: Props) {
</div>
</Section>
<Section title="Sizing" defaultOpen={false}>
<p className="text-[10px] text-ink-mid mb-2 leading-relaxed">
Per-workspace EC2 size. Independent of Tier Tier controls
access (T4 = full root), this controls how big the box is.
Default is t3.large / 50&nbsp;GB. Changes apply on the next
restart / re-provision (AWS cannot resize a running instance
or shrink a disk in place).
</p>
<div className="grid grid-cols-2 gap-3">
<div>
<label htmlFor={instanceTypeId} className="text-[10px] text-ink-mid block mb-1">Instance type</label>
<select
id={instanceTypeId}
value={instanceType}
onChange={(e) => setInstanceType(e.target.value)}
aria-label="Instance type"
className="w-full bg-surface-card border border-line rounded px-2 py-1 text-xs text-ink focus:outline-none focus:border-accent"
>
{WORKSPACE_INSTANCE_TYPES.map((o) => (
<option key={o.value} value={o.value}>{o.label}</option>
))}
</select>
</div>
<div>
<label htmlFor={diskGBId} className="text-[10px] text-ink-mid block mb-1">
Disk (GB) 0 = default 50
</label>
<input
id={diskGBId}
type="number"
value={diskGB}
min={0}
max={WORKSPACE_DISK_MAX_GB}
onChange={(e) => setDiskGB(parseInt(e.target.value, 10) || 0)}
aria-label="Disk size in GB"
className="w-full bg-surface-card border border-line rounded px-2 py-1 text-xs text-ink focus:outline-none focus:border-accent font-mono"
/>
{diskGB !== 0 && (diskGB < WORKSPACE_DISK_MIN_GB || diskGB > WORKSPACE_DISK_MAX_GB) && (
<div className="text-[10px] text-bad mt-1">
Will be clamped to {WORKSPACE_DISK_MIN_GB}{WORKSPACE_DISK_MAX_GB} GB by the platform.
</div>
)}
</div>
</div>
{sizingError && (
<div className="text-[10px] text-bad mt-2">{sizingError}</div>
)}
</Section>
<Section title="Runtime">
<div>
<label htmlFor={runtimeId} className="text-[10px] text-ink-mid block mb-1">Runtime</label>
@@ -0,0 +1,145 @@
// @vitest-environment jsdom
//
// Tests for the workspace EC2 Sizing section (tier↔sizing decoupling,
// Hongming 2026-05-15).
//
// What this pins:
// 1. A "Sizing" section exists, separate from the Tier control.
// 2. It loads the workspace's instance_type / disk_gb from
// GET /workspaces/:id and renders them.
// 3. Changing the override + Save PATCHes /workspaces/:id with the
// changed sizing fields (proves the override is not a silent
// no-op in the UI — feedback_no_proxy_e2e_claims).
// 4. Section copy states sizing is independent of Tier and applies
// on the next restart (provision-time only).
import { describe, it, expect, vi, afterEach, beforeEach } from "vitest";
import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react";
import React from "react";
afterEach(cleanup);
const apiGet = vi.fn();
const apiPatch = vi.fn();
const apiPut = vi.fn();
vi.mock("@/lib/api", () => ({
api: {
get: (path: string) => apiGet(path),
patch: (path: string, body?: unknown) => apiPatch(path, body),
put: (path: string, body?: unknown) => apiPut(path, body),
post: vi.fn(),
del: vi.fn(),
},
}));
const storeUpdateNodeData = vi.fn();
const storeRestartWorkspace = vi.fn();
vi.mock("@/store/canvas", () => ({
useCanvasStore: Object.assign(
(selector: (s: unknown) => unknown) =>
selector({ restartWorkspace: storeRestartWorkspace, updateNodeData: storeUpdateNodeData }),
{
getState: () => ({
restartWorkspace: storeRestartWorkspace,
updateNodeData: storeUpdateNodeData,
}),
},
),
}));
vi.mock("../AgentCardSection", () => ({
AgentCardSection: () => <div data-testid="agent-card-stub" />,
}));
import { ConfigTab } from "../ConfigTab";
function mockApi(opts: { instanceType?: string | null; diskGB?: number | null } = {}) {
apiGet.mockReset();
apiPatch.mockReset();
apiPut.mockReset();
apiPatch.mockResolvedValue({});
apiPut.mockResolvedValue({});
apiGet.mockImplementation((path: string) => {
if (path === `/workspaces/ws-test`) {
return Promise.resolve({
runtime: "claude-code",
tier: 4,
instance_type: opts.instanceType ?? null,
disk_gb: opts.diskGB ?? null,
});
}
if (path === `/workspaces/ws-test/model`) return Promise.resolve({ model: "claude-opus-4-7" });
if (path === `/workspaces/ws-test/provider`) return Promise.resolve({ provider: "anthropic-oauth" });
if (path === `/workspaces/ws-test/files/config.yaml`) {
return Promise.resolve({ content: "name: test\nruntime: claude-code\n" });
}
if (path === "/templates") {
return Promise.resolve([{ id: "claude-code", name: "Claude Code", runtime: "claude-code", providers: [] }]);
}
return Promise.reject(new Error(`unmocked api.get: ${path}`));
});
}
describe("ConfigTab Sizing section", () => {
beforeEach(() => mockApi());
it("renders a Sizing section distinct from the Tier control", async () => {
render(<ConfigTab workspaceId="ws-test" />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
const sizingBtn = screen.getByRole("button", { name: /^Sizing/i });
expect(sizingBtn).toBeTruthy();
fireEvent.click(sizingBtn);
// Copy must state independence from Tier + restart-to-apply.
await waitFor(() => {
const blurb = screen.queryAllByText((_, el) =>
(el?.textContent || "").includes("Independent of Tier"),
);
expect(blurb.length).toBeGreaterThan(0);
});
});
it("loads the workspace's saved override into the controls", async () => {
mockApi({ instanceType: "t3.xlarge", diskGB: 120 });
render(<ConfigTab workspaceId="ws-test" />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
fireEvent.click(screen.getByRole("button", { name: /^Sizing/i }));
const typeSelect = (await screen.findByLabelText("Instance type")) as HTMLSelectElement;
const diskInput = screen.getByLabelText("Disk size in GB") as HTMLInputElement;
expect(typeSelect.value).toBe("t3.xlarge");
expect(diskInput.value).toBe("120");
});
it("PATCHes /workspaces/:id with the changed sizing on Save", async () => {
render(<ConfigTab workspaceId="ws-test" />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
fireEvent.click(screen.getByRole("button", { name: /^Sizing/i }));
const typeSelect = (await screen.findByLabelText("Instance type")) as HTMLSelectElement;
fireEvent.change(typeSelect, { target: { value: "t3.2xlarge" } });
const diskInput = screen.getByLabelText("Disk size in GB") as HTMLInputElement;
fireEvent.change(diskInput, { target: { value: "200" } });
fireEvent.click(screen.getByRole("button", { name: /^Save$/i }));
await waitFor(() => {
const sizingCall = apiPatch.mock.calls.find(
(c) => c[0] === "/workspaces/ws-test" && c[1] && ("instance_type" in c[1] || "disk_gb" in c[1]),
);
expect(sizingCall).toBeTruthy();
expect(sizingCall![1]).toMatchObject({ instance_type: "t3.2xlarge", disk_gb: 200 });
});
});
it("does not PATCH sizing when the user did not change it", async () => {
mockApi({ instanceType: "t3.large", diskGB: 50 });
render(<ConfigTab workspaceId="ws-test" />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
fireEvent.click(screen.getByRole("button", { name: /^Sizing/i }));
fireEvent.click(screen.getByRole("button", { name: /^Save$/i }));
await waitFor(() => expect(screen.queryByText(/Saving/i)).toBeNull());
const sizingCall = apiPatch.mock.calls.find(
(c) => c[1] && ("instance_type" in c[1] || "disk_gb" in c[1]),
);
expect(sizingCall).toBeFalsy();
});
});
+4 -1
View File
@@ -30,7 +30,10 @@
{"name": "openclaw", "repo": "molecule-ai/molecule-ai-workspace-template-openclaw", "ref": "main"},
{"name": "codex", "repo": "molecule-ai/molecule-ai-workspace-template-codex", "ref": "main"},
{"name": "langgraph", "repo": "molecule-ai/molecule-ai-workspace-template-langgraph", "ref": "main"},
{"name": "autogen", "repo": "molecule-ai/molecule-ai-workspace-template-autogen", "ref": "main"}
{"name": "crewai", "repo": "molecule-ai/molecule-ai-workspace-template-crewai", "ref": "main"},
{"name": "autogen", "repo": "molecule-ai/molecule-ai-workspace-template-autogen", "ref": "main"},
{"name": "deepagents", "repo": "molecule-ai/molecule-ai-workspace-template-deepagents", "ref": "main"},
{"name": "gemini-cli", "repo": "molecule-ai/molecule-ai-workspace-template-gemini-cli", "ref": "main"}
],
"org_templates": [
{"name": "molecule-dev", "repo": "molecule-ai/molecule-ai-org-template-molecule-dev", "ref": "main"},
@@ -1,160 +0,0 @@
package handlers
// Regression coverage for the POLL-mode arm of the canvas user-message
// data-loss bug (internal#470 sibling — tracked on internal#471).
//
// Bug (reported 2026-05-16 by CTO Hongming): "in canvas i sometimes lose
// my own message when i exit chat". The push-mode arm was fixed by
// #1347 (persistUserMessageAtIngest — a SYNCHRONOUS, before-dispatch,
// context.WithoutCancel INSERT). #1347's framing asserted "poll-mode
// workspaces were never affected — logA2AReceiveQueued already persists
// at ingest". That assertion is OVERSTATED.
//
// Hongming's tenant (slug `hongming`, org 2c940477-...) has 4 workspaces,
// ALL runtime=external with empty URL → ALL delivery_mode=poll (proven
// empirically: a benign A2A probe returns the synthetic
// {"delivery_mode":"poll","status":"queued"} envelope for every one).
// So his reported loss is the POLL path, NOT the push path #1347 fixes.
//
// Root cause (poll arm): the poll-mode short-circuit (a2a_proxy.go ~402)
// calls logA2AReceiveQueued and then IMMEDIATELY returns the synthetic
// 200 {status:"queued"} to the canvas. But logA2AReceiveQueued's durable
// INSERT runs inside h.goAsync(...) — a DETACHED goroutine with NO
// happens-before barrier against the HTTP response. The canvas sees 200
// ("message accepted") while the activity_logs row may not yet be — and,
// on a workspace-server restart / deploy / OOM / EC2 hibernation between
// the 200 and the goroutine's commit, NEVER will be — durable. There is
// also no fallback (unlike push-mode's legacy-INSERT fallback): a
// swallowed LogActivity error loses the message with only a log line.
// Chat-history reads activity_logs (postgres_store.go:165-187); a missing
// row = message gone on reopen. That is exactly Hongming's symptom.
//
// Fix (parity with push-mode): the poll-mode ingest persist of the
// canvas user message must be SYNCHRONOUS — committed before the queued
// 200 is returned — on a context.WithoutCancel derived context, so a
// client disconnect on chat-exit and a post-response restart cannot lose
// it. Behavior is never worse than today (best-effort; a persist error
// still returns queued).
//
// TEST DESIGN NOTE: sqlmock.ExpectationsWereMet() hangs indefinitely if
// the expected query never fires. We use a select+default+time.After
// pattern so the test FAILS fast (not hangs) when the production code
// regresses to async (the INSERT never fires before handler returns),
// while still returning promptly when all expectations are met. The
// insertDelay is kept small (50ms) to minimise suite-level timing
// impact under -race detection, where mock delays are amplified by
// the instrumenter's goroutine overhead.
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse
// is the defining contract: for a poll-mode workspace, the canvas user
// message MUST be durably INSERTed into activity_logs BEFORE the synthetic
// queued 200 is returned to the client — with NO reliance on a detached
// async goroutine completing later.
//
// The test proves the ordering by making the INSERT block briefly and
// asserting the handler does NOT return until the INSERT has completed.
// Pre-fix (INSERT in h.goAsync, response returned immediately) the
// handler returns ~instantly while the INSERT is still pending in the
// goroutine → the elapsed time is far below the injected INSERT delay and
// ExpectationsWereMet() is racy/unmet at return. Post-fix (synchronous
// persist before the queued response) the handler return is gated on the
// INSERT, so elapsed >= the injected delay and the expectation is met
// deterministically at return WITHOUT any waitAsyncForTest()/sleep.
func TestProxyA2A_PollMode_PersistsUserMessageSynchronouslyBeforeQueuedResponse(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-poll-sync-persist"
// Keep delay small: -race detection amplifies mock delays significantly.
// A 50ms delay is sufficient to prove synchronous blocking (~50× the
// normal INSERT latency) without bloating the full ./... suite runtime.
const insertDelay = 50 * time.Millisecond
expectBudgetCheck(mock, wsID)
// lookupDeliveryMode → poll, triggering the short-circuit.
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// workspace-name lookup inside logA2AReceiveQueued.
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Poll WS"))
// The durable user-message write. We delay it so a synchronous
// persist visibly gates the handler return; a detached-goroutine
// persist (pre-fix) does not. The fix must keep using
// context.WithoutCancel so this write survives a chat-exit cancel.
mock.ExpectExec("INSERT INTO activity_logs").
WillDelayFor(insertDelay).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
// callerID == "" (no X-Workspace-ID) → this is a canvas_user message,
// exactly Hongming's case.
body := `{"jsonrpc":"2.0","id":"poll-canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"my own message"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
start := time.Now()
handler.ProxyA2A(c)
elapsed := time.Since(start)
// Defining assertion #1: the handler must not have returned the
// queued response before the durable INSERT committed. Pre-fix this
// fails (elapsed ≈ 0, INSERT still racing in goAsync).
if elapsed < insertDelay {
t.Fatalf("poll-mode queued response returned in %v, before the %v user-message INSERT — "+
"the message is not durable when the client/process goes away (DATA LOSS). "+
"Persist must be synchronous before the queued 200.", elapsed, insertDelay)
}
// Defining assertion #2: the durable write actually happened by the
// time the handler returned. ExpectionsWereMet() hangs indefinitely if
// the mock never fires (e.g. production code regressed to async),
// so we check it in a goroutine with a hard 2s timeout — fails fast
// (no CI hang) on regression while returning promptly on success.
expectDone := make(chan error, 1)
go func() { expectDone <- mock.ExpectationsWereMet() }()
select {
case err := <-expectDone:
if err != nil {
t.Fatalf("user-message INSERT was not durable at handler return (unmet sqlmock expectations): %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " +
"Likely cause: production code regressed logA2AReceiveQueued to goAsync " +
"(INSERT fires after handler returns, not before).")
}
// Sanity: still the correct poll-mode envelope + status.
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (queued), got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["status"] != "queued" || resp["delivery_mode"] != "poll" {
t.Errorf("poll envelope changed: got status=%v delivery_mode=%v, want queued/poll",
resp["status"], resp["delivery_mode"])
}
}
@@ -504,49 +504,25 @@ func lookupDeliveryMode(ctx context.Context, workspaceID string) string {
// reads in PR 3 — that's how a poll-mode workspace receives inbound A2A
// without a public URL.
func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, callerID string, body []byte, a2aMethod string) {
// DATA-LOSS FIX (internal#471 — poll-mode sibling of #1347/internal#470):
// this is the ONLY durable write of a poll-mode inbound message,
// including a canvas_user message (callerID == "") typed in the canvas
// chat. It MUST be SYNCHRONOUS and complete BEFORE the caller returns
// the synthetic {status:"queued"} 200 — otherwise the canvas sees the
// send acknowledged while the activity_logs row is still racing in a
// detached goroutine, and a workspace-server restart / deploy / OOM /
// EC2 hibernation between the 200 and the goroutine's commit loses the
// user's message permanently (chat-history reads activity_logs, so a
// missing row = message gone on reopen). Hongming's tenant is entirely
// poll-mode (4 external workspaces, no URL — verified empirically), so
// his reported loss is THIS path; #1347 (push-mode, persists AFTER the
// poll short-circuit) structurally cannot cover it.
//
// Mirrors persistUserMessageAtIngest's discipline:
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
// - SYNCHRONOUS (no goAsync): the row must be durable before the
// queued 200 is returned to the caller.
// - Best-effort: LogActivity already logs+swallows INSERT errors, so
// a hiccup never blocks or fails the user's send (behavior for
// that one request is never worse than the pre-fix async path).
// The post-commit broadcast still fires inside LogActivity; a missed
// WebSocket event is not data loss (the durable row is the truth the
// canvas re-reads on reopen).
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
var wsName string
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
db.DB.QueryRowContext(ctx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName == "" {
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (queued for poll)"
LogActivity(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
})
})
}
@@ -44,8 +44,8 @@ func NewWorkspaceImageService(docker *dockerclient.Client) *WorkspaceImageServic
// AllRuntimes is the canonical list mirroring docs/workspace-runtime-package.md.
// Update both when a new template is added.
var AllRuntimes = []string{
"claude-code", "langgraph", "autogen",
"hermes", "openclaw",
"claude-code", "langgraph", "crewai", "autogen",
"deepagents", "hermes", "gemini-cli", "openclaw",
}
// RefreshResult is the per-call outcome surfaced to HTTP callers AND logged
@@ -177,7 +177,7 @@ func isEnvIdentPart(c byte) bool {
return isEnvIdentStart(c) || (c >= '0' && c <= '9')
}
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env .env and the workspace-specific .env
// (workspace overrides org root). Used by both secret injection and channel
// config expansion.
//
@@ -805,5 +805,28 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
h.addProvisionTimeoutMs(ws, rt)
}
// Per-workspace EC2 sizing override (canvas Config tab). NULL → the
// CP applies its default (t3.large / 50GB); we surface null so the
// UI can render "Default" rather than a stale value. Non-sensitive
// (it's the size the user themselves configured). Separate query
// for the same reason as last_outbound_at above — keeps the shared
// scanWorkspaceRow column list (used by list endpoints too) stable.
var instanceType sql.NullString
var diskGB sql.NullInt64
if err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT instance_type, disk_gb FROM workspaces WHERE id = $1`, id,
).Scan(&instanceType, &diskGB); err == nil {
if instanceType.Valid && instanceType.String != "" {
ws["instance_type"] = instanceType.String
} else {
ws["instance_type"] = nil
}
if diskGB.Valid && diskGB.Int64 != 0 {
ws["disk_gb"] = diskGB.Int64
} else {
ws["disk_gb"] = nil
}
}
c.JSON(http.StatusOK, ws)
}
@@ -218,6 +218,78 @@ func (h *WorkspaceHandler) Update(c *gin.Context) {
}
}
needsRestart := false
// Per-workspace EC2 sizing override (canvas Config tab). Sizing is
// ORTHOGONAL to tier — tier is the ACCESS model (T4 = full root
// access), it does NOT drive sizing. The CP is the enforcement
// point (allowlist + [30,500] disk clamp); here we persist intent
// and reject obviously-bad input early. instance_type="" / disk_gb=0
// (or JSON null) clears the override → CP falls back to its default
// (t3.large / 50GB).
//
// Resize semantics: provision-time only. AWS cannot change instance
// type live (needs stop/start) and cannot shrink EBS in place. So a
// sizing change sets needs_restart=true — the new spec takes effect
// when the workspace is next (re)provisioned. We do NOT pretend it
// applied live.
if it, ok := body["instance_type"]; ok {
if it == nil {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET instance_type = NULL, updated_at = now() WHERE id = $1`, id); err != nil {
log.Printf("Update instance_type (clear) error for %s: %v", id, err)
}
needsRestart = true
} else if s, isStr := it.(string); isStr {
if s != "" && !isAllowedWorkspaceInstanceType(s) {
c.JSON(http.StatusBadRequest, gin.H{
"error": "unsupported instance_type",
"allowed": allowedWorkspaceInstanceTypes,
})
return
}
// Empty string also clears the override (store NULL).
var val interface{}
if s != "" {
val = s
}
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET instance_type = $2, updated_at = now() WHERE id = $1`, id, val); err != nil {
log.Printf("Update instance_type error for %s: %v", id, err)
}
needsRestart = true
} else {
c.JSON(http.StatusBadRequest, gin.H{"error": "instance_type must be a string or null"})
return
}
}
if dg, ok := body["disk_gb"]; ok {
if dg == nil {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET disk_gb = NULL, updated_at = now() WHERE id = $1`, id); err != nil {
log.Printf("Update disk_gb (clear) error for %s: %v", id, err)
}
needsRestart = true
} else if f, isNum := dg.(float64); isNum {
gb := int(f)
if gb == 0 {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET disk_gb = NULL, updated_at = now() WHERE id = $1`, id); err != nil {
log.Printf("Update disk_gb (clear) error for %s: %v", id, err)
}
} else {
// Reject implausible values early; the CP still clamps
// to [30,500] authoritatively.
if gb < 0 || gb > 100000 {
c.JSON(http.StatusBadRequest, gin.H{"error": "disk_gb out of range"})
return
}
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET disk_gb = $2, updated_at = now() WHERE id = $1`, id, gb); err != nil {
log.Printf("Update disk_gb error for %s: %v", id, err)
}
}
needsRestart = true
} else {
c.JSON(http.StatusBadRequest, gin.H{"error": "disk_gb must be a number or null"})
return
}
}
if wsDir, ok := body["workspace_dir"]; ok {
// ValidateWorkspaceDir was already called above before the existence check;
// the UPDATE itself is unconditional.
@@ -251,6 +323,35 @@ func (h *WorkspaceHandler) Update(c *gin.Context) {
c.JSON(http.StatusOK, resp)
}
// allowedWorkspaceInstanceTypes is the user-selectable workspace EC2
// instance-type allowlist surfaced to the canvas Config tab and used
// for early rejection in the PATCH handler. It MIRRORS the CP's
// authoritative allowlist (controlplane internal/provisioner/ec2.go
// workspaceInstanceTypeAllowlist) — the CP is the enforcement point;
// this copy gives the user a fast, clear 400 instead of letting the
// CP silently fall back to the default. Keep the two in sync: a value
// here that the CP rejects would let the user save an override that
// then silently no-ops at provision (exactly the failure mode this
// feature is meant to avoid). Covered by a drift test.
var allowedWorkspaceInstanceTypes = []string{
"c6i.xlarge",
"m6i.large",
"m6i.xlarge",
"t3.2xlarge",
"t3.large",
"t3.medium",
"t3.xlarge",
}
func isAllowedWorkspaceInstanceType(t string) bool {
for _, a := range allowedWorkspaceInstanceTypes {
if a == t {
return true
}
}
return false
}
// validateWorkspaceDir checks that a workspace_dir path is safe to bind-mount.
func validateWorkspaceDir(dir string) error {
if !filepath.IsAbs(dir) {
@@ -2,6 +2,7 @@ package handlers
import (
"context"
"database/sql"
"fmt"
"log"
"os"
@@ -259,17 +260,34 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
// present) wins, matching the existing WorkspaceDir precedence.
workspacePath := payload.WorkspaceDir
workspaceAccess := payload.WorkspaceAccess
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
var dbDir, dbAccess string
if err := db.DB.QueryRow(
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none') FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&dbDir, &dbAccess); err == nil {
if workspacePath == "" && dbDir != "" {
workspacePath = dbDir
}
if workspaceAccess == "" {
workspaceAccess = dbAccess
// Per-workspace sizing override (instance_type / disk_gb). Like
// workspace_dir/workspace_access these are DB-backed so a restart /
// reprovision picks up an override the user set via the canvas
// Config tab AFTER create. NULL/0 → leave the CP request fields
// empty so the CP applies its default (t3.large/50GB). Sizing is
// orthogonal to tier — see migration 20260515140000.
var instanceType string
var diskGB int32
{
var dbDir, dbAccess, dbInstanceType string
var dbDiskGB sql.NullInt64
if (workspacePath == "" || workspaceAccess == "") && db.DB != nil {
if err := db.DB.QueryRow(
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none'),
COALESCE(instance_type, ''), disk_gb
FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&dbDir, &dbAccess, &dbInstanceType, &dbDiskGB); err == nil {
if workspacePath == "" && dbDir != "" {
workspacePath = dbDir
}
if workspaceAccess == "" {
workspaceAccess = dbAccess
}
instanceType = dbInstanceType
if dbDiskGB.Valid {
diskGB = int32(dbDiskGB.Int64)
}
}
}
}
@@ -288,6 +306,8 @@ func (h *WorkspaceHandler) buildProvisionerConfig(
WorkspacePath: workspacePath,
WorkspaceAccess: workspaceAccess,
Tier: payload.Tier,
InstanceType: instanceType,
DiskGB: diskGB,
Runtime: payload.Runtime,
EnvVars: envVars,
PlatformURL: h.platformURL,
@@ -0,0 +1,77 @@
package handlers
import (
"sort"
"testing"
)
// TestWorkspaceInstanceTypeAllowlist_MirrorsCP pins the workspace-server
// copy of the instance-type allowlist to the CP's authoritative list
// (controlplane internal/provisioner/ec2.go workspaceInstanceTypeAllowlist).
//
// The CP is the enforcement point — it returns 400 on an unsupported
// type. workspace-server keeps a copy ONLY so the user gets a fast,
// clear rejection in the canvas Config tab instead of a round-trip to
// the CP. If the two drift, a user could save an override that the CP
// then silently falls back to the default for — exactly the
// "told a change took when it didn't" failure this feature exists to
// prevent. The two repos can't share a Go package (separate modules),
// so this test hard-codes the expected set; updating it is the
// deliberate checkpoint when the CP allowlist changes.
//
// CP source of truth (controlplane internal/provisioner/ec2.go):
//
// var workspaceInstanceTypeAllowlist = map[string]struct{}{
// "t3.medium", "t3.large", "t3.xlarge", "t3.2xlarge",
// "m6i.large", "m6i.xlarge", "c6i.xlarge",
// }
func TestWorkspaceInstanceTypeAllowlist_MirrorsCP(t *testing.T) {
cpAuthoritative := []string{
"c6i.xlarge",
"m6i.large",
"m6i.xlarge",
"t3.2xlarge",
"t3.large",
"t3.medium",
"t3.xlarge",
}
got := append([]string(nil), allowedWorkspaceInstanceTypes...)
sort.Strings(got)
want := append([]string(nil), cpAuthoritative...)
sort.Strings(want)
if len(got) != len(want) {
t.Fatalf("allowlist size drift: workspace-server has %d (%v), CP has %d (%v) — keep in sync with controlplane workspaceInstanceTypeAllowlist",
len(got), got, len(want), want)
}
for i := range got {
if got[i] != want[i] {
t.Fatalf("allowlist drift at %d: workspace-server=%q CP=%q\nfull ws=%v\nfull CP=%v",
i, got[i], want[i], got, want)
}
}
}
// TestIsAllowedWorkspaceInstanceType pins the membership helper used by
// the PATCH handler's early-reject path.
func TestIsAllowedWorkspaceInstanceType(t *testing.T) {
if !isAllowedWorkspaceInstanceType("t3.large") {
t.Error("t3.large (the default) must be allowed")
}
if !isAllowedWorkspaceInstanceType("t3.medium") {
t.Error("t3.medium (floor) must be allowed")
}
if !isAllowedWorkspaceInstanceType("t3.2xlarge") {
t.Error("t3.2xlarge (ceiling) must be allowed")
}
if isAllowedWorkspaceInstanceType("p4d.24xlarge") {
t.Error("p4d.24xlarge (off allowlist, GPU/cost blowout) must NOT be allowed")
}
if isAllowedWorkspaceInstanceType("'; DROP TABLE workspaces;--") {
t.Error("SQL-injection-shaped garbage must NOT be allowed")
}
if isAllowedWorkspaceInstanceType("") {
t.Error("empty string is not a valid instance type for the membership check")
}
}
@@ -0,0 +1,168 @@
//go:build integration
// +build integration
// workspace_sizing_integration_test.go — REAL Postgres integration
// test for the per-workspace EC2 sizing override round-trip.
//
// Run with:
//
// INTEGRATION_DB_URL="postgres://dev:dev@localhost:5432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/handlers/ -run Integration_WorkspaceSizing -v
//
// CI: piggybacks on handlers-postgres-integration.yml (path filter
// covers workspace-server/internal/handlers/** and migrations/**).
//
// Why this is NOT a sqlmock test
// ------------------------------
// sqlmock pins query SHAPE, not behaviour. Only a real Postgres with
// migration 20260515140000 applied can confirm:
//
// - The instance_type/disk_gb columns actually exist and accept the
// values the canvas Config tab writes.
// - A persisted override is read back by the SAME SELECT
// buildProvisionerConfig issues on the (re)provision path — i.e.
// the override actually reaches the CP request, not a silent drop
// (feedback_no_proxy_e2e_claims: prove the literal path).
// - The "no override" default leaves the columns NULL so the CP
// applies its own default (t3.large / 50GB).
//
// Per feedback_mandatory_local_e2e_before_ship: ship-mode requires the
// round-trip exercised against a real Postgres before the PR merges.
package handlers
import (
"context"
"database/sql"
"testing"
"github.com/google/uuid"
_ "github.com/lib/pq"
)
func integrationDB_WorkspaceSizing(t *testing.T) *sql.DB {
t.Helper()
conn := integrationDB_WorkspaceCreateName(t) // reuses INTEGRATION_DB_URL + skip
// Ensure the sizing columns exist (idempotent — the migration is
// the dev/CI default; this covers a pre-2026-05-15 snapshot DB).
if _, err := conn.ExecContext(context.Background(), `
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS instance_type TEXT,
ADD COLUMN IF NOT EXISTS disk_gb INTEGER
`); err != nil {
t.Fatalf("ensure sizing columns: %v", err)
}
return conn
}
// TestIntegration_WorkspaceSizing_OverrideRoundTrips proves the full
// persisted path: an override written to the workspaces row (what the
// PATCH /workspaces/:id handler does) is read back exactly the way
// buildProvisionerConfig reads it on the (re)provision path.
func TestIntegration_WorkspaceSizing_OverrideRoundTrips(t *testing.T) {
conn := integrationDB_WorkspaceSizing(t)
ctx := context.Background()
const namePrefix = "sizing-itest-"
t.Cleanup(func() { cleanupTestRows(t, conn, namePrefix) })
id := uuid.NewString()
if _, err := conn.ExecContext(ctx,
`INSERT INTO workspaces (id, name, tier, status) VALUES ($1, $2, 4, 'online')`,
id, namePrefix+"override"); err != nil {
t.Fatalf("insert: %v", err)
}
// Simulate PATCH /workspaces/:id { instance_type, disk_gb }.
if _, err := conn.ExecContext(ctx,
`UPDATE workspaces SET instance_type = $2, disk_gb = $3 WHERE id = $1`,
id, "t3.xlarge", 120); err != nil {
t.Fatalf("patch sizing: %v", err)
}
// Read it back with the EXACT projection buildProvisionerConfig
// uses (workspace_provision.go) so this test fails if that query
// drifts away from the columns.
var dbDir, dbAccess, dbInstanceType string
var dbDiskGB sql.NullInt64
if err := conn.QueryRowContext(ctx,
`SELECT COALESCE(workspace_dir, ''), COALESCE(workspace_access, 'none'),
COALESCE(instance_type, ''), disk_gb
FROM workspaces WHERE id = $1`, id,
).Scan(&dbDir, &dbAccess, &dbInstanceType, &dbDiskGB); err != nil {
t.Fatalf("read back: %v", err)
}
if dbInstanceType != "t3.xlarge" {
t.Errorf("instance_type round-trip: got %q, want t3.xlarge", dbInstanceType)
}
if !dbDiskGB.Valid || dbDiskGB.Int64 != 120 {
t.Errorf("disk_gb round-trip: got %v, want 120", dbDiskGB)
}
}
// TestIntegration_WorkspaceSizing_DefaultIsNull proves the no-override
// path leaves both columns NULL, so buildProvisionerConfig sends empty
// fields and the CP applies its default (t3.large / 50GB) — never a
// stale or zero value misread as an override.
func TestIntegration_WorkspaceSizing_DefaultIsNull(t *testing.T) {
conn := integrationDB_WorkspaceSizing(t)
ctx := context.Background()
const namePrefix = "sizing-itest-"
t.Cleanup(func() { cleanupTestRows(t, conn, namePrefix) })
id := uuid.NewString()
if _, err := conn.ExecContext(ctx,
`INSERT INTO workspaces (id, name, tier, status) VALUES ($1, $2, 1, 'online')`,
id, namePrefix+"default"); err != nil {
t.Fatalf("insert: %v", err)
}
var instanceType sql.NullString
var diskGB sql.NullInt64
if err := conn.QueryRowContext(ctx,
`SELECT instance_type, disk_gb FROM workspaces WHERE id = $1`, id,
).Scan(&instanceType, &diskGB); err != nil {
t.Fatalf("read: %v", err)
}
if instanceType.Valid {
t.Errorf("fresh workspace instance_type should be NULL, got %q", instanceType.String)
}
if diskGB.Valid {
t.Errorf("fresh workspace disk_gb should be NULL, got %d", diskGB.Int64)
}
}
// TestIntegration_WorkspaceSizing_ClearReverts proves clearing the
// override (PATCH with null) returns the workspace to the CP default
// rather than pinning a previous value — the override is genuinely
// user-reversible.
func TestIntegration_WorkspaceSizing_ClearReverts(t *testing.T) {
conn := integrationDB_WorkspaceSizing(t)
ctx := context.Background()
const namePrefix = "sizing-itest-"
t.Cleanup(func() { cleanupTestRows(t, conn, namePrefix) })
id := uuid.NewString()
if _, err := conn.ExecContext(ctx,
`INSERT INTO workspaces (id, name, tier, status, instance_type, disk_gb)
VALUES ($1, $2, 4, 'online', 't3.2xlarge', 200)`,
id, namePrefix+"clear"); err != nil {
t.Fatalf("insert: %v", err)
}
// Simulate PATCH clearing both (handler maps "" / 0 / null → NULL).
if _, err := conn.ExecContext(ctx,
`UPDATE workspaces SET instance_type = NULL, disk_gb = NULL WHERE id = $1`, id); err != nil {
t.Fatalf("clear: %v", err)
}
var instanceType sql.NullString
var diskGB sql.NullInt64
if err := conn.QueryRowContext(ctx,
`SELECT instance_type, disk_gb FROM workspaces WHERE id = $1`, id,
).Scan(&instanceType, &diskGB); err != nil {
t.Fatalf("read: %v", err)
}
if instanceType.Valid || diskGB.Valid {
t.Errorf("after clear, expected both NULL; got instance_type=%v disk_gb=%v", instanceType, diskGB)
}
}
@@ -23,8 +23,8 @@ package models
// - claude-code: "sonnet" — Anthropic's CLI accepts the short
// name and resolves it via the operator's anthropic-oauth or
// ANTHROPIC_API_KEY chain.
// - everything else (hermes, langgraph, autogen, codex, openclaw,
// external, ""): a fully-qualified
// - everything else (hermes, langgraph, crewai, autogen, deepagents,
// codex, openclaw, gemini-cli, external, ""): a fully-qualified
// vendor:model slug that the universal MODEL_PROVIDER chain in
// molecule-core PR #247 can route via per-vendor required_env.
//
@@ -21,9 +21,12 @@ func TestDefaultModel(t *testing.T) {
// as a generic "unknown" failure.
{"hermes", "anthropic:claude-opus-4-7"},
{"langgraph", "anthropic:claude-opus-4-7"},
{"crewai", "anthropic:claude-opus-4-7"},
{"autogen", "anthropic:claude-opus-4-7"},
{"deepagents", "anthropic:claude-opus-4-7"},
{"codex", "anthropic:claude-opus-4-7"},
{"openclaw", "anthropic:claude-opus-4-7"},
{"gemini-cli", "anthropic:claude-opus-4-7"},
{"external", "anthropic:claude-opus-4-7"},
// Unknown / empty — fall through to universal default rather
@@ -152,12 +152,20 @@ func (p *CPProvisioner) adminAuthHeaders(req *http.Request) {
}
type cpProvisionRequest struct {
OrgID string `json:"org_id"`
WorkspaceID string `json:"workspace_id"`
Runtime string `json:"runtime"`
Tier int `json:"tier"`
PlatformURL string `json:"platform_url"`
Env map[string]string `json:"env"`
OrgID string `json:"org_id"`
WorkspaceID string `json:"workspace_id"`
Runtime string `json:"runtime"`
// Tier is the ACCESS model only (T4 = full root access). It does
// NOT drive sizing — see InstanceType / DiskGB.
Tier int `json:"tier"`
// InstanceType + DiskGB are the optional per-workspace sizing
// override. Omitted (empty / 0) → CP applies its default
// (t3.large / 50GB). The CP validates instance_type against its
// allowlist and returns 400 on an unsupported value.
InstanceType string `json:"instance_type,omitempty"`
DiskGB int32 `json:"disk_gb,omitempty"`
PlatformURL string `json:"platform_url"`
Env map[string]string `json:"env"`
// ConfigFiles are template + generated config files to write into the
// EC2 instance's /configs directory. OFFSEC-010: collected by
// collectCPConfigFiles which rejects symlinks and non-regular files
@@ -197,13 +205,15 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
}
req := cpProvisionRequest{
OrgID: p.orgID,
WorkspaceID: cfg.WorkspaceID,
Runtime: cfg.Runtime,
Tier: cfg.Tier,
PlatformURL: cfg.PlatformURL,
Env: env,
ConfigFiles: configFiles,
OrgID: p.orgID,
WorkspaceID: cfg.WorkspaceID,
Runtime: cfg.Runtime,
Tier: cfg.Tier,
InstanceType: cfg.InstanceType,
DiskGB: cfg.DiskGB,
PlatformURL: cfg.PlatformURL,
Env: env,
ConfigFiles: configFiles,
}
body, err := json.Marshal(req)
@@ -1062,3 +1062,77 @@ func TestCollectCPConfigFiles_RejectsRootSymlink(t *testing.T) {
t.Errorf("expected symlink-related error, got: %v", err)
}
}
// TestStart_ForwardsSizingOverride proves the per-workspace sizing
// override (instance_type + disk_gb) actually reaches the CP provision
// request — not a silent drop. This is the workspace-server half of
// the tier↔sizing decoupling: sizing is independent of Tier and is
// plumbed canvas → workspace-server → CP.
func TestStart_ForwardsSizingOverride(t *testing.T) {
var got cpProvisionRequest
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = json.NewDecoder(r.Body).Decode(&got)
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, `{"instance_id":"i-size","state":"pending"}`)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
httpClient: srv.Client(),
}
if _, err := p.Start(context.Background(), WorkspaceConfig{
WorkspaceID: "ws-size",
Runtime: "claude-code",
Tier: 4, // access tier — must NOT influence sizing
InstanceType: "t3.xlarge",
DiskGB: 120,
PlatformURL: "http://tenant",
}); err != nil {
t.Fatalf("Start: %v", err)
}
if got.InstanceType != "t3.xlarge" {
t.Errorf("CP request instance_type = %q, want t3.xlarge (override dropped)", got.InstanceType)
}
if got.DiskGB != 120 {
t.Errorf("CP request disk_gb = %d, want 120 (override dropped)", got.DiskGB)
}
// Tier still forwarded (access model), independently of sizing.
if got.Tier != 4 {
t.Errorf("CP request tier = %d, want 4 (access model must still forward)", got.Tier)
}
}
// TestStart_NoSizingOverrideOmitsFields proves the default path sends
// NO instance_type / disk_gb so the CP applies its own default
// (t3.large / 50GB) rather than receiving a zero-value that could be
// misread.
func TestStart_NoSizingOverrideOmitsFields(t *testing.T) {
var rawBody []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawBody, _ = io.ReadAll(r.Body)
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, `{"instance_id":"i-def","state":"pending"}`)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL, orgID: "org-1", sharedSecret: "s", httpClient: srv.Client(),
}
if _, err := p.Start(context.Background(), WorkspaceConfig{
WorkspaceID: "ws-def", Runtime: "python", Tier: 1, PlatformURL: "http://t",
}); err != nil {
t.Fatalf("Start: %v", err)
}
// omitempty on InstanceType/DiskGB → absent from the JSON entirely.
if strings.Contains(string(rawBody), "instance_type") {
t.Errorf("default request leaked instance_type into body: %s", rawBody)
}
if strings.Contains(string(rawBody), "disk_gb") {
t.Errorf("default request leaked disk_gb into body: %s", rawBody)
}
}
@@ -190,7 +190,7 @@ func TestEnsureLocalImage_RepoNotFound(t *testing.T) {
opts.HTTPClient = srv.Client()
opts.remoteHeadSha = nil // exercise real HTTP path
_, err := ensureLocalImageWithOpts(context.Background(), "hermes", opts)
_, err := ensureLocalImageWithOpts(context.Background(), "crewai", opts)
if err == nil {
t.Fatalf("expected error, got nil")
}
@@ -35,19 +35,6 @@ import (
// drift-risk #6.
var ErrNoBackend = errors.New("provisioner: no backend configured (zero-valued receiver)")
// ErrUnresolvableRuntime is returned by selectImage when a workspace
// names a runtime that has no resolvable image (not in RuntimeImages and
// no operator-pinned cfg.Image). RFC internal#483 + security review 4269:
// previously such a request silently fell through to DefaultImage
// (langgraph) — a user asking for crewai would get a langgraph container
// with no signal. The CTO standing directive
// (feedback_platform_must_hardgate_base_contract) is fail-closed: a
// named-but-unresolvable runtime must reject with a structured,
// runtime-naming error so the existing provision-failed notify/log path
// surfaces it, NOT silently degrade. The genuinely-unspecified (empty)
// runtime is still a distinct, legitimate path that keeps DefaultImage.
var ErrUnresolvableRuntime = errors.New("provisioner: requested runtime has no resolvable image")
// RuntimeImages maps runtime names to their Docker image refs.
// Each standalone template repo publishes its image via the reusable
// publish-template-image workflow in molecule-ci on every main merge.
@@ -91,12 +78,21 @@ const (
// WorkspaceConfig holds the parameters needed to provision a workspace container.
type WorkspaceConfig struct {
WorkspaceID string
TemplatePath string // Host path to template dir to copy from (e.g. claude-code-default/)
ConfigFiles map[string][]byte // Generated config files to write into /configs volume
PluginsPath string // Host path to plugins directory (mounted at /plugins)
WorkspacePath string // Host path to bind-mount as /workspace (if empty, uses Docker named volume)
Tier int
WorkspaceID string
TemplatePath string // Host path to template dir to copy from (e.g. claude-code-default/)
ConfigFiles map[string][]byte // Generated config files to write into /configs volume
PluginsPath string // Host path to plugins directory (mounted at /plugins)
WorkspacePath string // Host path to bind-mount as /workspace (if empty, uses Docker named volume)
Tier int
// InstanceType + DiskGB are the optional per-workspace,
// user-configurable EC2 sizing override (canvas Config tab,
// DB-backed). Empty / 0 = "use the CP default" (t3.large / 50GB).
// Decoupled from Tier by design — Tier is the ACCESS model only
// (T4 = full root access), it does NOT drive sizing. Only the
// SaaS CP provisioner path consumes these; the local Docker
// provisioner ignores them (sizing is an EC2 concept).
InstanceType string
DiskGB int32
Runtime string // "langgraph" (default) or "claude-code", "codex", "ollama", "custom"
EnvVars map[string]string // Additional env vars (API keys, etc.)
PlatformURL string
@@ -117,33 +113,20 @@ type WorkspaceConfig struct {
// selectImage resolves the final Docker image ref for a workspace. The handler
// layer is the source of truth — if it set cfg.Image (the digest-pinned form
// from runtime_image_pins, #2272), honor that. Otherwise fall back to the
// runtime→tag lookup in RuntimeImages (legacy `:latest` behavior).
//
// Fail-closed contract (RFC internal#483 / security review 4269 /
// feedback_platform_must_hardgate_base_contract): if the workspace NAMES a
// runtime that resolves to no image (not in RuntimeImages, no pinned
// cfg.Image), reject with ErrUnresolvableRuntime instead of silently
// substituting DefaultImage. Pre-fix, removing crewai/deepagents/gemini-cli
// from the catalog left those create requests silently provisioning a
// langgraph container — the user asked for crewai and got langgraph with no
// signal. The error propagates through Start → markProvisionFailed, which
// already broadcasts WorkspaceProvisionFailed and records the message.
//
// The genuinely-unspecified runtime (empty cfg.Runtime, e.g. an org template
// that doesn't pin one) is an intended distinct path and still resolves to
// DefaultImage — only a NAMED-but-unresolvable runtime is rejected.
func selectImage(cfg WorkspaceConfig) (string, error) {
// runtime→tag lookup in RuntimeImages (legacy `:latest` behavior). When the
// runtime isn't recognized either, fall back to DefaultImage so Start() still
// has something to hand Docker — surfacing a "No such image" later is more
// actionable than a silent "" panic in ContainerCreate.
func selectImage(cfg WorkspaceConfig) string {
if cfg.Image != "" {
return cfg.Image, nil
return cfg.Image
}
if cfg.Runtime != "" {
if img, ok := RuntimeImages[cfg.Runtime]; ok {
return img, nil
return img
}
return "", fmt.Errorf("%w: runtime %q (known runtimes: %v)",
ErrUnresolvableRuntime, cfg.Runtime, knownRuntimes)
}
return DefaultImage, nil
return DefaultImage
}
// Workspace-access constants for #65. Matches the CHECK constraint on
@@ -215,24 +198,6 @@ const containerNamePrefix = "ws-"
// (the wiped-DB case after `docker compose down -v`).
const LabelManaged = "molecule.platform.managed"
// AgentUID / AgentGID are the uid/gid of the unprivileged `agent` user that
// every workspace template creates and drops to via `gosu agent` before
// exec'ing the runtime (the a2a_mcp_server runs under this uid). The value is
// fixed at 1000:1000 across all templates — see:
// - workspace-configs-templates/claude-code-default/Dockerfile (`useradd -u 1000 ... agent`)
// - workspace-configs-templates/hermes/Dockerfile (`useradd -u 1000 ... agent`)
// - workspace/entrypoint.sh (`exec gosu agent` — "uid 1000")
//
// Files the platform injects into /configs AFTER the entrypoint's
// `chown -R agent:agent /configs` (the post-start #418 re-injection and the
// pre-start #1877 volume write) must be owned by this uid/gid, otherwise the
// agent-uid MCP server hits EACCES reading /configs/.auth_token, sends an
// empty bearer, and the platform 401s on /registry/{id}/peers (list_peers).
const (
AgentUID = 1000
AgentGID = 1000
)
// managedLabels is the canonical label map applied to every workspace
// container + volume. Pulled out so a future addition (e.g. instance
// UUID for multi-platform-shared-daemon disambiguation) is one edit.
@@ -362,15 +327,7 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e
env := buildContainerEnv(cfg)
image, imgErr := selectImage(cfg)
if imgErr != nil {
// Fail-closed: a named-but-unresolvable runtime must not silently
// become DefaultImage (RFC internal#483 / review 4269). The caller's
// error path (markProvisionFailed) broadcasts the failure + records
// the message so the canvas surfaces it.
log.Printf("Provisioner: refusing to start %s: %v", cfg.WorkspaceID, imgErr)
return "", imgErr
}
image := selectImage(cfg)
// Local-build mode (issue #63 / Task #194): when MOLECULE_IMAGE_REGISTRY
// is unset, the OSS contributor path skips the registry pull entirely
@@ -914,18 +871,8 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
return &buf, nil
}
// buildConfigFilesTar builds the tar stream that WriteFilesToContainer streams
// into /configs via CopyToContainer. Every entry is stamped Uid/Gid = agent
// (AgentUID/AgentGID) so the files land agent-owned after extraction. This is
// the issue #418 post-start re-injection path: it runs AFTER the template
// entrypoint's `chown -R agent:agent /configs`, so without explicit ownership
// in the tar header the files extract as root:root (tar Uid/Gid default 0) and
// the agent-uid MCP server can no longer read /configs/.auth_token (and
// /configs/.platform_inbound_secret) → empty bearer → list_peers 401.
//
// Pulled out as a pure function so the ownership contract is unit-testable
// without a live Docker daemon (mirrors buildTemplateTar).
func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
// WriteFilesToContainer writes in-memory files into /configs in the container.
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
var buf bytes.Buffer
tw := tar.NewWriter(&buf)
@@ -938,10 +885,8 @@ func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
Typeflag: tar.TypeDir,
Name: dir + "/",
Mode: 0755,
Uid: AgentUID,
Gid: AgentGID,
}); err != nil {
return nil, fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
return fmt.Errorf("failed to write tar dir header for %s: %w", dir, err)
}
createdDirs[dir] = true
}
@@ -950,30 +895,19 @@ func buildConfigFilesTar(files map[string][]byte) (*bytes.Buffer, error) {
Name: name,
Mode: 0644,
Size: int64(len(data)),
Uid: AgentUID,
Gid: AgentGID,
}
if err := tw.WriteHeader(header); err != nil {
return nil, fmt.Errorf("failed to write tar header for %s: %w", name, err)
return fmt.Errorf("failed to write tar header for %s: %w", name, err)
}
if _, err := tw.Write(data); err != nil {
return nil, fmt.Errorf("failed to write tar data for %s: %w", name, err)
return fmt.Errorf("failed to write tar data for %s: %w", name, err)
}
}
if err := tw.Close(); err != nil {
return nil, fmt.Errorf("failed to close tar writer: %w", err)
return fmt.Errorf("failed to close tar writer: %w", err)
}
return &buf, nil
}
// WriteFilesToContainer writes in-memory files into /configs in the container,
// agent-owned (see buildConfigFilesTar).
func (p *Provisioner) WriteFilesToContainer(ctx context.Context, containerID string, files map[string][]byte) error {
buf, err := buildConfigFilesTar(files)
if err != nil {
return err
}
return p.cli.CopyToContainer(ctx, containerID, "/configs", buf, container.CopyToContainerOptions{})
return p.cli.CopyToContainer(ctx, containerID, "/configs", &buf, container.CopyToContainerOptions{})
}
// CopyToContainer exposes CopyToContainer from the Docker client for use by other packages.
@@ -1063,28 +997,13 @@ func (p *Provisioner) ReadFromVolume(ctx context.Context, volumeName, filePath s
return clean, nil
}
// writeAuthTokenVolumeCmd is the shell command the throwaway alpine container
// runs to seed /vol/.auth_token. alpine runs it as root, so without the
// explicit `chown 1000:1000` the file stays root:root after the template
// entrypoint's `chown -R agent:agent /configs` has already run — the agent-uid
// (AgentUID) MCP server then gets EACCES reading it → empty bearer →
// list_peers 401. Pulled out as a pure function so the ownership contract is
// unit-testable without a live Docker daemon. Issue #1877.
func writeAuthTokenVolumeCmd() string {
return fmt.Sprintf(
"mkdir -p /vol && printf '%%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token && chown %d:%d /vol/.auth_token",
AgentUID, AgentGID,
)
}
// WriteAuthTokenToVolume writes the workspace auth token into the config volume
// BEFORE the container starts, eliminating the token-injection race window where
// a restarted container could read a stale token from /configs/.auth_token before
// WriteFilesToContainer writes the new one. Issue #1877.
//
// Uses a throwaway alpine container to write directly to the named volume,
// bypassing the container lifecycle entirely. The written file is chowned to
// the agent uid/gid (see writeAuthTokenVolumeCmd).
// bypassing the container lifecycle entirely.
func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, token string) error {
if p == nil || p.cli == nil {
return ErrNoBackend
@@ -1092,7 +1011,7 @@ func (p *Provisioner) WriteAuthTokenToVolume(ctx context.Context, workspaceID, t
volName := ConfigVolumeName(workspaceID)
resp, err := p.cli.ContainerCreate(ctx, &container.Config{
Image: "alpine",
Cmd: []string{"sh", "-c", writeAuthTokenVolumeCmd()},
Cmd: []string{"sh", "-c", "mkdir -p /vol && printf '%s' $TOKEN > /vol/.auth_token && chmod 0600 /vol/.auth_token"},
Env: []string{"TOKEN=" + token},
}, &container.HostConfig{
Binds: []string{volName + ":/vol"},
@@ -513,10 +513,7 @@ func TestWorkspaceConfig_ResetClaudeSessionFieldPresent(t *testing.T) {
// we lose the "one bad publish doesn't break every workspace" guarantee.
func TestSelectImage_PrefersExplicitImage(t *testing.T) {
pinned := "ghcr.io/molecule-ai/workspace-template-claude-code@sha256:3d6761a97ed07d7d33cfc19a8fbab81175d9d9179618d493dbc00c5f7ef076a3"
got, err := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: pinned})
if err != nil {
t.Fatalf("selectImage with cfg.Image=pinned: unexpected error %v", err)
}
got := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: pinned})
if got != pinned {
t.Errorf("selectImage with cfg.Image=pinned: got %q, want %q", got, pinned)
}
@@ -526,46 +523,28 @@ func TestSelectImage_PrefersExplicitImage(t *testing.T) {
// pin lookup deliberately bypassed via WORKSPACE_IMAGE_LOCAL_OVERRIDE).
// selectImage must use the legacy runtime→:latest map.
func TestSelectImage_FallsBackToRuntimeMap(t *testing.T) {
got, err := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: ""})
if err != nil {
t.Fatalf("selectImage with empty Image: unexpected error %v", err)
}
got := selectImage(WorkspaceConfig{Runtime: "claude-code", Image: ""})
want := RuntimeImages["claude-code"]
if got != want {
t.Errorf("selectImage with empty Image: got %q, want %q", got, want)
}
}
// TestSelectImage_NamedUnresolvableRuntimeRejects pins the fail-closed
// contract (RFC internal#483 / security review 4269 /
// feedback_platform_must_hardgate_base_contract): a NAMED runtime with no
// resolvable image must reject with ErrUnresolvableRuntime, NOT silently
// substitute DefaultImage. Pre-fix this returned langgraph — a user asking
// for a removed runtime (crewai/deepagents/gemini-cli) silently got a
// langgraph container. "crewai" is the concrete regression from the
// security finding.
func TestSelectImage_NamedUnresolvableRuntimeRejects(t *testing.T) {
for _, rt := range []string{"no-such-runtime", "crewai", "deepagents", "gemini-cli"} {
got, err := selectImage(WorkspaceConfig{Runtime: rt})
if !errors.Is(err, ErrUnresolvableRuntime) {
t.Errorf("selectImage(%q): got err %v, want ErrUnresolvableRuntime", rt, err)
}
if got != "" {
t.Errorf("selectImage(%q): got image %q, want \"\" on reject", rt, got)
}
if err != nil && !strings.Contains(err.Error(), rt) {
t.Errorf("selectImage(%q): error must name the offending runtime, got %v", rt, err)
}
// TestSelectImage_UnknownRuntimeFallsBackToDefault preserves today's
// behavior — an unrecognized runtime resolves to DefaultImage rather than
// "" so ContainerCreate gets a usable arg and surfaces a meaningful
// "No such image" error if the default itself is missing.
func TestSelectImage_UnknownRuntimeFallsBackToDefault(t *testing.T) {
got := selectImage(WorkspaceConfig{Runtime: "no-such-runtime"})
if got != DefaultImage {
t.Errorf("selectImage with unknown runtime: got %q, want DefaultImage %q", got, DefaultImage)
}
}
// TestSelectImage_EmptyRuntimeFallsBackToDefault: same invariant for the
// no-runtime-supplied path (legacy callers / older handler code).
func TestSelectImage_EmptyRuntimeFallsBackToDefault(t *testing.T) {
got, err := selectImage(WorkspaceConfig{})
if err != nil {
t.Fatalf("selectImage with zero cfg: unexpected error %v (empty runtime is a legitimate DefaultImage path)", err)
}
got := selectImage(WorkspaceConfig{})
if got != DefaultImage {
t.Errorf("selectImage with zero cfg: got %q, want DefaultImage %q", got, DefaultImage)
}
@@ -829,7 +808,7 @@ func TestIsImageNotFoundErr(t *testing.T) {
{"nil", nil, false},
{"moby no such image", fmtErr(`Error response from daemon: No such image: workspace-template:openclaw`), true},
{"no such image lowercase", fmtErr(`error: no such image: foo:bar`), true},
{"image not found", fmtErr(`Error: image "workspace-template:hermes" not found`), true},
{"image not found", fmtErr(`Error: image "workspace-template:crewai" not found`), true},
{"generic not found without image", fmtErr(`container not found`), false},
{"unrelated error", fmtErr(`connection refused`), false},
{"permission denied", fmtErr(`permission denied`), false},
@@ -21,6 +21,9 @@ var knownRuntimes = []string{
"autogen",
"claude-code",
"codex",
"crewai",
"deepagents",
"gemini-cli",
"hermes",
"langgraph",
"openclaw",
@@ -53,8 +53,8 @@ func TestRuntimeImage_AllKnownRuntimes(t *testing.T) {
}
}
// Pin the count so adding a runtime requires explicit test acknowledgement.
if len(knownRuntimes) != 6 {
t.Errorf("knownRuntimes length = %d, want 6 (autogen, claude-code, codex, hermes, langgraph, openclaw)", len(knownRuntimes))
if len(knownRuntimes) != 9 {
t.Errorf("knownRuntimes length = %d, want 9 (autogen, claude-code, codex, crewai, deepagents, gemini-cli, hermes, langgraph, openclaw)", len(knownRuntimes))
}
}
@@ -1,95 +0,0 @@
package provisioner
import (
"archive/tar"
"errors"
"io"
"strings"
"testing"
)
// These tests pin the P0 fix for the fleet-wide list_peers 401 (Hermes and
// every other template): the workspace-server token-injection paths wrote
// /configs/.auth_token (and /configs/.platform_inbound_secret) as root:root
// AFTER the template entrypoint's `chown -R agent:agent /configs` ran, so the
// agent-uid (1000) MCP server (a2a_mcp_server, running via `gosu agent`) hit
// `[Errno 13] Permission denied` reading the bearer → empty bearer → platform
// 401 on /registry/{id}/peers (the literal tool_list_peers path).
//
// The agent uid is 1000:1000, verified from the templates:
// - workspace-configs-templates/claude-code-default/Dockerfile: `useradd -u 1000 ... agent`
// - workspace-configs-templates/hermes/Dockerfile: `useradd -u 1000 ... agent`
// - workspace/entrypoint.sh / claude-code-default/entrypoint.sh: `exec gosu agent` ("uid 1000")
//
// Both tests assert the real artifact (the tar headers Docker's CopyToContainer
// honours for ownership, and the literal shell command the throwaway alpine
// container runs), not a mock that bypasses ownership. They FAIL on pre-fix
// code (no Uid/Gid in tar headers; no chown in the alpine command → root:root)
// and PASS post-fix (agent-owned).
// TestWriteFilesToContainerTar_FilesAreAgentOwned covers the issue #418
// post-start re-injection path (WriteFilesToContainer): the tar it streams
// into /configs via CopyToContainer must carry Uid/Gid = agent (1000) so the
// extracted files land agent-readable, not root:root. This is the path that
// (re)writes BOTH .auth_token and .platform_inbound_secret on a cadence.
func TestWriteFilesToContainerTar_FilesAreAgentOwned(t *testing.T) {
files := map[string][]byte{
".auth_token": []byte("tok-abc123"),
".platform_inbound_secret": []byte("inbound-secret-xyz"),
"nested/dir/file.txt": []byte("data"),
}
buf, err := buildConfigFilesTar(files)
if err != nil {
t.Fatalf("buildConfigFilesTar: %v", err)
}
tr := tar.NewReader(buf)
seen := map[string]bool{}
for {
hdr, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatalf("read tar: %v", err)
}
if _, err := io.Copy(io.Discard, tr); err != nil {
t.Fatalf("drain %s: %v", hdr.Name, err)
}
seen[hdr.Name] = true
if hdr.Uid != AgentUID {
t.Fatalf("tar entry %q Uid = %d, want %d (agent) — root-owned injection causes the list_peers 401",
hdr.Name, hdr.Uid, AgentUID)
}
if hdr.Gid != AgentGID {
t.Fatalf("tar entry %q Gid = %d, want %d (agent)", hdr.Name, hdr.Gid, AgentGID)
}
}
for _, want := range []string{".auth_token", ".platform_inbound_secret"} {
if !seen[want] {
t.Fatalf("tar missing %q (seen: %v)", want, seen)
}
}
}
// TestWriteAuthTokenVolumeCmd_ChownsToAgent covers the issue #1877 pre-start
// volume-write path (WriteAuthTokenToVolume): the throwaway alpine container
// writes /vol/.auth_token then chmod 0600 but, pre-fix, never chowns it, so it
// stays root:root (alpine runs the command as root). The literal command must
// chown the file to the agent uid:gid so the agent-uid MCP server can read it.
func TestWriteAuthTokenVolumeCmd_ChownsToAgent(t *testing.T) {
cmd := writeAuthTokenVolumeCmd()
if !strings.Contains(cmd, "chmod 0600 /vol/.auth_token") {
t.Fatalf("alpine cmd lost the 0600 chmod (regression): %q", cmd)
}
wantChown := "chown 1000:1000 /vol/.auth_token"
if !strings.Contains(cmd, wantChown) {
t.Fatalf("alpine cmd = %q, missing %q — without it .auth_token stays root:root "+
"and the agent-uid MCP server gets EACCES → empty bearer → list_peers 401",
cmd, wantChown)
}
}
@@ -0,0 +1,3 @@
ALTER TABLE workspaces
DROP COLUMN IF EXISTS instance_type,
DROP COLUMN IF EXISTS disk_gb;
@@ -0,0 +1,23 @@
-- Per-workspace, user-configurable EC2 sizing override.
--
-- Sizing is ORTHOGONAL to the access tier. The access `tier` column
-- (T1..T4; T4 = full root-level access to the dedicated EC2) controls
-- how much of the box the agent can touch — it has NOTHING to do with
-- how big the box is. Sizing used to be wrongly derived from tier in
-- the control-plane provisioner (workspaceTierResources); that coupling
-- is removed (controlplane PR #173). These columns carry the optional
-- user override the canvas Config tab sets.
--
-- NULL / 0 = "no override — use the CP default" (t3.large + 50GB).
-- The CP clamps instance_type to its allowlist (t3.medium..t3.2xlarge)
-- and disk_gb to [30, 500]; these columns are the persisted intent,
-- the CP is the enforcement point.
--
-- Resize semantics: provision-time only. AWS cannot change instance
-- type live (needs stop/start) and cannot shrink EBS in place — the
-- new spec takes effect on the next provision/restart, surfaced in the
-- canvas Config copy.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS instance_type TEXT,
ADD COLUMN IF NOT EXISTS disk_gb INTEGER;
-47
View File
@@ -431,43 +431,6 @@ def _is_self_notify_row(row: dict[str, Any]) -> bool:
return source_id is None or source_id == ""
def _is_self_echo_row(row: dict[str, Any], workspace_id: str) -> bool:
"""Return True if ``row`` is a self-originated a2a_receive row.
Internal #469: when a workspace delegates to a target that never picks
up the task, ``tool_delegate_task`` calls ``report_activity`` which
POSTs to the platform with source_id set to the *sender's* workspace
UUID (mandated by spoof-defense in workspace-server's a2a_proxy). The
activity API exposes that row under type=a2a_receive, so the inbox
poller re-fetches it. Without this guard the row is surfaced as
kind='peer_agent' with the workspace's own identity as peer_id —
the workspace sees its own delegation-failure echoed back as if a
peer had delegated to it.
The guard mirrors the existing _is_self_notify_row pattern: both
skip rows that would otherwise create spurious inbound signal. The
long-term fix (making the platform write a distinct activity_type
for agent-outbound rows) is tracked separately; this guard stays
because it only excludes rows the agent never wants.
``workspace_id`` must be non-empty — an empty-string workspace_id
(single-workspace legacy path) can never match a UUID source_id, so
the predicate is always False there, which is safe.
RFC #2829 PR-2 note: rows with method="delegate_result" are excluded
from the self-echo guard even when source_id matches our workspace_id.
The platform may write a delegation-result row with source_id set to
our workspace_id (e.g. a self-delegation or edge case in the platform's
result-writing path). Such rows must reach the inbox so that
message_from_activity can surface them as peer_agent inbound and the
runtime receives the delegation result. Silently filtering them as
self-echo would break delegation result delivery.
"""
if not workspace_id:
return False
return row.get("source_id") == workspace_id and row.get("method") != "delegate_result"
def message_from_activity(row: dict[str, Any]) -> InboxMessage:
"""Convert one /activity row into an InboxMessage.
@@ -660,16 +623,6 @@ def _poll_once(
# the same self-notify on every iteration.
last_id = str(row.get("id", "")) or last_id
continue
if _is_self_echo_row(row, workspace_id):
# Internal #469: tool_delegate_task writes its own a2a_receive
# row with source_id = this workspace's UUID (spoof-defense).
# The poll fetches it back as kind='peer_agent', making the
# workspace echo its own delegation-failure as an inbound from
# a phantom peer. Skip it — the real delegation-result path
# (delegate_result push) is separate and unaffected. Cursor
# still advances so the next poll doesn't re-seen this row.
last_id = str(row.get("id", "")) or last_id
continue
message = message_from_activity(row)
if not message.activity_id:
continue
-145
View File
@@ -495,151 +495,6 @@ def test_poll_once_skips_self_notify_rows(state: inbox.InboxState):
assert [m.activity_id for m in queue] == ["act-real"]
# ---------------------------------------------------------------------------
# _is_self_echo_row — internal #469 fix
# ---------------------------------------------------------------------------
#
# When a workspace delegates to a target that never picks up the task,
# tool_delegate_task calls report_activity("a2a_receive", ...) which POSTs
# to the platform with source_id set to the *sender's* workspace UUID
# (spoof-defense). The activity API returns that row under type=a2a_receive
# on the next poll, so message_from_activity sets peer_id = workspace's own
# UUID — the workspace sees its own delegation-failure as an inbound from
# a phantom peer. _is_self_echo_row guards against this.
#
# Internal #469 was live-reproduced on hongming.moleculesai.app 2026-05-16.
def test_is_self_echo_row_true_when_source_id_matches_workspace():
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-abc123") is True
def test_is_self_echo_row_false_when_source_id_differs():
"""A real peer agent (different workspace_id) must NOT be filtered."""
row = {"source_id": "ws-peer", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_source_id_is_none():
"""Canvas-user inbound has no source_id — never an echo."""
row = {"source_id": None, "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_when_workspace_id_is_empty():
"""Single-workspace legacy path with empty workspace_id cannot
match a UUID source_id — predicate is always False, which is safe."""
row = {"source_id": "ws-abc123", "method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "") is False
def test_is_self_echo_row_false_when_source_id_key_absent():
row = {"method": "a2a_receive"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_is_self_echo_row_false_for_delegate_result():
"""RFC #2829 PR-2 regression pin: a row with source_id matching our
workspace_id but method=delegate_result must NOT be filtered as a
self-echo. The platform may write a delegation-result row with our
workspace_id as source_id; such rows must reach the inbox so the
runtime receives the delegation result. Silently filtering them would
break delegate_result delivery."""
row = {"source_id": "ws-1", "method": "delegate_result"}
assert inbox._is_self_echo_row(row, "ws-1") is False
def test_poll_once_skips_self_echo_rows(state: inbox.InboxState):
"""Internal #469 regression pin: a row with source_id matching our
workspace_id must NOT land in the inbox queue — it is our own
delegation-report echoing back, not a real peer inbound."""
rows = [
{
"id": "act-real-peer",
"source_id": "ws-peer",
"method": "a2a_receive",
"summary": None,
"request_body": {"parts": [{"type": "text", "text": "real peer inbound"}]},
"created_at": "2026-04-30T22:00:00Z",
},
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: target timed out",
"request_body": None,
"created_at": "2026-04-30T22:00:01Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
# Only the real peer inbound counted; self-echo silently dropped.
assert n == 1
queue = state.peek(10)
assert [m.activity_id for m in queue] == ["act-real-peer"]
assert queue[0].peer_id == "ws-peer"
def test_poll_once_advances_cursor_past_self_echo(state: inbox.InboxState):
"""Cursor must advance past self-echo rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-echo
on every iteration, wasting requests and blocking real inbound."""
state.save_cursor("act-old")
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
n = inbox._poll_once(state, "http://platform", "ws-1", {})
assert n == 0
assert state.peek(10) == []
# Cursor must move past the skipped row so we don't re-poll it.
assert state.load_cursor() == "act-self-echo"
def test_poll_once_self_echo_does_not_fire_notification(state: inbox.InboxState):
"""The notification callback (channel push to Claude Code etc.)
must not fire for self-echo rows. Same rationale as self-notify:
push-capable hosts would see the echo loop on the push channel."""
rows = [
{
"id": "act-self-echo",
"source_id": "ws-1",
"method": "a2a_receive",
"summary": "task result: timeout",
"request_body": None,
"created_at": "2026-04-30T22:00:00Z",
},
]
received: list[dict] = []
inbox.set_notification_callback(received.append)
try:
resp = _make_response(200, rows)
p, _ = _patch_httpx(resp)
with p:
inbox._poll_once(state, "http://platform", "ws-1", {})
finally:
inbox.set_notification_callback(None)
assert received == [], (
"self-echo rows must not surface as MCP notifications — "
"doing so re-creates the echo loop on push-capable hosts"
)
def test_poll_once_advances_cursor_past_self_notify(state: inbox.InboxState):
"""Cursor must advance past self-notify rows even though we don't
enqueue them. Otherwise the next poll re-fetches the same self-