Compare commits

..

2 Commits

Author SHA1 Message Date
fullstack-engineer 1b9e69b309 test(handlers): add queueRowAuthFields + additional GetA2AQueueStatus coverage
CI / all-required (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 31s
CI / Detect changes (pull_request) Successful in 34s
Harness Replays / detect-changes (pull_request) Successful in 32s
E2E API Smoke Test / detect-changes (pull_request) Successful in 46s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 46s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 37s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 51s
gate-check-v3 / gate-check (pull_request) Successful in 33s
qa-review / approved (pull_request) Successful in 35s
sop-tier-check / tier-check (pull_request) Successful in 36s
sop-checklist / all-items-acked (pull_request) Successful in 40s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m47s
CI / Canvas (Next.js) (pull_request) Failing after 10m22s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Failing after 10m44s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 12s
CI / Python Lint & Test (pull_request) Successful in 11s
Harness Replays / Harness Replays (pull_request) Successful in 7s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 13s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 6m55s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Failing after 7m4s
security-review / approved (pull_request) Has been cancelled
Extends fix/a2a-queue-status-coverage with:
- TestQueueRowAuthFields_Success_BothPresent (internal helper success path)
- TestQueueRowAuthFields_NoRows_ReturnsErrNoRows
- TestQueueRowAuthFields_QueryError_ReturnsError
- TestGetA2AQueueStatus_AuthPass_CallerMatchesCallerID (caller_id auth path)
- TestGetA2AQueueStatus_AuthPass_OrgTokenBypassesAuth (org-level token bypass)
- TestGetA2AQueueStatus_StatusQueryNoRows_NotFound (race-to-404)
- TestGetA2AQueueStatus_ResponseBodyIncludedWhenCompleted

All 30 platform packages pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 14:23:21 +00:00
fullstack-engineer 0967ed908d test(handlers): add coverage for QueueDepth, QueueStatusByID, GetA2AQueueStatus, emitOrgEvent nil-payload
a2a_queue_status.go had 0% coverage across all 3 exported symbols.
Added 14 tests exercising:

QueueDepth (package-level):
- TestQueueDepth_Success: COUNT returns 7
- TestQueueDepth_EmptyQueue: COUNT returns 0
- TestQueueDepth_QueryError_ReturnsZero: DB error → returns 0 (non-fatal)

QueueStatusByID (package-level):
- TestQueueStatusByID_Success: fully-populated QueueStatus from LEFT JOIN
- TestQueueStatusByID_CompletedWithResponse: completed item populates ResponseBody
- TestQueueStatusByID_ErrNoRows: sql.ErrNoRows propagates
- TestQueueStatusByID_QueryError: DB error propagates

GetA2AQueueStatus (HTTP handler):
- TestGetA2AQueueStatus_MissingQueueID_Returns400
- TestGetA2AQueueStatus_NoIdentity_Returns404 (not 401 per design)
- TestGetA2AQueueStatus_QueueNotFound_Returns404
- TestGetA2AQueueStatus_UnauthorizedCaller_Returns404 (not 403 per design)
- TestGetA2AQueueStatus_AuthorizedAsTarget_Success
- TestGetA2AQueueStatus_QueueRowLookupError_Returns500
- TestGetA2AQueueStatus_StatusFetchError_Returns500

org_import_reconcile_test.go:
- TestEmitOrgEvent_NilPayloadInitializesEmptyMap: exercises the
  payload == nil branch so the empty-map init path is covered.

All tests pass; full suite: 69.1% → 69.7%.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 14:23:21 +00:00
9 changed files with 646 additions and 653 deletions
@@ -16,40 +16,7 @@ interface TokensTabProps {
workspaceId: string;
}
// The settings panel passes the literal sentinel "global" when no canvas
// node is selected. Workspace tokens are inherently per-workspace — there
// is no /workspaces/global/tokens endpoint (querying the uuid column with
// "global" 500s on Postgres). The org-wide equivalent lives in the
// separate "Org API Keys" tab. Mirrors the sentinel-awareness that
// api/secrets.ts already has (workspaceId === 'global' → /settings/secrets).
const GLOBAL_WORKSPACE_ID = 'global';
export function TokensTab({ workspaceId }: TokensTabProps) {
if (workspaceId === GLOBAL_WORKSPACE_ID) {
return (
<div className="p-4 space-y-4">
<div>
<h3 className="text-sm font-semibold text-ink">API Tokens</h3>
<p className="text-[10px] text-ink-mid mt-0.5">
Bearer tokens for authenticating API calls to this workspace.
</p>
</div>
<div className="text-center py-6">
<p className="text-xs text-ink-mid">Select a workspace node first</p>
<p className="text-[10px] text-ink-mid mt-1">
Workspace tokens are scoped to a single workspace. Select a node
on the canvas to manage its tokens, or use the{' '}
<span className="text-accent font-medium">Org API Keys</span> tab
for org-wide API keys.
</p>
</div>
</div>
);
}
return <WorkspaceTokensTab workspaceId={workspaceId} />;
}
function WorkspaceTokensTab({ workspaceId }: TokensTabProps) {
const [tokens, setTokens] = useState<Token[]>([]);
const [loading, setLoading] = useState(true);
const [creating, setCreating] = useState(false);
@@ -302,35 +302,3 @@ describe("TokensTab — error", () => {
expect(document.querySelector('[role="status"]')).toBeNull();
});
});
// ─── "global" sentinel (no node selected) ────────────────────────────────────
//
// Regression: SettingsPanel passes the literal "global" when no canvas
// node is selected. workspace tokens are per-workspace and there is no
// /workspaces/global/tokens endpoint — calling it 500'd
// ("invalid input syntax for type uuid: global"). The tab must NOT call
// the API in that state and must point the user at the Org API Keys tab.
describe("TokensTab — global sentinel (no node selected)", () => {
beforeEach(() => {
mockApiGet.mockReset();
mockApiPost.mockReset();
mockApiGet.mockRejectedValue(new Error("should not be called"));
});
it("does not call the API and shows a pointer to Org API Keys", async () => {
render(<TokensTab workspaceId="global" />);
await flush();
expect(mockApiGet).not.toHaveBeenCalled();
expect(mockApiPost).not.toHaveBeenCalled();
expect(document.body.textContent).toContain("Select a workspace node");
expect(document.body.textContent).toContain("Org API Keys");
// No error banner, no scary 500 surfacing.
expect(document.querySelector(".text-bad")).toBeNull();
});
it("has no create button in the global state", async () => {
render(<TokensTab workspaceId="global" />);
await flush();
expect(document.body.textContent).not.toContain("New Token");
});
});
@@ -1,7 +1,16 @@
package handlers
import (
"context"
"database/sql"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// TestExtractExpiresInSeconds covers the JSON parser used at enqueue time
@@ -58,3 +67,597 @@ func TestExtractExpiresInSeconds(t *testing.T) {
})
}
}
// ─── QueueDepth ─────────────────────────────────────────────────────────────
// TestQueueDepth_Success verifies QueueDepth returns the COUNT of queued items
// for a workspace.
func TestQueueDepth_Success(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
WithArgs("ws-queue-depth-1").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(7))
got := QueueDepth(context.Background(), "ws-queue-depth-1")
if got != 7 {
t.Errorf("QueueDepth() = %d; want 7", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestQueueDepth_EmptyQueue returns 0 when no queued items exist.
func TestQueueDepth_EmptyQueue(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
WithArgs("ws-empty").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
got := QueueDepth(context.Background(), "ws-empty")
if got != 0 {
t.Errorf("QueueDepth() = %d; want 0", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestQueueDepth_QueryError returns 0 on DB error (non-fatal; caller only uses
// the count for display purposes).
func TestQueueDepth_QueryError_ReturnsZero(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
WithArgs("ws-err").
WillReturnError(errors.New("connection refused"))
// QueueDepth swallows the error and returns 0.
got := QueueDepth(context.Background(), "ws-err")
if got != 0 {
t.Errorf("QueueDepth() on error = %d; want 0", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ─── QueueStatusByID ────────────────────────────────────────────────────────
// TestQueueStatusByID_Success verifies QueueStatusByID returns a fully-populated
// QueueStatus from the LEFT JOIN of a2a_queue and activity_logs.
func TestQueueStatusByID_Success(t *testing.T) {
mock := setupTestDB(t)
// The LEFT JOIN query returns all queue columns + NULL for activity_logs
// when no delegation row exists.
mock.ExpectQuery(`SELECT\s+q\.id,\s+q\.workspace_id,\s+q\.status,\s+q\.priority,\s+q\.attempts,\s+q\.last_error,\s+q\.enqueued_at::text,\s+q\.dispatched_at::text,\s+q\.completed_at::text,\s+q\.expires_at::text,\s+al\.response_body::text\s+FROM a2a_queue q\s+LEFT JOIN activity_logs al`).
WithArgs("queue-ok-1").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"queue-ok-1", "ws-1", "queued", 50, 1,
nil, "2026-05-16T10:00:00Z", nil, nil, "2026-05-16T12:00:00Z",
nil,
))
qs, err := QueueStatusByID(context.Background(), "queue-ok-1")
if err != nil {
t.Fatalf("QueueStatusByID() error = %v; want nil", err)
}
if qs.ID != "queue-ok-1" {
t.Errorf("ID = %q; want queue-ok-1", qs.ID)
}
if qs.WorkspaceID != "ws-1" {
t.Errorf("WorkspaceID = %q; want ws-1", qs.WorkspaceID)
}
if qs.Status != "queued" {
t.Errorf("Status = %q; want queued", qs.Status)
}
if qs.Priority != 50 {
t.Errorf("Priority = %d; want 50", qs.Priority)
}
if qs.Attempts != 1 {
t.Errorf("Attempts = %d; want 1", qs.Attempts)
}
if qs.LastError != nil {
t.Errorf("LastError = %v; want nil", qs.LastError)
}
if qs.EnqueuedAt != "2026-05-16T10:00:00Z" {
t.Errorf("EnqueuedAt = %q; want 2026-05-16T10:00:00Z", qs.EnqueuedAt)
}
if qs.DispatchedAt != nil {
t.Errorf("DispatchedAt = %v; want nil", qs.DispatchedAt)
}
if qs.CompletedAt != nil {
t.Errorf("CompletedAt = %v; want nil", qs.CompletedAt)
}
if *qs.ExpiresAt != "2026-05-16T12:00:00Z" {
t.Errorf("ExpiresAt = %v; want 2026-05-16T12:00:00Z", qs.ExpiresAt)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestQueueStatusByID_CompletedWithResponse verifies that a completed queue item
// populates ResponseBody from the LEFT JOINed activity_logs row.
func TestQueueStatusByID_CompletedWithResponse(t *testing.T) {
mock := setupTestDB(t)
respBody := `{"result":"done"}`
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("queue-done-1").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"queue-done-1", "ws-1", "completed", 50, 1,
nil, "2026-05-16T10:00:00Z", "2026-05-16T10:01:00Z", "2026-05-16T10:02:00Z", nil,
respBody,
))
qs, err := QueueStatusByID(context.Background(), "queue-done-1")
if err != nil {
t.Fatalf("QueueStatusByID() error = %v; want nil", err)
}
if qs.Status != "completed" {
t.Errorf("Status = %q; want completed", qs.Status)
}
if qs.ResponseBody == nil {
t.Fatal("ResponseBody = nil; want non-nil for completed item")
}
var resp map[string]interface{}
if err := json.Unmarshal(qs.ResponseBody, &resp); err != nil {
t.Fatalf("ResponseBody not valid JSON: %v", err)
}
if resp["result"] != "done" {
t.Errorf("ResponseBody result = %v; want done", resp["result"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestQueueStatusByID_ErrNoRows returns sql.ErrNoRows when the queue ID doesn't exist.
func TestQueueStatusByID_ErrNoRows(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("queue-missing").
WillReturnError(sql.ErrNoRows)
_, err := QueueStatusByID(context.Background(), "queue-missing")
if !errors.Is(err, sql.ErrNoRows) {
t.Errorf("QueueStatusByID() error = %v; want sql.ErrNoRows", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestQueueStatusByID_QueryError propagates DB errors as-is.
func TestQueueStatusByID_QueryError(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("queue-err").
WillReturnError(errors.New("connection refused"))
_, err := QueueStatusByID(context.Background(), "queue-err")
if err == nil {
t.Fatal("QueueStatusByID() error = nil; want non-nil")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ─── GetA2AQueueStatus (HTTP handler) ─────────────────────────────────────
func newGetA2AQueueStatusHarness(t *testing.T) (sqlmock.Sqlmock, *httptest.ResponseRecorder, *gin.Context) {
mock := setupTestDB(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
return mock, w, c
}
func TestGetA2AQueueStatus_MissingQueueID_Returns400(t *testing.T) {
_, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: ""}}
c.Request = httptest.NewRequest("GET", "/", nil)
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestGetA2AQueueStatus_NoIdentity_Returns404(t *testing.T) {
_, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-123"}}
c.Request = httptest.NewRequest("GET", "/", nil)
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
// Returns 404 (not 401) per the existence-non-inference policy.
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
}
func TestGetA2AQueueStatus_QueueNotFound_Returns404(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-404"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-1")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-404").
WillReturnError(sql.ErrNoRows)
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestGetA2AQueueStatus_UnauthorizedCaller_Returns404(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-unauth"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-wrong")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-unauth").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller-a", "ws-target-b"))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
// Returns 404 per the existence-non-inference policy.
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestGetA2AQueueStatus_AuthorizedAsTarget_Success(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-ok"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-target")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-ok").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller", "ws-target"))
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-ok").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"q-ok", "ws-target", "queued", 50, 1,
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
nil,
))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var qs QueueStatus
if err := json.Unmarshal(w.Body.Bytes(), &qs); err != nil {
t.Fatalf("body parse: %v", err)
}
if qs.ID != "q-ok" {
t.Errorf("queue_id = %q; want q-ok", qs.ID)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestGetA2AQueueStatus_QueueRowLookupError_Returns500(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-lookup-err"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-1")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-lookup-err").
WillReturnError(errors.New("connection refused"))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestGetA2AQueueStatus_StatusFetchError_Returns500(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-status-err"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-1")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-status-err").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-1", "ws-1"))
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-status-err").
WillReturnError(errors.New("connection refused"))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// ─── queueRowAuthFields (internal helper) ─────────────────────────────────────
// Covers the auth-only 2-col SELECT used by GetA2AQueueStatus to determine
// whether the caller has access before projecting the public status fields.
func TestQueueRowAuthFields_Success_BothPresent(t *testing.T) {
mock := setupTestDB(t)
queueID := "qqqqqqqq-0003-0003-0003-000000000003"
rows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller-3", "ws-target-3")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs(queueID).
WillReturnRows(rows)
callerID, workspaceID, err := queueRowAuthFields(context.Background(), queueID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if callerID != "ws-caller-3" {
t.Errorf("callerID = %q, want %q", callerID, "ws-caller-3")
}
if workspaceID != "ws-target-3" {
t.Errorf("workspaceID = %q, want %q", workspaceID, "ws-target-3")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueRowAuthFields_NoRows_ReturnsErrNoRows(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("qqqqqqqq-missing").
WillReturnError(sql.ErrNoRows)
_, _, err := queueRowAuthFields(context.Background(), "qqqqqqqq-missing")
if !errors.Is(err, sql.ErrNoRows) {
t.Errorf("expected sql.ErrNoRows, got %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueRowAuthFields_QueryError_ReturnsError(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("qqqqqqqq-dberr").
WillReturnError(sql.ErrConnDone)
_, _, err := queueRowAuthFields(context.Background(), "qqqqqqqq-dberr")
if err == nil {
t.Fatal("expected error, got nil")
}
if errors.Is(err, sql.ErrNoRows) {
t.Error("expected non-no-rows error, got sql.ErrNoRows")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ─── Additional GetA2AQueueStatus coverage ─────────────────────────────────────
// TestGetA2AQueueStatus_AuthPass_CallerMatchesCallerID verifies that a caller
// whose workspace matches queue.caller_id (not just workspace_id) passes auth
// and receives the status. This path is distinct from the existing "authorized
// as target" test which covers workspace_id = caller.
func TestGetA2AQueueStatus_AuthPass_CallerMatchesCallerID(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-caller-match"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-caller-match")
// Queue row: ws-caller-match is the caller, ws-other-target is the target.
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-caller-match").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller-match", "ws-other-target"))
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-caller-match").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"q-caller-match", "ws-other-target", "queued", 50, 0,
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
nil,
))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var qs QueueStatus
json.Unmarshal(w.Body.Bytes(), &qs)
if qs.ID != "q-caller-match" {
t.Errorf("queue_id = %q; want q-caller-match", qs.ID)
}
if qs.Status != "queued" {
t.Errorf("status = %q; want queued", qs.Status)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestGetA2AQueueStatus_AuthPass_OrgTokenBypassesAuth verifies that an org-level
// token (canvas/admin) bypasses the caller_id / workspace_id match entirely.
// No X-Workspace-ID header is required; org_token_id in context is sufficient.
func TestGetA2AQueueStatus_AuthPass_OrgTokenBypassesAuth(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-org-bypass"}}
c.Request = httptest.NewRequest("GET", "/", nil)
// No X-Workspace-ID header — org token is set via context instead.
c.Set("org_token_id", "org-admin-1")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-org-bypass").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-anyone", "ws-anyone"))
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-org-bypass").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"q-org-bypass", "ws-anyone", "queued", 25, 0,
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
nil,
))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestGetA2AQueueStatus_StatusQueryNoRows_NotFound covers the theoretical race:
// queue row exists (auth check passes), but is deleted before QueueStatusByID runs.
// Handler returns 404 (not 500) — matching the existence-non-inference policy.
func TestGetA2AQueueStatus_StatusQueryNoRows_NotFound(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-race-no-rows"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-race-no-rows").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller", "ws-target"))
// Status query returns no rows — row was deleted between auth check and status fetch.
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-race-no-rows").
WillReturnError(sql.ErrNoRows)
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestGetA2AQueueStatus_ResponseBodyIncludedWhenCompleted confirms that a completed
// queue item surfaces response_body from activity_logs in the HTTP response body.
func TestGetA2AQueueStatus_ResponseBodyIncludedWhenCompleted(t *testing.T) {
mock, w, c := newGetA2AQueueStatusHarness(t)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-completed-body"}}
c.Request = httptest.NewRequest("GET", "/", nil)
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs("q-completed-body").
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow("ws-caller", "ws-target"))
respBody := `{"result":{"status":"ok","reply":"hello world"}}`
mock.ExpectQuery(`SELECT\s+q\.id`).
WithArgs("q-completed-body").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
"q-completed-body", "ws-target", "completed", 50, 1,
nil, "2026-05-16T10:00:00Z", "2026-05-16T10:01:00Z", "2026-05-16T10:02:00Z", nil,
respBody,
))
h := newHandlerWithTestDeps(t)
h.GetA2AQueueStatus(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var qs QueueStatus
json.Unmarshal(w.Body.Bytes(), &qs)
if qs.ResponseBody == nil {
t.Fatal("ResponseBody should be set for completed status")
}
if string(qs.ResponseBody) != respBody {
t.Errorf("ResponseBody = %q, want %q", string(qs.ResponseBody), respBody)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
@@ -156,3 +156,20 @@ func equalStrings(a, b []string) bool {
}
return true
}
// TestEmitOrgEvent_NilPayload exercises the `if payload == nil` branch that
// re-initializes payload to an empty map before marshaling.
func TestEmitOrgEvent_NilPayloadInitializesEmptyMap(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("org.import.started", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
// Passing nil triggers: if payload == nil { payload = map[string]any{} }
emitOrgEvent(context.Background(), "org.import.started", nil)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
@@ -10,20 +10,8 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// validWorkspaceID returns true when id is a syntactically valid UUID.
// workspace_id is a `uuid` column; passing a non-UUID (e.g. the canvas
// "global" sentinel sent when no node is selected) makes Postgres raise
// `invalid input syntax for type uuid`, which previously leaked as an
// opaque 500. Reject up front with a clean 400 instead. Mirrors the
// uuid.Parse guard already used in handlers/activity.go.
func validWorkspaceID(id string) bool {
_, err := uuid.Parse(id)
return err == nil
}
// TokenHandler exposes user-facing token management for workspaces.
// Routes: GET/POST/DELETE /workspaces/:id/tokens (behind WorkspaceAuth).
type TokenHandler struct{}
@@ -43,10 +31,6 @@ type tokenListItem struct {
// never the plaintext or hash).
func (h *TokenHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
if !validWorkspaceID(workspaceID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
limit := 50
if v := c.Query("limit"); v != "" {
@@ -69,7 +53,6 @@ func (h *TokenHandler) List(c *gin.Context) {
LIMIT $2 OFFSET $3
`, workspaceID, limit, offset)
if err != nil {
log.Printf("tokens: list query failed for workspace %s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list tokens"})
return
}
@@ -102,10 +85,6 @@ const maxTokensPerWorkspace = 50
// exactly once in the response — it cannot be recovered afterwards.
func (h *TokenHandler) Create(c *gin.Context) {
workspaceID := c.Param("id")
if !validWorkspaceID(workspaceID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
// Rate limit: max active tokens per workspace
var count int
@@ -138,10 +117,6 @@ func (h *TokenHandler) Create(c *gin.Context) {
func (h *TokenHandler) Revoke(c *gin.Context) {
workspaceID := c.Param("id")
tokenID := c.Param("tokenId")
if !validWorkspaceID(workspaceID) {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
result, err := db.DB.ExecContext(c.Request.Context(), `
UPDATE workspace_auth_tokens
@@ -41,15 +41,6 @@ import (
func init() { gin.SetMode(gin.TestMode) }
// Workspace IDs are validated as UUIDs up front (tokens.go validWorkspaceID),
// so handler tests must pass syntactically valid UUIDs. Fixed values keep
// sqlmock WithArgs assertions deterministic.
const (
wsUUID1 = "11111111-1111-1111-1111-111111111111"
wsUUID2 = "22222222-2222-2222-2222-222222222222"
wsUUID3 = "33333333-3333-3333-3333-333333333333"
)
// withMockDB swaps `db.DB` for a sqlmock and returns the mock plus a
// restore func. Tests use this in place of setupTokenTestDB which
// skips on a missing real DB.
@@ -90,13 +81,13 @@ func TestTokenHandler_List_HappyPath(t *testing.T) {
created := time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)
last := created.Add(time.Hour)
mock.ExpectQuery(`SELECT id, prefix, created_at, last_used_at\s+FROM workspace_auth_tokens`).
WithArgs(wsUUID1, 50, 0).
WithArgs("ws-1", 50, 0).
WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "created_at", "last_used_at"}).
AddRow("tok-1", "abc12345", created, last).
AddRow("tok-2", "def67890", created, nil))
w := makeReq(t, NewTokenHandler().List, "GET",
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: "ws-1"}})
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
@@ -130,7 +121,7 @@ func TestTokenHandler_List_EmptyResult(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "created_at", "last_used_at"}))
w := makeReq(t, NewTokenHandler().List, "GET",
"/workspaces/ws-2/tokens", gin.Params{{Key: "id", Value: wsUUID2}})
"/workspaces/ws-2/tokens", gin.Params{{Key: "id", Value: "ws-2"}})
if w.Code != http.StatusOK {
t.Fatalf("expected 200 on empty list, got %d", w.Code)
@@ -155,7 +146,7 @@ func TestTokenHandler_List_QueryError(t *testing.T) {
WillReturnError(errors.New("connection refused"))
w := makeReq(t, NewTokenHandler().List, "GET",
"/workspaces/ws-3/tokens", gin.Params{{Key: "id", Value: wsUUID3}})
"/workspaces/ws-3/tokens", gin.Params{{Key: "id", Value: "ws-3"}})
if w.Code != http.StatusInternalServerError {
t.Errorf("query error must surface as 500, got %d", w.Code)
@@ -167,13 +158,13 @@ func TestTokenHandler_List_RespectsLimit(t *testing.T) {
defer cleanup()
mock.ExpectQuery(`SELECT id, prefix, created_at, last_used_at`).
WithArgs(wsUUID1, 10, 5).
WithArgs("ws-1", 10, 5).
WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "created_at", "last_used_at"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/tokens?limit=10&offset=5", nil)
c.Params = gin.Params{{Key: "id", Value: wsUUID1}}
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
NewTokenHandler().List(c)
if w.Code != http.StatusOK {
@@ -195,7 +186,7 @@ func TestTokenHandler_List_ScanError(t *testing.T) {
AddRow("tok-1", "abc", "not-a-timestamp", nil))
w := makeReq(t, NewTokenHandler().List, "GET",
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: "ws-1"}})
if w.Code != http.StatusInternalServerError {
t.Errorf("scan error must surface as 500, got %d: %s", w.Code, w.Body.String())
@@ -210,11 +201,11 @@ func TestTokenHandler_Create_RateLimited(t *testing.T) {
// Count query returns 50 (== max) → 429.
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs(wsUUID1).
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(50))
w := makeReq(t, NewTokenHandler().Create, "POST",
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: "ws-1"}})
if w.Code != http.StatusTooManyRequests {
t.Errorf("max active tokens should 429, got %d", w.Code)
@@ -234,7 +225,7 @@ func TestTokenHandler_Create_IssueFails(t *testing.T) {
WillReturnError(errors.New("disk full"))
w := makeReq(t, NewTokenHandler().Create, "POST",
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: "ws-1"}})
if w.Code != http.StatusInternalServerError {
t.Errorf("IssueToken DB error must 500, got %d", w.Code)
@@ -251,7 +242,7 @@ func TestTokenHandler_Create_HappyPath(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
w := makeReq(t, NewTokenHandler().Create, "POST",
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: wsUUID1}})
"/workspaces/ws-1/tokens", gin.Params{{Key: "id", Value: "ws-1"}})
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
@@ -266,7 +257,7 @@ func TestTokenHandler_Create_HappyPath(t *testing.T) {
if body.AuthToken == "" {
t.Errorf("auth_token must be present and non-empty in response")
}
if body.WorkspaceID != wsUUID1 {
if body.WorkspaceID != "ws-1" {
t.Errorf("workspace_id mismatch: %q", body.WorkspaceID)
}
}
@@ -278,12 +269,12 @@ func TestTokenHandler_Revoke_HappyPath(t *testing.T) {
defer cleanup()
mock.ExpectExec(`UPDATE workspace_auth_tokens\s+SET revoked_at = now\(\)`).
WithArgs("tok-1", wsUUID1).
WithArgs("tok-1", "ws-1").
WillReturnResult(sqlmock.NewResult(0, 1))
w := makeReq(t, NewTokenHandler().Revoke, "DELETE",
"/workspaces/ws-1/tokens/tok-1", gin.Params{
{Key: "id", Value: wsUUID1},
{Key: "id", Value: "ws-1"},
{Key: "tokenId", Value: "tok-1"},
})
@@ -298,12 +289,12 @@ func TestTokenHandler_Revoke_NotFound(t *testing.T) {
// 0 rows affected → token not found OR already revoked.
mock.ExpectExec(`UPDATE workspace_auth_tokens`).
WithArgs("tok-ghost", wsUUID1).
WithArgs("tok-ghost", "ws-1").
WillReturnResult(sqlmock.NewResult(0, 0))
w := makeReq(t, NewTokenHandler().Revoke, "DELETE",
"/workspaces/ws-1/tokens/tok-ghost", gin.Params{
{Key: "id", Value: wsUUID1},
{Key: "id", Value: "ws-1"},
{Key: "tokenId", Value: "tok-ghost"},
})
@@ -321,7 +312,7 @@ func TestTokenHandler_Revoke_DBError(t *testing.T) {
w := makeReq(t, NewTokenHandler().Revoke, "DELETE",
"/workspaces/ws-1/tokens/tok-1", gin.Params{
{Key: "id", Value: wsUUID1},
{Key: "id", Value: "ws-1"},
{Key: "tokenId", Value: "tok-1"},
})
@@ -330,59 +321,6 @@ func TestTokenHandler_Revoke_DBError(t *testing.T) {
}
}
// ---- UUID validation (regression: "global" sentinel 500) ------------
// The canvas Settings → Workspace Tokens tab sent the literal sentinel
// "global" as the workspace id when no node was selected. workspace_id
// is a `uuid` column, so the query raised
// `invalid input syntax for type uuid: "global"` which leaked as an
// opaque 500. List/Create/Revoke now reject any non-UUID id with a
// clean 400 before touching the DB. No DB expectation is set on the
// mock — a DB hit would fail ExpectationsWereMet, proving short-circuit.
func TestTokenHandler_RejectsNonUUIDWorkspaceID(t *testing.T) {
h := NewTokenHandler()
cases := []struct {
name string
run func(c *gin.Context)
method string
params gin.Params
}{
{"List", h.List, "GET", gin.Params{{Key: "id", Value: "global"}}},
{"Create", h.Create, "POST", gin.Params{{Key: "id", Value: "global"}}},
{"Revoke", h.Revoke, "DELETE", gin.Params{
{Key: "id", Value: "global"},
{Key: "tokenId", Value: "tok-1"},
}},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
w := makeReq(t, tc.run, tc.method,
"/workspaces/global/tokens", tc.params)
if w.Code != http.StatusBadRequest {
t.Fatalf("%s with non-UUID id must 400, got %d: %s",
tc.name, w.Code, w.Body.String())
}
var body struct {
Error string `json:"error"`
}
_ = json.Unmarshal(w.Body.Bytes(), &body)
if body.Error != "invalid workspace id" {
t.Errorf("%s: want error=%q, got %q",
tc.name, "invalid workspace id", body.Error)
}
// No query/exec was expected → if the handler hit the DB
// this fails, proving the guard short-circuits before SQL.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("%s leaked a DB call past the uuid guard: %v", tc.name, err)
}
})
}
}
// Compile-time noise removal: the imports list pulls in the sql /
// driver packages and the silenced ctx so a future scenario that
// needs them doesn't have to re-add the import. Documented here so
@@ -11,7 +11,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
func init() { gin.SetMode(gin.TestMode) }
@@ -168,14 +167,11 @@ func TestTokenHandler_RevokeWrongWorkspace(t *testing.T) {
h := NewTokenHandler()
// Try to revoke with a different (valid-UUID) workspace ID that does
// not own the token — should 404. A valid UUID is required so this
// exercises the ownership branch, not the up-front uuid-shape 400.
otherWS := uuid.NewString()
// Try to revoke with a different workspace ID — should 404
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: otherWS}, {Key: "tokenId", Value: tokenID}}
c.Request = httptest.NewRequest("DELETE", "/workspaces/"+otherWS+"/tokens/"+tokenID, nil)
c.Params = gin.Params{{Key: "id", Value: "wrong-workspace-id"}, {Key: "tokenId", Value: tokenID}}
c.Request = httptest.NewRequest("DELETE", "/workspaces/wrong/tokens/"+tokenID, nil)
h.Revoke(c)
if w.Code != http.StatusNotFound {
@@ -3,7 +3,7 @@ package handlers
// workspace_broadcast.go — POST /workspaces/:id/broadcast
//
// Allows a workspace with broadcast_enabled=true to send a message to every
// non-removed agent workspace in the SAME ORG. The message is:
// non-removed agent workspace in the org. The message is:
//
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
// so poll-mode agents pick it up via GET /activity.
@@ -16,11 +16,6 @@ package handlers
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
// TOCTOU — the middleware only proved the token is valid, not the ability.
//
// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
// a recursive CTE that walks the parent_id chain to find the org root. This
// prevents a compromised or misconfigured workspace from broadcasting to
// workspaces in other tenants' orgs.
import (
"log"
@@ -79,49 +74,11 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) {
return
}
// Find the sender's org root by walking the parent_id chain.
// Workspaces with parent_id = NULL are org roots; every other workspace
// belongs to the org identified by its topmost ancestor.
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
// Collect all non-removed agent workspaces in the SAME ORG (same root_id),
// excluding the sender itself.
rows, err := db.DB.QueryContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.parent_id = c.id
)
SELECT c.id
FROM org_chain c
WHERE c.root_id = $1
AND c.id != $2
AND EXISTS (
SELECT 1 FROM workspaces w
WHERE w.id = c.id AND w.status != 'removed'
)
`, orgRootID, senderID)
// Collect all non-removed agent workspaces (excludes the sender itself).
rows, err := db.DB.QueryContext(ctx,
`SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`,
senderID,
)
if err != nil {
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
@@ -1,428 +0,0 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// -------- Org-scoped recipient query tests (OFFSEC-015) --------
// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
// NOT reach workspaces belonging to Org-B. This is the core regression test
// for OFFSEC-015: the original query had no org filter, so a workspace in
// Org-A could broadcast to every non-removed workspace in the entire DB,
// including workspaces owned by other tenants.
func TestBroadcast_OrgScopedRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
// Org-A structure:
// org-a-root (parent_id = NULL) ← sender
// ├── ws-a-child
// Org-B structure:
// org-b-root (parent_id = NULL)
// └── ws-b-child
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
wsAChild := "00000000-0000-0000-0000-000000000002"
// ws-b-child is in Org-B (different root); the org-scoped query MUST NOT include it.
// 1. Sender lookup
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
// 2. Org root lookup — sender is its own root (parent_id = NULL)
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
// The query joins on org_chain.root_id = orgRootID, which scopes to Org-A only.
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID). // orgRootID, senderID (EXCLUDED)
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) // only Org-A child
// Activity log inserts
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello from org-a"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
}
if resp["status"] != "sent" {
t.Errorf("expected status 'sent', got %v", resp["status"])
}
// ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
// If it were included, the mock would have an unmet expectation.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations — cross-org workspace was included in broadcast: %v", err)
}
}
// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
siblingID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Sender is the org root — CTE returns sender's own ID as root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipients in same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello siblings"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
// workspace can broadcast to siblings in the same org.
func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
orgRootID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000002" // child workspace
siblingID := "00000000-0000-0000-0000-000000000003"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
// Org root lookup — walk up to find org-a-root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
// Recipients: same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(orgRootID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"child broadcasting"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// -------- Non-regression cases --------
func TestBroadcast_NotFound(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000099"
// UUID is valid, but no workspace row matches
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnError(errors.New("workspace not found"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_Disabled(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not send"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusForbidden {
t.Errorf("expected 403, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["error"] != "broadcast_disabled" {
t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
}
}
func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Lone Root", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// No other workspaces in this org
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello org"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["delivered"] != float64(0) {
t.Errorf("expected delivered=0, got %v", resp["delivered"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
// finding the org root errors, the handler returns 500 instead of proceeding
// with an un-scoped query that would broadcast to all orgs.
func TestBroadcast_OrgRootLookupFails(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Org root CTE fails
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnError(context.DeadlineExceeded)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not broadcast"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
// The recipient query MUST NOT be called — it would broadcast cross-org
// if the org root lookup failed silently.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
// from a workspace does not send a broadcast_receive to the sender itself
// (the sender logs broadcast_sent, not broadcast_receive).
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
peerID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipient query MUST exclude sender via id != senderID
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
// Peer receives broadcast_receive
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
// Sender logs broadcast_sent (NOT broadcast_receive)
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"no echo to self"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
func TestBroadcast_Truncate(t *testing.T) {
cases := []struct {
msg string
max int
expect string
}{
{"short", 120, "short"}, // under max — no truncation
// exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
{"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
// "this is a longer mes" = 20 runes; + "…" = 21 chars
{"this is a longer message that needs truncating", 20, "this is a longer mes…"},
// at-max boundary: 20 chars at max=20 → no truncation
{"exactly twenty chars", 20, "exactly twenty chars"},
// over max: 11 chars at max=10 → 10 + "…" = 11
{"hello world!", 10, "hello worl…"},
}
for _, tc := range cases {
result := broadcastTruncate(tc.msg, tc.max)
if result != tc.expect {
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.msg, tc.max, result, tc.expect)
}
}
}