Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3a5c25530f |
@@ -1,16 +1,7 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestExtractExpiresInSeconds covers the JSON parser used at enqueue time
|
||||
@@ -67,597 +58,3 @@ func TestExtractExpiresInSeconds(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ─── QueueDepth ─────────────────────────────────────────────────────────────
|
||||
|
||||
// TestQueueDepth_Success verifies QueueDepth returns the COUNT of queued items
|
||||
// for a workspace.
|
||||
func TestQueueDepth_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
|
||||
WithArgs("ws-queue-depth-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(7))
|
||||
|
||||
got := QueueDepth(context.Background(), "ws-queue-depth-1")
|
||||
if got != 7 {
|
||||
t.Errorf("QueueDepth() = %d; want 7", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueDepth_EmptyQueue returns 0 when no queued items exist.
|
||||
func TestQueueDepth_EmptyQueue(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
|
||||
WithArgs("ws-empty").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
got := QueueDepth(context.Background(), "ws-empty")
|
||||
if got != 0 {
|
||||
t.Errorf("QueueDepth() = %d; want 0", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueDepth_QueryError returns 0 on DB error (non-fatal; caller only uses
|
||||
// the count for display purposes).
|
||||
func TestQueueDepth_QueryError_ReturnsZero(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM a2a_queue WHERE workspace_id = \$1 AND status = 'queued'`).
|
||||
WithArgs("ws-err").
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
// QueueDepth swallows the error and returns 0.
|
||||
got := QueueDepth(context.Background(), "ws-err")
|
||||
if got != 0 {
|
||||
t.Errorf("QueueDepth() on error = %d; want 0", got)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─── QueueStatusByID ────────────────────────────────────────────────────────
|
||||
|
||||
// TestQueueStatusByID_Success verifies QueueStatusByID returns a fully-populated
|
||||
// QueueStatus from the LEFT JOIN of a2a_queue and activity_logs.
|
||||
func TestQueueStatusByID_Success(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// The LEFT JOIN query returns all queue columns + NULL for activity_logs
|
||||
// when no delegation row exists.
|
||||
mock.ExpectQuery(`SELECT\s+q\.id,\s+q\.workspace_id,\s+q\.status,\s+q\.priority,\s+q\.attempts,\s+q\.last_error,\s+q\.enqueued_at::text,\s+q\.dispatched_at::text,\s+q\.completed_at::text,\s+q\.expires_at::text,\s+al\.response_body::text\s+FROM a2a_queue q\s+LEFT JOIN activity_logs al`).
|
||||
WithArgs("queue-ok-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"queue-ok-1", "ws-1", "queued", 50, 1,
|
||||
nil, "2026-05-16T10:00:00Z", nil, nil, "2026-05-16T12:00:00Z",
|
||||
nil,
|
||||
))
|
||||
|
||||
qs, err := QueueStatusByID(context.Background(), "queue-ok-1")
|
||||
if err != nil {
|
||||
t.Fatalf("QueueStatusByID() error = %v; want nil", err)
|
||||
}
|
||||
if qs.ID != "queue-ok-1" {
|
||||
t.Errorf("ID = %q; want queue-ok-1", qs.ID)
|
||||
}
|
||||
if qs.WorkspaceID != "ws-1" {
|
||||
t.Errorf("WorkspaceID = %q; want ws-1", qs.WorkspaceID)
|
||||
}
|
||||
if qs.Status != "queued" {
|
||||
t.Errorf("Status = %q; want queued", qs.Status)
|
||||
}
|
||||
if qs.Priority != 50 {
|
||||
t.Errorf("Priority = %d; want 50", qs.Priority)
|
||||
}
|
||||
if qs.Attempts != 1 {
|
||||
t.Errorf("Attempts = %d; want 1", qs.Attempts)
|
||||
}
|
||||
if qs.LastError != nil {
|
||||
t.Errorf("LastError = %v; want nil", qs.LastError)
|
||||
}
|
||||
if qs.EnqueuedAt != "2026-05-16T10:00:00Z" {
|
||||
t.Errorf("EnqueuedAt = %q; want 2026-05-16T10:00:00Z", qs.EnqueuedAt)
|
||||
}
|
||||
if qs.DispatchedAt != nil {
|
||||
t.Errorf("DispatchedAt = %v; want nil", qs.DispatchedAt)
|
||||
}
|
||||
if qs.CompletedAt != nil {
|
||||
t.Errorf("CompletedAt = %v; want nil", qs.CompletedAt)
|
||||
}
|
||||
if *qs.ExpiresAt != "2026-05-16T12:00:00Z" {
|
||||
t.Errorf("ExpiresAt = %v; want 2026-05-16T12:00:00Z", qs.ExpiresAt)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueStatusByID_CompletedWithResponse verifies that a completed queue item
|
||||
// populates ResponseBody from the LEFT JOINed activity_logs row.
|
||||
func TestQueueStatusByID_CompletedWithResponse(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
respBody := `{"result":"done"}`
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("queue-done-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"queue-done-1", "ws-1", "completed", 50, 1,
|
||||
nil, "2026-05-16T10:00:00Z", "2026-05-16T10:01:00Z", "2026-05-16T10:02:00Z", nil,
|
||||
respBody,
|
||||
))
|
||||
|
||||
qs, err := QueueStatusByID(context.Background(), "queue-done-1")
|
||||
if err != nil {
|
||||
t.Fatalf("QueueStatusByID() error = %v; want nil", err)
|
||||
}
|
||||
if qs.Status != "completed" {
|
||||
t.Errorf("Status = %q; want completed", qs.Status)
|
||||
}
|
||||
if qs.ResponseBody == nil {
|
||||
t.Fatal("ResponseBody = nil; want non-nil for completed item")
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(qs.ResponseBody, &resp); err != nil {
|
||||
t.Fatalf("ResponseBody not valid JSON: %v", err)
|
||||
}
|
||||
if resp["result"] != "done" {
|
||||
t.Errorf("ResponseBody result = %v; want done", resp["result"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueStatusByID_ErrNoRows returns sql.ErrNoRows when the queue ID doesn't exist.
|
||||
func TestQueueStatusByID_ErrNoRows(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("queue-missing").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
_, err := QueueStatusByID(context.Background(), "queue-missing")
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
t.Errorf("QueueStatusByID() error = %v; want sql.ErrNoRows", err)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueStatusByID_QueryError propagates DB errors as-is.
|
||||
func TestQueueStatusByID_QueryError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("queue-err").
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
_, err := QueueStatusByID(context.Background(), "queue-err")
|
||||
if err == nil {
|
||||
t.Fatal("QueueStatusByID() error = nil; want non-nil")
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─── GetA2AQueueStatus (HTTP handler) ─────────────────────────────────────
|
||||
|
||||
func newGetA2AQueueStatusHarness(t *testing.T) (sqlmock.Sqlmock, *httptest.ResponseRecorder, *gin.Context) {
|
||||
mock := setupTestDB(t)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
return mock, w, c
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_MissingQueueID_Returns400(t *testing.T) {
|
||||
_, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: ""}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_NoIdentity_Returns404(t *testing.T) {
|
||||
_, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-123"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
// Returns 404 (not 401) per the existence-non-inference policy.
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_QueueNotFound_Returns404(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-404"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-1")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-404").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_UnauthorizedCaller_Returns404(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-unauth"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-wrong")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-unauth").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller-a", "ws-target-b"))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
// Returns 404 per the existence-non-inference policy.
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_AuthorizedAsTarget_Success(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-ok"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-target")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-ok").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller", "ws-target"))
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-ok").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"q-ok", "ws-target", "queued", 50, 1,
|
||||
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
|
||||
nil,
|
||||
))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var qs QueueStatus
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &qs); err != nil {
|
||||
t.Fatalf("body parse: %v", err)
|
||||
}
|
||||
if qs.ID != "q-ok" {
|
||||
t.Errorf("queue_id = %q; want q-ok", qs.ID)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_QueueRowLookupError_Returns500(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-lookup-err"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-1")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-lookup-err").
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetA2AQueueStatus_StatusFetchError_Returns500(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-status-err"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-1")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-status-err").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-1", "ws-1"))
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-status-err").
|
||||
WillReturnError(errors.New("connection refused"))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─── queueRowAuthFields (internal helper) ─────────────────────────────────────
|
||||
// Covers the auth-only 2-col SELECT used by GetA2AQueueStatus to determine
|
||||
// whether the caller has access before projecting the public status fields.
|
||||
|
||||
func TestQueueRowAuthFields_Success_BothPresent(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
queueID := "qqqqqqqq-0003-0003-0003-000000000003"
|
||||
rows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller-3", "ws-target-3")
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs(queueID).
|
||||
WillReturnRows(rows)
|
||||
|
||||
callerID, workspaceID, err := queueRowAuthFields(context.Background(), queueID)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if callerID != "ws-caller-3" {
|
||||
t.Errorf("callerID = %q, want %q", callerID, "ws-caller-3")
|
||||
}
|
||||
if workspaceID != "ws-target-3" {
|
||||
t.Errorf("workspaceID = %q, want %q", workspaceID, "ws-target-3")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueRowAuthFields_NoRows_ReturnsErrNoRows(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("qqqqqqqq-missing").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
_, _, err := queueRowAuthFields(context.Background(), "qqqqqqqq-missing")
|
||||
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
t.Errorf("expected sql.ErrNoRows, got %v", err)
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueueRowAuthFields_QueryError_ReturnsError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("qqqqqqqq-dberr").
|
||||
WillReturnError(sql.ErrConnDone)
|
||||
|
||||
_, _, err := queueRowAuthFields(context.Background(), "qqqqqqqq-dberr")
|
||||
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
t.Error("expected non-no-rows error, got sql.ErrNoRows")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Additional GetA2AQueueStatus coverage ─────────────────────────────────────
|
||||
|
||||
// TestGetA2AQueueStatus_AuthPass_CallerMatchesCallerID verifies that a caller
|
||||
// whose workspace matches queue.caller_id (not just workspace_id) passes auth
|
||||
// and receives the status. This path is distinct from the existing "authorized
|
||||
// as target" test which covers workspace_id = caller.
|
||||
func TestGetA2AQueueStatus_AuthPass_CallerMatchesCallerID(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-caller-match"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-caller-match")
|
||||
|
||||
// Queue row: ws-caller-match is the caller, ws-other-target is the target.
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-caller-match").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller-match", "ws-other-target"))
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-caller-match").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"q-caller-match", "ws-other-target", "queued", 50, 0,
|
||||
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
|
||||
nil,
|
||||
))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var qs QueueStatus
|
||||
json.Unmarshal(w.Body.Bytes(), &qs)
|
||||
if qs.ID != "q-caller-match" {
|
||||
t.Errorf("queue_id = %q; want q-caller-match", qs.ID)
|
||||
}
|
||||
if qs.Status != "queued" {
|
||||
t.Errorf("status = %q; want queued", qs.Status)
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2AQueueStatus_AuthPass_OrgTokenBypassesAuth verifies that an org-level
|
||||
// token (canvas/admin) bypasses the caller_id / workspace_id match entirely.
|
||||
// No X-Workspace-ID header is required; org_token_id in context is sufficient.
|
||||
func TestGetA2AQueueStatus_AuthPass_OrgTokenBypassesAuth(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-org-bypass"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
// No X-Workspace-ID header — org token is set via context instead.
|
||||
c.Set("org_token_id", "org-admin-1")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-org-bypass").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-anyone", "ws-anyone"))
|
||||
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-org-bypass").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"q-org-bypass", "ws-anyone", "queued", 25, 0,
|
||||
nil, "2026-05-16T10:00:00Z", nil, nil, nil,
|
||||
nil,
|
||||
))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2AQueueStatus_StatusQueryNoRows_NotFound covers the theoretical race:
|
||||
// queue row exists (auth check passes), but is deleted before QueueStatusByID runs.
|
||||
// Handler returns 404 (not 500) — matching the existence-non-inference policy.
|
||||
func TestGetA2AQueueStatus_StatusQueryNoRows_NotFound(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-race-no-rows"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-race-no-rows").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller", "ws-target"))
|
||||
|
||||
// Status query returns no rows — row was deleted between auth check and status fetch.
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-race-no-rows").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetA2AQueueStatus_ResponseBodyIncludedWhenCompleted confirms that a completed
|
||||
// queue item surfaces response_body from activity_logs in the HTTP response body.
|
||||
func TestGetA2AQueueStatus_ResponseBodyIncludedWhenCompleted(t *testing.T) {
|
||||
mock, w, c := newGetA2AQueueStatusHarness(t)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "queue_id", Value: "q-completed-body"}}
|
||||
c.Request = httptest.NewRequest("GET", "/", nil)
|
||||
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
|
||||
|
||||
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
|
||||
WithArgs("q-completed-body").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
|
||||
AddRow("ws-caller", "ws-target"))
|
||||
|
||||
respBody := `{"result":{"status":"ok","reply":"hello world"}}`
|
||||
mock.ExpectQuery(`SELECT\s+q\.id`).
|
||||
WithArgs("q-completed-body").
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "status", "priority", "attempts",
|
||||
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
|
||||
"response_body",
|
||||
}).AddRow(
|
||||
"q-completed-body", "ws-target", "completed", 50, 1,
|
||||
nil, "2026-05-16T10:00:00Z", "2026-05-16T10:01:00Z", "2026-05-16T10:02:00Z", nil,
|
||||
respBody,
|
||||
))
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.GetA2AQueueStatus(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var qs QueueStatus
|
||||
json.Unmarshal(w.Body.Bytes(), &qs)
|
||||
if qs.ResponseBody == nil {
|
||||
t.Fatal("ResponseBody should be set for completed status")
|
||||
}
|
||||
if string(qs.ResponseBody) != respBody {
|
||||
t.Errorf("ResponseBody = %q, want %q", string(qs.ResponseBody), respBody)
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -745,6 +745,19 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
|
||||
// delegation state by folding activity_logs rows by delegation_id.
|
||||
// Kept for backward compatibility and for workspaces that never had
|
||||
// DELEGATION_LEDGER_WRITE=1 during their delegation lifecycle.
|
||||
//
|
||||
// FIX: Deduplicates by delegation_id to avoid returning both the initial
|
||||
// 'delegate' row (status=pending/in_progress) and the terminal
|
||||
// 'delegate_result' row (status=completed/failed) for the same delegation.
|
||||
// Without deduplication the agent's lazy refresh (_refresh_queued_from_platform)
|
||||
// sees the initial row first (still status=pending/in_progress) and never
|
||||
// discovers the terminal outcome even though a matching delegate_result row
|
||||
// exists in the result set.
|
||||
//
|
||||
// Deduplication strategy: Go-side map keyed by delegation_id. For each
|
||||
// delegation_id the newest row (by created_at) is kept. Rows with an empty
|
||||
// delegation_id cannot be deduplicated (pre-#318 records lack the field) and
|
||||
// are returned as-is without deduplication.
|
||||
func (h *DelegationHandler) listDelegationsFromActivityLogs(ctx context.Context, workspaceID string) []map[string]interface{} {
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, activity_type, COALESCE(source_id::text, ''), COALESCE(target_id::text, ''),
|
||||
@@ -763,23 +776,50 @@ func (h *DelegationHandler) listDelegationsFromActivityLogs(ctx context.Context,
|
||||
defer rows.Close()
|
||||
|
||||
var result []map[string]interface{}
|
||||
// Deduplicate by delegation_id: keep the newest (first in DESC order).
|
||||
// Rows with an empty delegation_id (pre-#318) are kept as-is.
|
||||
seen := make(map[string]map[string]interface{})
|
||||
for rows.Next() {
|
||||
var id, actType, sourceID, targetID, summary, status, errorDetail, responseBody, delegationID string
|
||||
var createdAt time.Time
|
||||
if err := rows.Scan(&id, &actType, &sourceID, &targetID, &summary, &status, &errorDetail, &responseBody, &delegationID, &createdAt); err != nil {
|
||||
continue
|
||||
}
|
||||
entry := map[string]interface{}{
|
||||
"id": id,
|
||||
"type": actType,
|
||||
"source_id": sourceID,
|
||||
"target_id": targetID,
|
||||
"summary": summary,
|
||||
"status": status,
|
||||
"created_at": createdAt,
|
||||
// Rows are ordered by created_at DESC; the first occurrence per
|
||||
// delegation_id is the newest — skip later (older) duplicates.
|
||||
if delegationID == "" {
|
||||
// No delegation_id: cannot correlate, return as standalone.
|
||||
// These are pre-#318 rows that lack delegation_id.
|
||||
entry := map[string]interface{}{
|
||||
"id": id,
|
||||
"type": actType,
|
||||
"source_id": sourceID,
|
||||
"target_id": targetID,
|
||||
"summary": summary,
|
||||
"status": status,
|
||||
"created_at": createdAt,
|
||||
}
|
||||
if errorDetail != "" {
|
||||
entry["error"] = errorDetail
|
||||
}
|
||||
if responseBody != "" {
|
||||
entry["response_preview"] = textutil.TruncateBytes(responseBody, 300)
|
||||
}
|
||||
result = append(result, entry)
|
||||
continue
|
||||
}
|
||||
if delegationID != "" {
|
||||
entry["delegation_id"] = delegationID
|
||||
if _, exists := seen[delegationID]; exists {
|
||||
continue // older duplicate, skip
|
||||
}
|
||||
entry := map[string]interface{}{
|
||||
"delegation_id": delegationID,
|
||||
"id": id,
|
||||
"type": actType,
|
||||
"source_id": sourceID,
|
||||
"target_id": targetID,
|
||||
"summary": summary,
|
||||
"status": status,
|
||||
"created_at": createdAt,
|
||||
}
|
||||
if errorDetail != "" {
|
||||
entry["error"] = errorDetail
|
||||
@@ -787,12 +827,16 @@ func (h *DelegationHandler) listDelegationsFromActivityLogs(ctx context.Context,
|
||||
if responseBody != "" {
|
||||
entry["response_preview"] = textutil.TruncateBytes(responseBody, 300)
|
||||
}
|
||||
result = append(result, entry)
|
||||
seen[delegationID] = entry
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("ListDelegations rows.Err: %v", err)
|
||||
}
|
||||
|
||||
// Append deduplicated entries (those with a delegation_id).
|
||||
for _, entry := range seen {
|
||||
result = append(result, entry)
|
||||
}
|
||||
if result == nil {
|
||||
return []map[string]interface{}{}
|
||||
}
|
||||
|
||||
@@ -273,7 +273,7 @@ func TestListDelegationsFromActivityLogs_SingleDelegateRow(t *testing.T) {
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).AddRow(
|
||||
"act-1", "delegate",
|
||||
"act-1", "delegation",
|
||||
"ws-1", "ws-2",
|
||||
"analyse Q1 numbers",
|
||||
"in_progress",
|
||||
@@ -296,8 +296,8 @@ func TestListDelegationsFromActivityLogs_SingleDelegateRow(t *testing.T) {
|
||||
if e["id"] != "act-1" {
|
||||
t.Errorf("id: got %v, want act-1", e["id"])
|
||||
}
|
||||
if e["type"] != "delegate" {
|
||||
t.Errorf("type: got %v, want delegate", e["type"])
|
||||
if e["type"] != "delegation" {
|
||||
t.Errorf("type: got %v, want delegation", e["type"])
|
||||
}
|
||||
if e["source_id"] != "ws-1" {
|
||||
t.Errorf("source_id: got %v, want ws-1", e["source_id"])
|
||||
@@ -331,9 +331,9 @@ func TestListDelegationsFromActivityLogs_DelegateResultWithError(t *testing.T) {
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).AddRow(
|
||||
"act-2", "delegate_result",
|
||||
"act-2", "delegation",
|
||||
"ws-1", "ws-2",
|
||||
"result summary",
|
||||
"Delegation failed",
|
||||
"failed",
|
||||
"Callee workspace not reachable",
|
||||
`{"text":"the result body text"}`,
|
||||
@@ -353,8 +353,8 @@ func TestListDelegationsFromActivityLogs_DelegateResultWithError(t *testing.T) {
|
||||
t.Fatalf("expected 1 entry, got %d", len(got))
|
||||
}
|
||||
e := got[0]
|
||||
if e["type"] != "delegate_result" {
|
||||
t.Errorf("type: got %v", e["type"])
|
||||
if e["type"] != "delegation" {
|
||||
t.Errorf("type: got %v, want delegation", e["type"])
|
||||
}
|
||||
if e["error"] != "Callee workspace not reachable" {
|
||||
t.Errorf("error: got %v", e["error"])
|
||||
@@ -417,8 +417,8 @@ func TestListDelegationsFromActivityLogs_RowsErr(t *testing.T) {
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).
|
||||
AddRow("act-1", "delegate", "ws-1", "ws-2", "task", "queued", "", "", "", now).
|
||||
AddRow("act-2", "delegate", "ws-1", "ws-3", "another task", "queued", "", "", "", now).
|
||||
AddRow("act-1", "delegation", "ws-1", "ws-2", "task", "queued", "", "", "", now).
|
||||
AddRow("act-2", "delegation", "ws-1", "ws-3", "another task", "queued", "", "", "", now).
|
||||
RowError(1, context.DeadlineExceeded)
|
||||
mock.ExpectQuery("SELECT .+ FROM activity_logs").
|
||||
WithArgs("ws-1").
|
||||
@@ -445,3 +445,168 @@ func TestListDelegationsFromActivityLogs_RowsErr(t *testing.T) {
|
||||
// sqlmock.NewRows([]string{}).AddRow(...) to panic in test SETUP. The handler
|
||||
// has no recover(), so a scan panic would crash the process — the correct
|
||||
// behaviour. Real-DB integration tests cover this path.
|
||||
|
||||
// ---------- Deduplication by delegation_id ----------
|
||||
|
||||
// TestListDelegationsFromActivityLogs_DeduplicationKeepsNewest verifies that when
|
||||
// both the initial 'delegate' row and the terminal 'delegate_result' row exist for
|
||||
// the same delegation_id, only one entry (the newest) is returned. This is the
|
||||
// fix for the double-row artifact where the agent's _refresh_queued_from_platform
|
||||
// would scan the stale initial row (status=in_progress) before reaching the
|
||||
// terminal row (status=completed/failed).
|
||||
func TestListDelegationsFromActivityLogs_DeduplicationKeepsNewest(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
t0 := time.Now().Add(-2 * time.Hour)
|
||||
t1 := time.Now().Add(-1 * time.Hour)
|
||||
// Rows returned in created_at DESC order. The query uses DISTINCT ON
|
||||
// (delegation_id) ORDER BY delegation_id, created_at DESC, so the newest
|
||||
// row per delegation_id is returned.
|
||||
rows := sqlmock.NewRows([]string{
|
||||
"id", "activity_type", "source_id", "target_id",
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).
|
||||
// delegate_result row (newest for del-abc) — should be kept
|
||||
AddRow("act-2", "delegation", "ws-1", "ws-2",
|
||||
"Delegation completed", "completed", "",
|
||||
`{"text":"the answer is 42"}`, "del-abc", t1).
|
||||
// delegate row (oldest for del-abc) — should be dropped by DISTINCT ON
|
||||
AddRow("act-1", "delegation", "ws-1", "ws-2",
|
||||
"Delegating to ws-2", "in_progress", "",
|
||||
"", "del-abc", t0)
|
||||
mock.ExpectQuery("SELECT .+ FROM activity_logs").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("expected 1 entry after deduplication, got %d: %v", len(got), got)
|
||||
}
|
||||
e := got[0]
|
||||
if e["delegation_id"] != "del-abc" {
|
||||
t.Errorf("delegation_id: got %v, want del-abc", e["delegation_id"])
|
||||
}
|
||||
if e["status"] != "completed" {
|
||||
t.Errorf("status: got %v, want completed (newest row kept)", e["status"])
|
||||
}
|
||||
if e["response_preview"] != `{"text":"the answer is 42"}` {
|
||||
t.Errorf("response_preview: got %v", e["response_preview"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestListDelegationsFromActivityLogs_DeduplicationDistinctDelegations verifies that
|
||||
// rows with different delegation_ids are all returned (deduplication is per-id, not global).
|
||||
func TestListDelegationsFromActivityLogs_DeduplicationDistinctDelegations(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
now := time.Now()
|
||||
rows := sqlmock.NewRows([]string{
|
||||
"id", "activity_type", "source_id", "target_id",
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).
|
||||
AddRow("act-a", "delegation", "ws-1", "ws-2", "task a", "completed", "", "", "del-A", now).
|
||||
AddRow("act-b", "delegation", "ws-1", "ws-3", "task b", "failed", "timeout", "", "del-B", now).
|
||||
AddRow("act-c", "delegation", "ws-1", "ws-4", "task c", "in_progress", "", "", "del-C", now)
|
||||
mock.ExpectQuery("SELECT .+ FROM activity_logs").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
|
||||
if len(got) != 3 {
|
||||
t.Fatalf("expected 3 entries (all distinct delegation_ids), got %d", len(got))
|
||||
}
|
||||
seen := make(map[string]bool)
|
||||
for _, e := range got {
|
||||
id := e["delegation_id"].(string)
|
||||
if seen[id] {
|
||||
t.Errorf("duplicate delegation_id %q in result", id)
|
||||
}
|
||||
seen[id] = true
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestListDelegationsFromActivityLogs_DeduplicationMixedTerminalStatuses verifies that
|
||||
// a failed delegate_result row (newest) is kept over the initial in_progress row,
|
||||
// and a completed delegate_result is kept over the initial pending row.
|
||||
func TestListDelegationsFromActivityLogs_DeduplicationMixedTerminalStatuses(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create sqlmock: %v", err)
|
||||
}
|
||||
prevDB := db.DB
|
||||
db.DB = mockDB
|
||||
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
|
||||
|
||||
tOld := time.Now().Add(-3 * time.Hour)
|
||||
tMid := time.Now().Add(-2 * time.Hour)
|
||||
tNew := time.Now().Add(-1 * time.Hour)
|
||||
rows := sqlmock.NewRows([]string{
|
||||
"id", "activity_type", "source_id", "target_id",
|
||||
"summary", "status", "error_detail",
|
||||
"response_preview", "delegation_id", "created_at",
|
||||
}).
|
||||
// del-X: newest is completed → keep completed
|
||||
AddRow("act-x2", "delegation", "ws-1", "ws-2", "task X done", "completed", "", `{"text":"X result"}`, "del-X", tNew).
|
||||
AddRow("act-x1", "delegation", "ws-1", "ws-2", "Delegating to ws-2", "pending", "", "", "del-X", tOld).
|
||||
// del-Y: newest is failed → keep failed
|
||||
AddRow("act-y2", "delegation", "ws-1", "ws-3", "task Y done", "failed", "network error", "", "del-Y", tMid).
|
||||
AddRow("act-y1", "delegation", "ws-1", "ws-3", "Delegating to ws-3", "dispatched", "", "", "del-Y", tOld)
|
||||
mock.ExpectQuery("SELECT .+ FROM activity_logs").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(rows)
|
||||
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
dh := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("expected 2 entries after deduplication, got %d: %v", len(got), got)
|
||||
}
|
||||
byID := make(map[string]map[string]interface{})
|
||||
for _, e := range got {
|
||||
byID[e["delegation_id"].(string)] = e
|
||||
}
|
||||
x := byID["del-X"]
|
||||
if x["status"] != "completed" {
|
||||
t.Errorf("del-X: got status %v, want completed", x["status"])
|
||||
}
|
||||
y := byID["del-Y"]
|
||||
if y["status"] != "failed" {
|
||||
t.Errorf("del-Y: got status %v, want failed", y["status"])
|
||||
}
|
||||
if y["error"] != "network error" {
|
||||
t.Errorf("del-Y: got error %v, want 'network error'", y["error"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,20 +156,3 @@ func equalStrings(a, b []string) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// TestEmitOrgEvent_NilPayload exercises the `if payload == nil` branch that
|
||||
// re-initializes payload to an empty map before marshaling.
|
||||
func TestEmitOrgEvent_NilPayloadInitializesEmptyMap(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
mock.ExpectExec(`INSERT INTO structure_events`).
|
||||
WithArgs("org.import.started", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
// Passing nil triggers: if payload == nil { payload = map[string]any{} }
|
||||
emitOrgEvent(context.Background(), "org.import.started", nil)
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user