diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx
index 7f05270b..055d7e00 100644
--- a/canvas/src/components/tabs/ChatTab.tsx
+++ b/canvas/src/components/tabs/ChatTab.tsx
@@ -962,6 +962,32 @@ function MyChatPanel({ workspaceId, data }: Props) {
)}
+ {/* talk_to_user disabled banner — shown when the workspace has
+ talk_to_user_enabled=false. The agent cannot send canvas messages;
+ the user can re-enable the ability from here without opening settings. */}
+ {data.talkToUserEnabled === false && (
+
+
+
+ Agent is not enabled to chat with you.
+
+
+
+ )}
{/* Messages */}
{loading && (
diff --git a/canvas/src/store/canvas-topology.ts b/canvas/src/store/canvas-topology.ts
index 12a1cc45..1bed943b 100644
--- a/canvas/src/store/canvas-topology.ts
+++ b/canvas/src/store/canvas-topology.ts
@@ -519,6 +519,10 @@ export function buildNodesAndEdges(
// #2054 — server-declared per-workspace provisioning timeout.
// Falls through to the runtime profile when null/absent.
provisionTimeoutMs: ws.provision_timeout_ms ?? null,
+ // Workspace abilities — defaults preserved for old platform versions
+ // that don't yet include these columns in the GET response.
+ broadcastEnabled: ws.broadcast_enabled ?? false,
+ talkToUserEnabled: ws.talk_to_user_enabled ?? true,
},
};
if (hasParent) {
diff --git a/canvas/src/store/canvas.ts b/canvas/src/store/canvas.ts
index 38129468..1baa0e66 100644
--- a/canvas/src/store/canvas.ts
+++ b/canvas/src/store/canvas.ts
@@ -99,6 +99,13 @@ export interface WorkspaceNodeData extends Record {
* @/lib/runtimeProfiles. Lets a slow runtime declare its cold-boot
* expectation without a canvas release. */
provisionTimeoutMs?: number | null;
+ /** When true the workspace may POST /broadcast to send org-wide messages.
+ * Default false. Toggled by user/admin via PATCH /workspaces/:id/abilities. */
+ broadcastEnabled?: boolean;
+ /** When false the workspace cannot deliver canvas chat messages.
+ * send_message_to_user / POST /notify return 403 and the canvas
+ * shows a "not enabled" state with a button to re-enable. Default true. */
+ talkToUserEnabled?: boolean;
}
export type PanelTab = "details" | "skills" | "chat" | "terminal" | "config" | "schedule" | "channels" | "files" | "memory" | "traces" | "events" | "activity" | "audit";
diff --git a/canvas/src/store/socket.ts b/canvas/src/store/socket.ts
index 81114ae9..7b2adcd3 100644
--- a/canvas/src/store/socket.ts
+++ b/canvas/src/store/socket.ts
@@ -299,6 +299,9 @@ export interface WorkspaceData {
* `@/lib/runtimeProfiles` when absent (the default behavior for any
* template that hasn't yet declared the field). */
provision_timeout_ms?: number | null;
+ /** Workspace ability flags (migration 20260514). */
+ broadcast_enabled?: boolean;
+ talk_to_user_enabled?: boolean;
}
let socket: ReconnectingSocket | null = null;
diff --git a/tests/e2e/test_workspace_abilities_e2e.sh b/tests/e2e/test_workspace_abilities_e2e.sh
new file mode 100755
index 00000000..72a32c51
--- /dev/null
+++ b/tests/e2e/test_workspace_abilities_e2e.sh
@@ -0,0 +1,296 @@
+#!/usr/bin/env bash
+# E2E test: workspace broadcast and talk-to-user platform abilities.
+#
+# What this proves:
+# 1. talk_to_user_enabled (default true) — POST /notify works out-of-the-box.
+# 2. PATCH /workspaces/:id/abilities { talk_to_user_enabled: false } disables
+# delivery: /notify → 403 with error="talk_to_user_disabled" + delegate hint.
+# 3. Re-enabling talk_to_user_enabled restores delivery.
+# 4. broadcast_enabled (default false) — POST /broadcast → 403 when disabled.
+# 5. PATCH { broadcast_enabled: true } enables fan-out.
+# 6. POST /broadcast delivers to all non-sender, non-removed workspaces:
+# - Returns {"status":"sent","delivered":N}
+# - Receiver's activity log has a broadcast_receive entry with the message.
+# - Sender's activity log has a broadcast_sent entry.
+# 7. The sender itself does NOT receive a broadcast_receive entry.
+#
+# Usage: tests/e2e/test_workspace_abilities_e2e.sh
+# Prereqs: workspace-server on http://localhost:8080, MOLECULE_ENV != production
+
+set -euo pipefail
+
+source "$(dirname "$0")/_lib.sh"
+
+PASS=0
+FAIL=0
+SENDER_ID=""
+RECEIVER_ID=""
+
+cleanup() {
+ for wid in "$SENDER_ID" "$RECEIVER_ID"; do
+ if [ -n "$wid" ]; then
+ curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" > /dev/null || true
+ fi
+ done
+}
+trap cleanup EXIT INT TERM
+
+assert() {
+ local label="$1" actual="$2" expected="$3"
+ if [ "$actual" = "$expected" ]; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label"
+ echo " expected: $expected"
+ echo " actual: $actual"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+assert_contains() {
+ local label="$1" haystack="$2" needle="$3"
+ if echo "$haystack" | grep -qF "$needle"; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label"
+ echo " needle: $needle"
+ echo " haystack: $haystack"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+assert_not_contains() {
+ local label="$1" haystack="$2" needle="$3"
+ if ! echo "$haystack" | grep -qF "$needle"; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label (unexpected match)"
+ echo " needle: $needle"
+ echo " haystack: $haystack"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+# ── Pre-sweep: remove any stale leftover workspaces from a prior aborted run ──
+echo "=== Setup ==="
+for NAME in "Abilities Sender" "Abilities Receiver"; do
+ PRIOR=$(curl -s "$BASE/workspaces" | python3 -c "
+import json, sys
+try:
+ print(' '.join(w['id'] for w in json.load(sys.stdin) if w.get('name') == '$NAME'))
+except Exception:
+ pass
+")
+ for _wid in $PRIOR; do
+ echo "Sweeping leftover '$NAME' workspace: $_wid"
+ curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" > /dev/null || true
+ done
+done
+
+R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
+ -d '{"name":"Abilities Sender","tier":1}')
+SENDER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
+[ -n "$SENDER_ID" ] || { echo "Failed to create sender workspace: $R"; exit 1; }
+echo "Created sender workspace: $SENDER_ID"
+
+R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
+ -d '{"name":"Abilities Receiver","tier":1}')
+RECEIVER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
+[ -n "$RECEIVER_ID" ] || { echo "Failed to create receiver workspace: $R"; exit 1; }
+echo "Created receiver workspace: $RECEIVER_ID"
+
+# Mint workspace-scoped bearer tokens (test-only endpoint, disabled in prod).
+SENDER_TOKEN=$(e2e_mint_test_token "$SENDER_ID")
+[ -n "$SENDER_TOKEN" ] || { echo "Failed to mint sender token"; exit 1; }
+SENDER_AUTH="Authorization: Bearer $SENDER_TOKEN"
+
+# Admin token — any live workspace bearer satisfies AdminAuth in local dev.
+# In production-like envs, set MOLECULE_ADMIN_TOKEN.
+ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:-$SENDER_TOKEN}"
+ADMIN_AUTH="Authorization: Bearer $ADMIN_TOKEN"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Part 1: talk_to_user ability ==="
+
+echo ""
+echo "--- 1a: /notify works with default talk_to_user_enabled=true ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Hello from sender"}')
+assert "POST /notify returns 200 when talk_to_user_enabled=true (default)" "$CODE" "200"
+
+echo ""
+echo "--- 1b: Disable talk_to_user ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": false}')
+assert "PATCH /abilities talk_to_user_enabled=false returns 200" "$CODE" "200"
+
+# Verify the flag is reflected in the workspace GET response.
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
+assert "GET /workspaces/:id reflects talk_to_user_enabled=false" "$FLAG" "False"
+
+echo ""
+echo "--- 1c: /notify blocked when talk_to_user disabled ---"
+BODY=$(curl -s -w "" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+assert "POST /notify returns 403 when talk_to_user_enabled=false" "$CODE" "403"
+
+ERR=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("error",""))' 2>/dev/null || echo "")
+assert_contains "403 body contains talk_to_user_disabled error code" "$ERR" "talk_to_user_disabled"
+
+HINT=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("hint",""))' 2>/dev/null || echo "")
+assert_contains "403 body contains delegate_task hint" "$HINT" "delegate_task"
+
+echo ""
+echo "--- 1d: Re-enable talk_to_user and verify /notify works again ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": true}')
+assert "PATCH /abilities talk_to_user_enabled=true returns 200" "$CODE" "200"
+
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Re-enabled, should work"}')
+assert "POST /notify returns 200 after re-enabling talk_to_user" "$CODE" "200"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Part 2: broadcast ability ==="
+
+echo ""
+echo "--- 2a: Broadcast blocked by default (broadcast_enabled=false) ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+assert "POST /broadcast returns 403 when broadcast_enabled=false (default)" "$CODE" "403"
+
+echo ""
+echo "--- 2b: Enable broadcast ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"broadcast_enabled": true}')
+assert "PATCH /abilities broadcast_enabled=true returns 200" "$CODE" "200"
+
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
+assert "GET /workspaces/:id reflects broadcast_enabled=true" "$FLAG" "True"
+
+echo ""
+echo "--- 2c: Successful broadcast fan-out ---"
+BCAST=$(curl -s -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Org-wide notice: scheduled maintenance in 5 minutes."}')
+BSTATUS=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("status",""))' 2>/dev/null || echo "")
+BDELIVERED=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("delivered","-1"))' 2>/dev/null || echo "-1")
+assert "POST /broadcast returns status=sent" "$BSTATUS" "sent"
+
+# delivered count must be >= 1 (the receiver workspace).
+echo " INFO — broadcast delivered=$BDELIVERED"
+if python3 -c "import sys; sys.exit(0 if int('$BDELIVERED') >= 1 else 1)" 2>/dev/null; then
+ echo " PASS — delivered count >= 1"
+ PASS=$((PASS+1))
+else
+ echo " FAIL — expected delivered >= 1, got $BDELIVERED"
+ FAIL=$((FAIL+1))
+fi
+
+echo ""
+echo "--- 2d: Receiver activity log has broadcast_receive entry ---"
+RECEIVER_TOKEN=$(e2e_mint_test_token "$RECEIVER_ID")
+[ -n "$RECEIVER_TOKEN" ] || { echo "Failed to mint receiver token"; exit 1; }
+RECEIVER_AUTH="Authorization: Bearer $RECEIVER_TOKEN"
+
+ACT=$(curl -s -H "$RECEIVER_AUTH" "$BASE/workspaces/$RECEIVER_ID/activity?source=agent&limit=20")
+ROW=$(echo "$ACT" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_receive":
+ print(json.dumps(r))
+ break
+')
+[ -n "$ROW" ] || {
+ echo " FAIL — could not find broadcast_receive row in receiver activity"
+ FAIL=$((FAIL+1))
+}
+
+if [ -n "$ROW" ]; then
+ # Message is stored in summary field.
+ MSG=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("summary",""))')
+ assert_contains "broadcast_receive row summary has original message" "$MSG" "scheduled maintenance"
+ # Sender ID is stored in source_id field.
+ SRC=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("source_id",""))')
+ assert "broadcast_receive row source_id is sender workspace" "$SRC" "$SENDER_ID"
+fi
+
+echo ""
+echo "--- 2e: Sender activity log has broadcast_sent entry ---"
+ACT_SENDER=$(curl -s -H "$SENDER_AUTH" "$BASE/workspaces/$SENDER_ID/activity?limit=20")
+SENT_ROW=$(echo "$ACT_SENDER" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_sent":
+ print(json.dumps(r))
+ break
+')
+[ -n "$SENT_ROW" ] || {
+ echo " FAIL — could not find broadcast_sent row in sender activity"
+ FAIL=$((FAIL+1))
+}
+
+if [ -n "$SENT_ROW" ]; then
+ # Delivered count is baked into the summary field (no response_body for sender row).
+ SUMMARY=$(echo "$SENT_ROW" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("summary",""))')
+ assert_contains "broadcast_sent summary mentions workspace count" "$SUMMARY" "workspace"
+fi
+
+echo ""
+echo "--- 2f: Sender does NOT receive a broadcast_receive entry ---"
+SELF_RECV=$(echo "$ACT_SENDER" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_receive":
+ print("found")
+ break
+')
+assert_not_contains "sender has no broadcast_receive in own activity log" "${SELF_RECV:-}" "found"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "--- 2g: Empty message is rejected ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":""}')
+assert "POST /broadcast with empty message returns 400" "$CODE" "400"
+
+echo ""
+echo "--- 2h: Partial PATCH does not clobber other flags ---"
+# Set talk_to_user=false, then patch only broadcast — talk_to_user must stay false.
+curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": false}'
+curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"broadcast_enabled": false}'
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+TUF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
+BEF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
+assert "partial PATCH preserves talk_to_user_enabled=false" "$TUF" "False"
+assert "partial PATCH sets broadcast_enabled=false" "$BEF" "False"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Results: $PASS passed, $FAIL failed ==="
+[ "$FAIL" -eq 0 ]
diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go
index 99b8bd1c..56dd7a1b 100644
--- a/workspace-server/internal/handlers/activity.go
+++ b/workspace-server/internal/handlers/activity.go
@@ -482,6 +482,13 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
+ if errors.Is(err, ErrTalkToUserDisabled) {
+ c.JSON(http.StatusForbidden, gin.H{
+ "error": "talk_to_user_disabled",
+ "hint": "This workspace is not allowed to send messages directly to the user. Forward your update to a parent workspace using delegate_task — they may be able to reach the user.",
+ })
+ return
+ }
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go
index f6611814..ffb93d70 100644
--- a/workspace-server/internal/handlers/activity_test.go
+++ b/workspace-server/internal/handlers/activity_test.go
@@ -464,9 +464,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
// Workspace existence check
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-notify").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Persistence INSERT — verify shape
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -511,9 +511,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-attach").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Capture the JSONB arg so we can assert on the persisted shape
// AFTER the call (must include parts[].kind=file so reload
@@ -640,9 +640,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-x").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(fmt.Errorf("simulated db hiccup"))
diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go
index 6efea603..82f18a8e 100644
--- a/workspace-server/internal/handlers/agent_message_writer.go
+++ b/workspace-server/internal/handlers/agent_message_writer.go
@@ -54,6 +54,11 @@ import (
// timeout) surface as wrapped errors and should be treated as 503.
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
+// ErrTalkToUserDisabled is returned when the workspace has
+// talk_to_user_enabled=false. Callers surface HTTP 403 so the Python tool
+// can detect it and suggest forwarding to a parent workspace.
+var ErrTalkToUserDisabled = errors.New("agent_message: talk_to_user disabled")
+
// AgentMessageAttachment is one file attached to an agent → user
// message. Identical to handlers.NotifyAttachment in field set; kept
// distinct so the writer's API doesn't import a handler type with HTTP
@@ -107,16 +112,20 @@ func (w *AgentMessageWriter) Send(
// notify call surfaced as "workspace not found" and masked real
// incidents in the alert path.
var wsName string
+ var talkToUserEnabled bool
err := w.db.QueryRowContext(ctx,
- `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
+ `SELECT name, talk_to_user_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
workspaceID,
- ).Scan(&wsName)
+ ).Scan(&wsName, &talkToUserEnabled)
if errors.Is(err, sql.ErrNoRows) {
return ErrWorkspaceNotFound
}
if err != nil {
return fmt.Errorf("agent_message: workspace lookup: %w", err)
}
+ if !talkToUserEnabled {
+ return ErrTalkToUserDisabled
+ }
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
diff --git a/workspace-server/internal/handlers/agent_message_writer_test.go b/workspace-server/internal/handlers/agent_message_writer_test.go
index 20f5540f..c75a3edd 100644
--- a/workspace-server/internal/handlers/agent_message_writer_test.go
+++ b/workspace-server/internal/handlers/agent_message_writer_test.go
@@ -88,9 +88,9 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-1").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -116,9 +116,9 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-att").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -173,9 +173,9 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-missing").
- WillReturnRows(sqlmock.NewRows([]string{"name"}))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}))
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
if !errors.Is(err, ErrWorkspaceNotFound) {
@@ -202,9 +202,9 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbfail").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(errors.New("transient db error"))
@@ -223,9 +223,9 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-trunc").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
longMsg := strings.Repeat("x", 200)
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -263,9 +263,9 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-bc").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Workspace Name", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
@@ -315,7 +315,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
transientErr := errors.New("connection refused")
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbdown").
WillReturnError(transientErr)
@@ -350,9 +350,9 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
// the byte-slice bug.
msg := strings.Repeat("你", 200)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-cjk").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
@@ -395,9 +395,9 @@ func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-noatt").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("X", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go
index c08d138f..0e13600d 100644
--- a/workspace-server/internal/handlers/handlers_additional_test.go
+++ b/workspace-server/internal/handlers/handlers_additional_test.go
@@ -230,20 +230,21 @@ func TestWorkspaceList_WithData(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
- // 21 cols — see scanWorkspaceRow for order (max_concurrent_tasks
- // lands between active_tasks and last_error_rate).
+ // 23 cols — broadcast_enabled + talk_to_user_enabled added after monthly_spend
+ // (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001",
- nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)).
+ nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "",
- nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0))
+ nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go
index 847a3e9a..33a039a1 100644
--- a/workspace-server/internal/handlers/handlers_test.go
+++ b/workspace-server/internal/handlers/handlers_test.go
@@ -407,21 +407,21 @@ func TestWorkspaceList(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
- // 21 cols: `max_concurrent_tasks` added between active_tasks and
- // last_error_rate (see scanWorkspaceRow + COALESCE(w.max_concurrent_tasks, 1)
- // in workspace.go). Column order must match that scan exactly.
+ // 23 cols: broadcast_enabled + talk_to_user_enabled added after monthly_spend
+ // (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001",
- nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)).
+ nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "",
- nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0))
+ nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
@@ -1135,13 +1135,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("dddddddd-0004-0000-0000-000000000000").
WillReturnRows(sqlmock.NewRows(columns).AddRow(
"dddddddd-0004-0000-0000-000000000000", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000",
nil, 2, 1, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false,
- nil, int64(0),
+ nil, int64(0), false, true,
))
w := httptest.NewRecorder()
diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go
index 125eb725..3a274fbf 100644
--- a/workspace-server/internal/handlers/mcp_test.go
+++ b/workspace-server/internal/handlers/mcp_test.go
@@ -751,9 +751,9 @@ func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-err").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// INSERT fails — must NOT abort the tool response.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -802,9 +802,9 @@ func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
const userMessage = "Hi there from the agent"
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-shape").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// Capture the response_body argument and assert its exact shape.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -861,9 +861,9 @@ func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
// before it does anything else. Returning a name lets the
// broadcast payload populate; the test doesn't assert on the
// broadcast (no observable WS in this fake), only on the DB.
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-msg").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// The persistence INSERT — pin the exact shape so a future
// refactor that switches columns or drops `method='notify'`
diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go
index a6220877..971a9df3 100644
--- a/workspace-server/internal/handlers/workspace.go
+++ b/workspace-server/internal/handlers/workspace.go
@@ -591,7 +591,7 @@ func scanWorkspaceRow(rows interface {
var id, name, role, status, url, sampleError, currentTask, runtime, workspaceDir string
var tier, activeTasks, maxConcurrentTasks, uptimeSeconds int
var errorRate, x, y float64
- var collapsed bool
+ var collapsed, broadcastEnabled, talkToUserEnabled bool
var parentID *string
var agentCard []byte
var budgetLimit sql.NullInt64
@@ -600,7 +600,7 @@ func scanWorkspaceRow(rows interface {
err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url,
&parentID, &activeTasks, &maxConcurrentTasks, &errorRate, &sampleError, &uptimeSeconds,
¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed,
- &budgetLimit, &monthlySpend)
+ &budgetLimit, &monthlySpend, &broadcastEnabled, &talkToUserEnabled)
if err != nil {
return nil, err
}
@@ -624,6 +624,8 @@ func scanWorkspaceRow(rows interface {
"x": x,
"y": y,
"collapsed": collapsed,
+ "broadcast_enabled": broadcastEnabled,
+ "talk_to_user_enabled": talkToUserEnabled,
}
// budget_limit: nil when no limit set, int64 otherwise
@@ -659,7 +661,8 @@ const workspaceListQuery = `
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
- w.budget_limit, COALESCE(w.monthly_spend, 0)
+ w.budget_limit, COALESCE(w.monthly_spend, 0),
+ w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.status != 'removed'
@@ -719,7 +722,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
- w.budget_limit, COALESCE(w.monthly_spend, 0)
+ w.budget_limit, COALESCE(w.monthly_spend, 0),
+ w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.id = $1
diff --git a/workspace-server/internal/handlers/workspace_abilities.go b/workspace-server/internal/handlers/workspace_abilities.go
new file mode 100644
index 00000000..71fa48f9
--- /dev/null
+++ b/workspace-server/internal/handlers/workspace_abilities.go
@@ -0,0 +1,82 @@
+package handlers
+
+// workspace_abilities.go — PATCH /workspaces/:id/abilities
+//
+// Allows users and admin agents to toggle two workspace-level ability flags:
+//
+// broadcast_enabled — workspace may POST /broadcast to send org-wide messages
+// talk_to_user_enabled — workspace may deliver canvas chat messages via
+// send_message_to_user / POST /notify
+//
+// Gated behind AdminAuth so workspace agents cannot self-modify their own
+// ability flags (that would let any agent grant itself broadcast rights or
+// suppress its own chat-silence constraint).
+
+import (
+ "log"
+ "net/http"
+
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+ "github.com/gin-gonic/gin"
+)
+
+// AbilitiesPayload carries the subset of ability flags the caller wants to
+// update. Fields are pointers so that the handler can distinguish "caller
+// supplied false" from "caller omitted the field" (omitempty semantics).
+type AbilitiesPayload struct {
+ BroadcastEnabled *bool `json:"broadcast_enabled"`
+ TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
+}
+
+// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth).
+func PatchAbilities(c *gin.Context) {
+ id := c.Param("id")
+ if err := validateWorkspaceID(id); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+ return
+ }
+
+ var body AbilitiesPayload
+ if err := c.ShouldBindJSON(&body); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
+ return
+ }
+ if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"})
+ return
+ }
+
+ ctx := c.Request.Context()
+
+ var exists bool
+ if err := db.DB.QueryRowContext(ctx,
+ `SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`, id,
+ ).Scan(&exists); err != nil || !exists {
+ c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
+ return
+ }
+
+ if body.BroadcastEnabled != nil {
+ if _, err := db.DB.ExecContext(ctx,
+ `UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
+ id, *body.BroadcastEnabled,
+ ); err != nil {
+ log.Printf("PatchAbilities broadcast_enabled for %s: %v", id, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
+ return
+ }
+ }
+
+ if body.TalkToUserEnabled != nil {
+ if _, err := db.DB.ExecContext(ctx,
+ `UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
+ id, *body.TalkToUserEnabled,
+ ); err != nil {
+ log.Printf("PatchAbilities talk_to_user_enabled for %s: %v", id, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
+ return
+ }
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "updated"})
+}
diff --git a/workspace-server/internal/handlers/workspace_broadcast.go b/workspace-server/internal/handlers/workspace_broadcast.go
new file mode 100644
index 00000000..66847566
--- /dev/null
+++ b/workspace-server/internal/handlers/workspace_broadcast.go
@@ -0,0 +1,185 @@
+package handlers
+
+// workspace_broadcast.go — POST /workspaces/:id/broadcast
+//
+// Allows a workspace with broadcast_enabled=true to send a message to every
+// non-removed agent workspace in the SAME ORG. The message is:
+//
+// • Persisted in each recipient's activity_logs (type='broadcast_receive')
+// so poll-mode agents pick it up via GET /activity.
+// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
+// show a real-time banner for each recipient workspace.
+//
+// The sender's own workspace logs a 'broadcast_sent' activity row for
+// traceability.
+//
+// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
+// The handler re-validates broadcast_enabled inside the DB lookup to prevent
+// TOCTOU — the middleware only proved the token is valid, not the ability.
+//
+// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
+// a recursive CTE that walks the parent_id chain to find the org root. This
+// prevents a compromised or misconfigured workspace from broadcasting to
+// workspaces in other tenants' orgs.
+
+import (
+ "log"
+ "net/http"
+ "strconv"
+
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
+ "github.com/gin-gonic/gin"
+)
+
+// BroadcastHandler is constructed once and shared across requests.
+type BroadcastHandler struct {
+ broadcaster *events.Broadcaster
+}
+
+// NewBroadcastHandler creates a BroadcastHandler.
+func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
+ return &BroadcastHandler{broadcaster: b}
+}
+
+// Broadcast handles POST /workspaces/:id/broadcast.
+func (h *BroadcastHandler) Broadcast(c *gin.Context) {
+ senderID := c.Param("id")
+ if err := validateWorkspaceID(senderID); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+ return
+ }
+
+ var body struct {
+ Message string `json:"message" binding:"required"`
+ }
+ if err := c.ShouldBindJSON(&body); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
+ return
+ }
+
+ ctx := c.Request.Context()
+
+ // Verify sender exists and has broadcast_enabled=true.
+ var senderName string
+ var broadcastEnabled bool
+ err := db.DB.QueryRowContext(ctx,
+ `SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
+ senderID,
+ ).Scan(&senderName, &broadcastEnabled)
+ if err != nil {
+ c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
+ return
+ }
+ if !broadcastEnabled {
+ c.JSON(http.StatusForbidden, gin.H{
+ "error": "broadcast_disabled",
+ "hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
+ })
+ return
+ }
+
+ // Find the sender's org root by walking the parent_id chain.
+ // Workspaces with parent_id = NULL are org roots; every other workspace
+ // belongs to the org identified by its topmost ancestor.
+ var orgRootID string
+ err = db.DB.QueryRowContext(ctx, `
+ WITH RECURSIVE org_chain AS (
+ SELECT id, parent_id, id AS root_id
+ FROM workspaces
+ WHERE id = $1
+ UNION ALL
+ SELECT w.id, w.parent_id, c.root_id
+ FROM workspaces w
+ JOIN org_chain c ON w.id = c.parent_id
+ )
+ SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
+ `, senderID).Scan(&orgRootID)
+ if err != nil {
+ log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+ return
+ }
+
+ // Collect all non-removed agent workspaces in the SAME ORG (same root_id),
+ // excluding the sender itself.
+ rows, err := db.DB.QueryContext(ctx, `
+ WITH RECURSIVE org_chain AS (
+ SELECT id, parent_id, id AS root_id
+ FROM workspaces
+ WHERE parent_id IS NULL
+ UNION ALL
+ SELECT w.id, w.parent_id, c.root_id
+ FROM workspaces w
+ JOIN org_chain c ON w.parent_id = c.id
+ )
+ SELECT c.id
+ FROM org_chain c
+ WHERE c.root_id = $1
+ AND c.id != $2
+ AND EXISTS (
+ SELECT 1 FROM workspaces w
+ WHERE w.id = c.id AND w.status != 'removed'
+ )
+ `, orgRootID, senderID)
+ if err != nil {
+ log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+ return
+ }
+ defer rows.Close()
+
+ var recipientIDs []string
+ for rows.Next() {
+ var rid string
+ if rows.Scan(&rid) == nil {
+ recipientIDs = append(recipientIDs, rid)
+ }
+ }
+ if err := rows.Err(); err != nil {
+ log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+ return
+ }
+
+ broadcastPayload := map[string]interface{}{
+ "message": body.Message,
+ "sender_id": senderID,
+ "sender": senderName,
+ }
+
+ // Persist broadcast_receive in each recipient's activity log + emit WS event.
+ delivered := 0
+ for _, rid := range recipientIDs {
+ if _, err := db.DB.ExecContext(ctx, `
+ INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
+ VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
+ `, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
+ log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
+ continue
+ }
+ h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
+ delivered++
+ }
+
+ // Record the send on the sender's own log.
+ if _, err := db.DB.ExecContext(ctx, `
+ INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
+ VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
+ `, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
+ log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
+ }
+
+ c.JSON(http.StatusOK, gin.H{
+ "status": "sent",
+ "delivered": delivered,
+ })
+}
+
+func broadcastTruncate(s string, max int) string {
+ runes := []rune(s)
+ if len(runes) <= max {
+ return s
+ }
+ return string(runes[:max]) + "…"
+}
diff --git a/workspace-server/internal/handlers/workspace_broadcast_test.go b/workspace-server/internal/handlers/workspace_broadcast_test.go
new file mode 100644
index 00000000..50668643
--- /dev/null
+++ b/workspace-server/internal/handlers/workspace_broadcast_test.go
@@ -0,0 +1,428 @@
+package handlers
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/gin-gonic/gin"
+)
+
+// -------- Org-scoped recipient query tests (OFFSEC-015) --------
+
+// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
+// NOT reach workspaces belonging to Org-B. This is the core regression test
+// for OFFSEC-015: the original query had no org filter, so a workspace in
+// Org-A could broadcast to every non-removed workspace in the entire DB,
+// including workspaces owned by other tenants.
+func TestBroadcast_OrgScopedRecipients(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ // Org-A structure:
+ // org-a-root (parent_id = NULL) ← sender
+ // ├── ws-a-child
+ // Org-B structure:
+ // org-b-root (parent_id = NULL)
+ // └── ws-b-child
+ senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
+ wsAChild := "00000000-0000-0000-0000-000000000002"
+ // ws-b-child is in Org-B (different root); the org-scoped query MUST NOT include it.
+
+ // 1. Sender lookup
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
+
+ // 2. Org root lookup — sender is its own root (parent_id = NULL)
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
+
+ // 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
+ // The query joins on org_chain.root_id = orgRootID, which scopes to Org-A only.
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID, senderID). // orgRootID, senderID (EXCLUDED)
+ WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) // only Org-A child
+
+ // Activity log inserts
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"hello from org-a"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusOK {
+ t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+
+ var resp map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("failed to unmarshal response: %v", err)
+ }
+ if resp["status"] != "sent" {
+ t.Errorf("expected status 'sent', got %v", resp["status"])
+ }
+ // ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
+ // If it were included, the mock would have an unmet expectation.
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet mock expectations — cross-org workspace was included in broadcast: %v", err)
+ }
+}
+
+// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
+// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
+func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
+ siblingID := "00000000-0000-0000-0000-000000000002"
+
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
+
+ // Sender is the org root — CTE returns sender's own ID as root
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
+
+ // Recipients in same org, excluding sender
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID, senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
+
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"hello siblings"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusOK {
+ t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
+// workspace can broadcast to siblings in the same org.
+func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ orgRootID := "00000000-0000-0000-0000-000000000001"
+ senderID := "00000000-0000-0000-0000-000000000002" // child workspace
+ siblingID := "00000000-0000-0000-0000-000000000003"
+
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
+
+ // Org root lookup — walk up to find org-a-root
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
+
+ // Recipients: same org, excluding sender
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(orgRootID, senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
+
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"child broadcasting"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusOK {
+ t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+// -------- Non-regression cases --------
+
+func TestBroadcast_NotFound(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000099"
+ // UUID is valid, but no workspace row matches
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnError(errors.New("workspace not found"))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"test"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusNotFound {
+ t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+func TestBroadcast_Disabled(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000001"
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"should not send"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusForbidden {
+ t.Errorf("expected 403, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("failed to unmarshal: %v", err)
+ }
+ if resp["error"] != "broadcast_disabled" {
+ t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
+ }
+}
+
+func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
+
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Lone Root", true))
+
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
+
+ // No other workspaces in this org
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID, senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"id"}))
+
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"hello org"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusOK {
+ t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("failed to unmarshal: %v", err)
+ }
+ if resp["delivered"] != float64(0) {
+ t.Errorf("expected delivered=0, got %v", resp["delivered"])
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
+ setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
+ body := `{"message":"test"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
+ }
+}
+
+func TestBroadcast_MissingMessage(t *testing.T) {
+ setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
+ c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusBadRequest {
+ t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
+ }
+}
+
+// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
+// finding the org root errors, the handler returns 500 instead of proceeding
+// with an un-scoped query that would broadcast to all orgs.
+func TestBroadcast_OrgRootLookupFails(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000001"
+
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
+
+ // Org root CTE fails
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnError(context.DeadlineExceeded)
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"should not broadcast"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusInternalServerError {
+ t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
+ }
+ // The recipient query MUST NOT be called — it would broadcast cross-org
+ // if the org root lookup failed silently.
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
+// from a workspace does not send a broadcast_receive to the sender itself
+// (the sender logs broadcast_sent, not broadcast_receive).
+func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewBroadcastHandler(broadcaster)
+
+ senderID := "00000000-0000-0000-0000-000000000001"
+ peerID := "00000000-0000-0000-0000-000000000002"
+
+ mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
+
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
+
+ // Recipient query MUST exclude sender via id != senderID
+ mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
+ WithArgs(senderID, senderID).
+ WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
+
+ // Peer receives broadcast_receive
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+ // Sender logs broadcast_sent (NOT broadcast_receive)
+ mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
+
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: senderID}}
+ body := `{"message":"no echo to self"}`
+ c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
+ c.Request.Header.Set("Content-Type", "application/json")
+
+ handler.Broadcast(c)
+
+ if w.Code != http.StatusOK {
+ t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Errorf("unmet expectations: %v", err)
+ }
+}
+
+// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
+// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
+// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
+// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
+func TestBroadcast_Truncate(t *testing.T) {
+ cases := []struct {
+ msg string
+ max int
+ expect string
+ }{
+ {"short", 120, "short"}, // under max — no truncation
+ // exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
+ {"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
+ // "this is a longer mes" = 20 runes; + "…" = 21 chars
+ {"this is a longer message that needs truncating", 20, "this is a longer mes…"},
+ // at-max boundary: 20 chars at max=20 → no truncation
+ {"exactly twenty chars", 20, "exactly twenty chars"},
+ // over max: 11 chars at max=10 → 10 + "…" = 11
+ {"hello world!", 10, "hello worl…"},
+ }
+ for _, tc := range cases {
+ result := broadcastTruncate(tc.msg, tc.max)
+ if result != tc.expect {
+ t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.msg, tc.max, result, tc.expect)
+ }
+ }
+}
diff --git a/workspace-server/internal/handlers/workspace_budget_test.go b/workspace-server/internal/handlers/workspace_budget_test.go
index 920dad9c..4652e293 100644
--- a/workspace-server/internal/handlers/workspace_budget_test.go
+++ b/workspace-server/internal/handlers/workspace_budget_test.go
@@ -33,6 +33,7 @@ var wsColumns = []string{
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
// ==================== GET — financial fields stripped from open endpoint ====================
@@ -52,8 +53,10 @@ func TestWorkspaceBudget_Get_NilLimit(t *testing.T) {
[]byte(`{}`), "http://localhost:9001",
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
- nil, // budget_limit NULL
- 0)) // monthly_spend 0
+ nil, // budget_limit NULL
+ 0, // monthly_spend 0
+ false, // broadcast_enabled
+ true)) // talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -96,7 +99,8 @@ func TestWorkspaceBudget_Get_WithLimit(t *testing.T) {
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
int64(500), // budget_limit = $5.00 in DB
- int64(123))) // monthly_spend = $1.23 in DB
+ int64(123), // monthly_spend = $1.23 in DB
+ false, true)) // broadcast_enabled, talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go
index fc0895bc..6d24370b 100644
--- a/workspace-server/internal/handlers/workspace_test.go
+++ b/workspace-server/internal/handlers/workspace_test.go
@@ -29,6 +29,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0001-0000-0000-000000000000").
@@ -36,7 +37,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
AddRow("cccccccc-0001-0000-0000-000000000000", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`),
"http://localhost:8001", nil, 2, 1, 0.05, "", 3600, "working", "langgraph",
"", 10.0, 20.0, false,
- nil, 0))
+ nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -118,6 +119,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -125,7 +127,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
AddRow(id, "Old Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"updated_at"}).AddRow(removedAt))
@@ -181,6 +183,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -188,7 +191,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
AddRow(id, "Vanished", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
// Simulate the row vanishing between the two queries.
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
@@ -243,6 +246,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -250,7 +254,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
AddRow(id, "Audit Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
// last_outbound_at follow-up query (existing path)
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
WithArgs(id).
@@ -714,6 +718,7 @@ func TestWorkspaceList_Empty(t *testing.T) {
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}))
w := httptest.NewRecorder()
@@ -1417,6 +1422,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
// Populate with non-zero financial values to confirm they are stripped.
mock.ExpectQuery("SELECT w.id, w.name").
@@ -1425,7 +1431,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
AddRow("cccccccc-0010-0000-0000-000000000000", "Finance Test", "worker", 1, "online", []byte(`{}`),
"http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- int64(50000), int64(12500))) // budget_limit=500 USD, spend=125 USD
+ int64(50000), int64(12500), false, true)) // budget_limit=500 USD, spend=125 USD
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1473,6 +1479,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0955-0000-0000-000000000000").
@@ -1485,7 +1492,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"langgraph",
"/home/user/secret-projects/client-work",
0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go
index 11284473..9139fc5b 100644
--- a/workspace-server/internal/models/workspace.go
+++ b/workspace-server/internal/models/workspace.go
@@ -36,6 +36,15 @@ type Workspace struct {
// to activity_logs, agent reads via GET /activity?since_id=). See
// migration 045 + RFC #2339.
DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
+ // BroadcastEnabled: when true the workspace may call POST /broadcast to
+ // deliver a message to all non-removed agent workspaces in the org.
+ // Default false — only privileged orchestrators should hold this ability.
+ BroadcastEnabled bool `json:"broadcast_enabled" db:"broadcast_enabled"`
+ // TalkToUserEnabled: when false the workspace's send_message_to_user calls
+ // and POST /notify requests are rejected with HTTP 403 so the agent is
+ // forced to route updates through a parent workspace. Default true
+ // (preserves existing behaviour for all workspaces).
+ TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"`
// Canvas layout fields (from JOIN)
X float64 `json:"x"`
Y float64 `json:"y"`
diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go
index aac18c14..6e7026ab 100644
--- a/workspace-server/internal/router/router.go
+++ b/workspace-server/internal/router/router.go
@@ -146,6 +146,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAdmin.GET("/workspaces", wh.List)
wsAdmin.POST("/workspaces", wh.Create)
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
+ // Ability toggles — admin-only so workspace agents cannot self-modify
+ // broadcast_enabled or talk_to_user_enabled.
+ wsAdmin.PATCH("/workspaces/:id/abilities", handlers.PatchAbilities)
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
// so the canvas flips to failed in seconds instead of waiting
@@ -201,6 +204,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
wsAuth.POST("/hibernate", wh.Hibernate)
+ // Broadcast — send a message to all non-removed workspaces in the org.
+ // Requires broadcast_enabled=true on the source workspace (checked
+ // inside the handler). WorkspaceAuth on wsAuth proves token ownership.
+ broadcastH := handlers.NewBroadcastHandler(broadcaster)
+ wsAuth.POST("/broadcast", broadcastH.Broadcast)
+
// External-workspace credential lifecycle (issue #319 follow-up to
// the Create flow). Both endpoints reject runtime ≠ external with
// 400 — see external_rotate.go for the rationale.
diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.down.sql b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql
new file mode 100644
index 00000000..12b5f846
--- /dev/null
+++ b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql
@@ -0,0 +1,3 @@
+ALTER TABLE workspaces
+ DROP COLUMN IF EXISTS broadcast_enabled,
+ DROP COLUMN IF EXISTS talk_to_user_enabled;
diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.up.sql b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql
new file mode 100644
index 00000000..f172c30f
--- /dev/null
+++ b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql
@@ -0,0 +1,16 @@
+-- Workspace abilities: opt-in flags that gate platform-level behaviours.
+--
+-- broadcast_enabled (default FALSE): when TRUE the workspace may call
+-- POST /workspaces/:id/broadcast to send a message to every non-removed
+-- agent workspace in the org. Off by default — only privileged
+-- orchestrator workspaces should hold this ability.
+--
+-- talk_to_user_enabled (default TRUE): when FALSE the workspace is not
+-- allowed to deliver messages to the canvas user via send_message_to_user /
+-- POST /notify. The platform returns HTTP 403 so the agent can forward its
+-- update to a parent workspace instead. Default TRUE preserves existing
+-- behaviour for all current workspaces.
+
+ALTER TABLE workspaces
+ ADD COLUMN IF NOT EXISTS broadcast_enabled BOOLEAN NOT NULL DEFAULT FALSE,
+ ADD COLUMN IF NOT EXISTS talk_to_user_enabled BOOLEAN NOT NULL DEFAULT TRUE;
diff --git a/workspace/a2a_mcp_server.py b/workspace/a2a_mcp_server.py
index 5ac5c594..ce27e982 100644
--- a/workspace/a2a_mcp_server.py
+++ b/workspace/a2a_mcp_server.py
@@ -29,6 +29,7 @@ from typing import Callable
import inbox
from a2a_tools import (
+ tool_broadcast_message,
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
@@ -160,6 +161,11 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
arguments.get("before_ts", ""),
source_workspace_id=arguments.get("source_workspace_id") or None,
)
+ elif name == "broadcast_message":
+ return await tool_broadcast_message(
+ arguments.get("message", ""),
+ workspace_id=arguments.get("workspace_id") or None,
+ )
return f"Unknown tool: {name}"
diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py
index 1b1ef267..eb26e622 100644
--- a/workspace/a2a_tools.py
+++ b/workspace/a2a_tools.py
@@ -137,6 +137,7 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli
# identically.
from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports)
_upload_chat_files,
+ tool_broadcast_message,
tool_chat_history,
tool_get_workspace_info,
tool_list_peers,
diff --git a/workspace/a2a_tools_messaging.py b/workspace/a2a_tools_messaging.py
index dea24f90..9b832a2b 100644
--- a/workspace/a2a_tools_messaging.py
+++ b/workspace/a2a_tools_messaging.py
@@ -101,6 +101,50 @@ async def _upload_chat_files(
return uploaded, None
+async def tool_broadcast_message(
+ message: str,
+ workspace_id: str | None = None,
+) -> str:
+ """Send a broadcast message to ALL agent workspaces in the org.
+
+ Requires the workspace to have broadcast_enabled=true (set by a user or
+ admin via PATCH /workspaces/:id/abilities). Use for urgent org-wide
+ signals — status changes, critical alerts, coordination instructions.
+ Every non-removed workspace receives the message in its activity log so
+ poll-mode agents pick it up, and push-mode canvases get a real-time
+ BROADCAST_MESSAGE WebSocket event.
+
+ Args:
+ message: The broadcast text. Keep it concise — all agents receive
+ this, so avoid lengthy prose that floods every context.
+ workspace_id: Optional. Which registered workspace to send the
+ broadcast from. Single-workspace agents omit this.
+ """
+ if not message:
+ return "Error: message is required"
+ target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
+ try:
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ resp = await client.post(
+ f"{PLATFORM_URL}/workspaces/{target_workspace_id}/broadcast",
+ json={"message": message},
+ headers=_auth_headers_for_heartbeat(target_workspace_id),
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ delivered = data.get("delivered", "?")
+ return f"Broadcast sent to {delivered} workspace(s)"
+ if resp.status_code == 403:
+ try:
+ hint = resp.json().get("hint", "")
+ except Exception:
+ hint = ""
+ return f"Error: broadcast ability not enabled.{(' ' + hint) if hint else ''}"
+ return f"Error: platform returned {resp.status_code}"
+ except Exception as e:
+ return f"Error sending broadcast: {e}"
+
+
async def tool_send_message_to_user(
message: str,
attachments: list[str] | None = None,
@@ -151,6 +195,20 @@ async def tool_send_message_to_user(
if uploaded:
return f"Message sent to user with {len(uploaded)} attachment(s)"
return "Message sent to user"
+ if resp.status_code == 403:
+ try:
+ body = resp.json()
+ if body.get("error") == "talk_to_user_disabled":
+ hint = body.get("hint", "")
+ return (
+ "Error: this workspace is not allowed to send messages "
+ "directly to the user (talk_to_user is disabled). "
+ + (hint + " " if hint else "")
+ + "Use delegate_task to forward your update to a parent "
+ "or supervisor workspace that can reach the user."
+ )
+ except Exception:
+ pass
return f"Error: platform returned {resp.status_code}"
except Exception as e:
return f"Error sending message: {e}"
diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py
index 3343dee5..aba334f9 100644
--- a/workspace/executor_helpers.py
+++ b/workspace/executor_helpers.py
@@ -340,6 +340,10 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
"delegate_task_async": "delegate --async",
"check_task_status": "status",
"get_workspace_info": "info",
+ # `broadcast_message` is not exposed via the CLI subprocess interface
+ # today — it's an MCP-first capability. If a2a_cli grows a `broadcast`
+ # subcommand, map it here and the alignment test will gate the change.
+ "broadcast_message": None,
# `send_message_to_user` is not exposed via the CLI subprocess
# interface today — it requires a structured `attachments` field
# that wouldn't survive a positional-arg shell invocation cleanly.
diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py
index f4fa773e..6550c9e7 100644
--- a/workspace/platform_tools/registry.py
+++ b/workspace/platform_tools/registry.py
@@ -51,6 +51,7 @@ from dataclasses import dataclass
from typing import Any, Literal
from a2a_tools import (
+ tool_broadcast_message,
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
@@ -288,6 +289,44 @@ _GET_WORKSPACE_INFO = ToolSpec(
section=A2A_SECTION,
)
+_BROADCAST_MESSAGE = ToolSpec(
+ name="broadcast_message",
+ short=(
+ "Send a message to ALL agent workspaces in the org simultaneously. "
+ "Requires broadcast_enabled=true on this workspace (set by user/admin)."
+ ),
+ when_to_use=(
+ "Use for urgent, org-wide signals: critical status changes, emergency "
+ "stop instructions, coordinated task announcements. Every non-removed "
+ "workspace receives the message in its activity log (poll-mode agents "
+ "see it on their next poll; push-mode canvases get a real-time banner). "
+ "This tool returns an error if broadcast_enabled is false — a user or "
+ "admin must enable it via the workspace abilities settings first."
+ ),
+ input_schema={
+ "type": "object",
+ "properties": {
+ "message": {
+ "type": "string",
+ "description": (
+ "The broadcast text. Keep it concise — every agent in the "
+ "org receives this in their activity feed."
+ ),
+ },
+ "workspace_id": {
+ "type": "string",
+ "description": (
+ "Optional. Multi-workspace mode: the registered workspace "
+ "to broadcast from. Single-workspace agents omit this."
+ ),
+ },
+ },
+ "required": ["message"],
+ },
+ impl=tool_broadcast_message,
+ section=A2A_SECTION,
+)
+
_SEND_MESSAGE_TO_USER = ToolSpec(
name="send_message_to_user",
short=(
@@ -603,6 +642,7 @@ TOOLS: list[ToolSpec] = [
_CHECK_TASK_STATUS,
_LIST_PEERS,
_GET_WORKSPACE_INFO,
+ _BROADCAST_MESSAGE,
_SEND_MESSAGE_TO_USER,
# Inbox (standalone-only; in-container returns informational error)
_WAIT_FOR_MESSAGE,
diff --git a/workspace/tests/snapshots/a2a_instructions_mcp.txt b/workspace/tests/snapshots/a2a_instructions_mcp.txt
index 6bcf471e..3f0213e1 100644
--- a/workspace/tests/snapshots/a2a_instructions_mcp.txt
+++ b/workspace/tests/snapshots/a2a_instructions_mcp.txt
@@ -5,6 +5,7 @@
- **check_task_status**: Poll the status of a task started with delegate_task_async; returns result when done.
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
+- **broadcast_message**: Send a message to ALL agent workspaces in the org simultaneously. Requires broadcast_enabled=true on this workspace (set by user/admin).
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
- **inbox_peek**: List pending inbound messages without removing them.
@@ -26,6 +27,9 @@ Call this first when you need to delegate but don't know the target's ID. Access
### get_workspace_info
Use to introspect your own identity (e.g. before reporting back to the user, or to determine whether you're a tier-0 root that can write GLOBAL memory).
+### broadcast_message
+Use for urgent, org-wide signals: critical status changes, emergency stop instructions, coordinated task announcements. Every non-removed workspace receives the message in its activity log (poll-mode agents see it on their next poll; push-mode canvases get a real-time banner). This tool returns an error if broadcast_enabled is false — a user or admin must enable it via the workspace abilities settings first.
+
### send_message_to_user
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).