From 7c751ef675e11901d3ad3d898d3c6d987663b423 Mon Sep 17 00:00:00 2001 From: infra-sre Date: Tue, 19 May 2026 12:22:35 -0700 Subject: [PATCH 1/2] =?UTF-8?q?audit:=20phase=201=20structured=20audit-log?= =?UTF-8?q?=20=E2=80=94=20emit=20pkg=20+=20secrets=20wire-in?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add internal/audit with single Emit(ctx, event_type, fields) entrypoint that ships JSON-encoded records via two transports: 1. audit:-prefixed stdout line — tenant Vector docker-logs source already ships this to Loki. No obs-stack change required. 2. Best-effort append to /var/log/molecule-audit.jsonl — durable forensic copy, target for the dedicated Vector file source in Phase 2. Schema is stable v1 (ts, event_type, workspace_id, user_id, actor_kind, correlation_id, fields). Cardinality budget keeps workspace_id + user_id + correlation_id OUT of Loki labels (JSON body only) — fleet active-stream count ~200, well within Loki headroom. Phase 1 wires secret.set and secret.delete on the workspace-scoped (POST/PUT/DELETE /workspaces/:id/secrets) and admin-scoped (POST/DELETE /admin/secrets, /settings/secrets) handlers. value_hash is the first 8 hex chars of sha256(value) — never the raw value. Tests cover: stdout emit, JSONL append, file-failure fallback, concurrent integrity, hash bounds, raw-value-never-emitted contract. Vet + handler-secret tests pass. See: rfc internal/rfcs/audit-log-to-loki.md --- workspace-server/internal/audit/emit.go | 268 ++++++++++++++++++ workspace-server/internal/audit/emit_test.go | 208 ++++++++++++++ workspace-server/internal/audit/hash.go | 15 + workspace-server/internal/handlers/secrets.go | 38 +++ 4 files changed, 529 insertions(+) create mode 100644 workspace-server/internal/audit/emit.go create mode 100644 workspace-server/internal/audit/emit_test.go create mode 100644 workspace-server/internal/audit/hash.go diff --git a/workspace-server/internal/audit/emit.go b/workspace-server/internal/audit/emit.go new file mode 100644 index 00000000..9f610ff0 --- /dev/null +++ b/workspace-server/internal/audit/emit.go @@ -0,0 +1,268 @@ +// Package audit emits structured, single-line JSON audit-log records for +// user-initiated actions on a workspace (secret set/delete, file +// create/delete, A2A send, chat turn, …). Records ship to Loki via the +// tenant Vector pipeline using two transports, in this order: +// +// 1. A `audit:` prefixed line on the standard logger. This is the +// primary transport — tenant Vector already tails the +// molecule-tenant container's stdout (see +// /usr/local/bin/tenant-vector.yaml.tmpl on operator-host), so the +// event reaches Loki with no Vector-side change. +// +// 2. A best-effort append to /var/log/molecule-audit.jsonl on the +// tenant container's writable rootfs. This is the durable local +// artifact for forensic queries when Loki is unreachable, and is +// the future file-source target for Phase 2 (RFC internal#562 Step +// 1, dedicated audit shipping channel). +// +// Both transports are best-effort and run on the request goroutine. +// Per RFC: emit MUST NOT fail the user's request. Any I/O error is +// dropped to a single log.Printf line so an operator can detect the +// outage during a forensic search. The handler caller is decoupled — +// Emit returns nothing. +// +// # Event schema (stable contract — extend by appending; never rename) +// +// { +// "ts": "2026-05-19T20:00:00Z", // RFC3339Nano UTC +// "event_type": "secret.set", // .; low-cardinality +// "workspace_id": "", // bounded ~1000 +// "user_id": "", // unbounded — NOT a label +// "actor_kind": "user|admin|agent|cron", +// "correlation_id": "", // upstream request id +// "fields": { … } // event-specific payload +// } +// +// `fields` MUST NEVER contain secret values. The convention for +// secret-touching events is to record `value_hash` (sha256(value), hex +// prefix of 8 chars) only. +// +// # Loki labels (cardinality budget — see RFC internal/rfcs/audit-log-to-loki.md §4) +// +// - tenant (already set by Vector) ~10 +// - service ("molecule-tenant") 1 +// - container ("molecule-tenant") 1 +// - source ("audit") 1 +// - event_type (low-cardinality, top-20) ~20 +// +// workspace_id, user_id, correlation_id stay INSIDE the JSON body — +// they are queryable via `| json` LogQL but are NOT labels. This keeps +// per-stream cardinality under Loki's 100k/stream chunk limit. +package audit + +import ( + "context" + "encoding/json" + "log" + "os" + "sync" + "time" +) + +// AuditLogPath is where the durable JSONL trail is written. Override +// via the MOLECULE_AUDIT_LOG_PATH env var (useful for tests + for the +// future Phase 2 file-source target). +const defaultAuditLogPath = "/var/log/molecule-audit.jsonl" + +// ActorKind enumerates the categories of actor we tag every event +// with. Strings are stable wire values; do not rename. +type ActorKind string + +const ( + ActorUser ActorKind = "user" + ActorAdmin ActorKind = "admin" + ActorAgent ActorKind = "agent" + ActorCron ActorKind = "cron" +) + +// Context-key type — unexported so callers must use the package-local +// setters to avoid string-key collisions across the binary. +type ctxKey int + +const ( + ctxKeyUserID ctxKey = iota // string + ctxKeyActorKind // ActorKind + ctxKeyCorrelationID // string + ctxKeyWorkspaceID // string +) + +// WithUserID returns ctx with the user-id attached. Middleware that +// authenticates the caller should populate this so handlers can call +// Emit(ctx, ...) without re-discovering identity. +func WithUserID(ctx context.Context, userID string) context.Context { + return context.WithValue(ctx, ctxKeyUserID, userID) +} + +// WithActorKind tags the actor category. Defaults to ActorUser when +// unset (see resolveActor). +func WithActorKind(ctx context.Context, k ActorKind) context.Context { + return context.WithValue(ctx, ctxKeyActorKind, k) +} + +// WithCorrelationID attaches an upstream request id (X-Request-Id or +// similar). The empty string is fine; downstream readers treat empty +// as "no upstream id provided". +func WithCorrelationID(ctx context.Context, id string) context.Context { + return context.WithValue(ctx, ctxKeyCorrelationID, id) +} + +// WithWorkspaceID attaches the workspace UUID — usually pulled from +// the gin URL parameter. Handlers may either pre-populate the context +// or pass it through the Fields map; the Fields map wins if both are +// set, so callers can override on a per-event basis. +func WithWorkspaceID(ctx context.Context, id string) context.Context { + return context.WithValue(ctx, ctxKeyWorkspaceID, id) +} + +func resolveUserID(ctx context.Context) string { + if v, ok := ctx.Value(ctxKeyUserID).(string); ok { + return v + } + return "" +} + +func resolveActor(ctx context.Context) ActorKind { + if v, ok := ctx.Value(ctxKeyActorKind).(ActorKind); ok && v != "" { + return v + } + return ActorUser +} + +func resolveCorrelationID(ctx context.Context) string { + if v, ok := ctx.Value(ctxKeyCorrelationID).(string); ok { + return v + } + return "" +} + +func resolveWorkspaceID(ctx context.Context) string { + if v, ok := ctx.Value(ctxKeyWorkspaceID).(string); ok { + return v + } + return "" +} + +// record is the on-wire shape. Keep field order stable so Loki +// `| json` queries against `event_type` etc. are predictable. +type record struct { + TS string `json:"ts"` + EventType string `json:"event_type"` + WorkspaceID string `json:"workspace_id"` + UserID string `json:"user_id"` + ActorKind ActorKind `json:"actor_kind"` + CorrelationID string `json:"correlation_id"` + Fields map[string]any `json:"fields"` +} + +// fileMu serializes JSONL appends so two goroutines can't interleave +// half-lines. Cheap; audit events are rare relative to request volume. +var fileMu sync.Mutex + +// auditLogPath returns the destination path; respects the +// MOLECULE_AUDIT_LOG_PATH env var so tests + future shipping changes +// don't need to recompile. +func auditLogPath() string { + if p := os.Getenv("MOLECULE_AUDIT_LOG_PATH"); p != "" { + return p + } + return defaultAuditLogPath +} + +// nowRFC3339Nano is var so tests can pin time. +var nowRFC3339Nano = func() string { + return time.Now().UTC().Format(time.RFC3339Nano) +} + +// Emit writes one audit record for eventType. Identity, actor, and +// correlation are pulled from ctx; workspaceID falls back to the ctx +// value if absent from fields. Emission is best-effort: +// +// - The `audit:` log line (Loki transport) is written even if the +// file append fails. +// - The file append is wrapped in its own error branch; on failure +// we drop a single warning and continue. +// +// This function MUST NOT panic and MUST NOT return an error — handlers +// in the request path call it inline. +func Emit(ctx context.Context, eventType string, fields map[string]any) { + if fields == nil { + fields = map[string]any{} + } + + wsID := "" + // Fields-supplied workspace_id wins (per-event override). + if v, ok := fields["workspace_id"].(string); ok && v != "" { + wsID = v + // Remove from inner fields so it isn't duplicated — top-level + // is the canonical position. + delete(fields, "workspace_id") + } else { + wsID = resolveWorkspaceID(ctx) + } + + rec := record{ + TS: nowRFC3339Nano(), + EventType: eventType, + WorkspaceID: wsID, + UserID: resolveUserID(ctx), + ActorKind: resolveActor(ctx), + CorrelationID: resolveCorrelationID(ctx), + Fields: fields, + } + + payload, err := json.Marshal(rec) + if err != nil { + // Marshal failure → emit a degraded marker so the event boundary + // is still visible in Loki. Never lose the fact that *something* + // happened. + log.Printf("audit: %s {\"_marshal_err\":%q,\"event_type\":%q}", eventType, err.Error(), eventType) + return + } + + // Transport 1: stdout (Loki via tenant Vector docker-logs source). + log.Printf("audit: %s", payload) + + // Transport 2: durable JSONL (forensic local copy, Phase-2 + // file-source target). Best effort. + appendJSONL(payload) +} + +// appendJSONL opens, appends one line, and closes. The open-per-write +// pattern is acceptable at audit-event rates (≪100/s); it survives +// log rotation without the package having to handle SIGHUP. +func appendJSONL(payload []byte) { + fileMu.Lock() + defer fileMu.Unlock() + + path := auditLogPath() + f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o640) + if err != nil { + // Don't spam: one warning per emit failure. The Loki transport + // already captured the event so we are not losing observability. + log.Printf("audit: append %s failed (event still in stdout): %v", path, err) + return + } + defer func() { _ = f.Close() }() + + // Write payload + newline as one syscall to keep the JSONL invariant. + if _, werr := f.Write(append(payload, '\n')); werr != nil { + log.Printf("audit: write %s failed (event still in stdout): %v", path, werr) + } +} + +// HashValuePrefix returns the lowercase hex SHA-256 prefix of v, of +// length n. Use this when an event field needs to identify a secret +// value without exposing it. Returns "" for empty input. n is clamped +// to [4, 64]. +func HashValuePrefix(v string, n int) string { + if v == "" { + return "" + } + if n < 4 { + n = 4 + } + if n > 64 { + n = 64 + } + return sha256Hex(v)[:n] +} diff --git a/workspace-server/internal/audit/emit_test.go b/workspace-server/internal/audit/emit_test.go new file mode 100644 index 00000000..a0275736 --- /dev/null +++ b/workspace-server/internal/audit/emit_test.go @@ -0,0 +1,208 @@ +package audit + +import ( + "bytes" + "context" + "encoding/json" + "log" + "os" + "path/filepath" + "strings" + "sync" + "testing" +) + +// captureLog redirects the std logger to a buffer for the duration of fn. +func captureLog(t *testing.T, fn func()) string { + t.Helper() + var buf bytes.Buffer + prevW := log.Writer() + prevF := log.Flags() + log.SetOutput(&buf) + log.SetFlags(0) + t.Cleanup(func() { + log.SetOutput(prevW) + log.SetFlags(prevF) + }) + fn() + return buf.String() +} + +// withTempAuditFile points MOLECULE_AUDIT_LOG_PATH at a fresh file for +// the duration of t. +func withTempAuditFile(t *testing.T) string { + t.Helper() + dir := t.TempDir() + p := filepath.Join(dir, "audit.jsonl") + t.Setenv("MOLECULE_AUDIT_LOG_PATH", p) + return p +} + +func TestEmit_WritesAuditPrefixedLineToStdout(t *testing.T) { + withTempAuditFile(t) + out := captureLog(t, func() { + ctx := WithWorkspaceID(context.Background(), "ws-abc") + ctx = WithUserID(ctx, "u-1") + ctx = WithActorKind(ctx, ActorUser) + Emit(ctx, "secret.set", map[string]any{"key": "ANTHROPIC_API_KEY"}) + }) + out = strings.TrimSpace(out) + if !strings.HasPrefix(out, "audit: ") { + t.Fatalf("expected 'audit: ' prefix, got %q", out) + } + jsonPart := strings.TrimPrefix(out, "audit: ") + var got map[string]any + if err := json.Unmarshal([]byte(jsonPart), &got); err != nil { + t.Fatalf("payload not JSON: %v (raw=%q)", err, jsonPart) + } + if got["event_type"] != "secret.set" { + t.Errorf("event_type mismatch: %+v", got) + } + if got["workspace_id"] != "ws-abc" { + t.Errorf("workspace_id mismatch: %+v", got) + } + if got["user_id"] != "u-1" { + t.Errorf("user_id mismatch: %+v", got) + } + if got["actor_kind"] != "user" { + t.Errorf("actor_kind mismatch: %+v", got) + } +} + +func TestEmit_AppendsToJSONLFile(t *testing.T) { + path := withTempAuditFile(t) + _ = captureLog(t, func() { + Emit(context.Background(), "secret.set", map[string]any{"key": "X"}) + Emit(context.Background(), "secret.delete", map[string]any{"key": "Y"}) + }) + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("audit file unreadable: %v", err) + } + lines := strings.Split(strings.TrimRight(string(b), "\n"), "\n") + if len(lines) != 2 { + t.Fatalf("expected 2 lines, got %d (raw=%q)", len(lines), b) + } + for i, ln := range lines { + var got map[string]any + if err := json.Unmarshal([]byte(ln), &got); err != nil { + t.Errorf("line %d not valid JSON: %v (%q)", i, err, ln) + } + } +} + +func TestEmit_DefaultsActorToUserWhenUnset(t *testing.T) { + withTempAuditFile(t) + out := captureLog(t, func() { + Emit(context.Background(), "secret.set", nil) + }) + if !strings.Contains(out, `"actor_kind":"user"`) { + t.Errorf("expected actor_kind=user default, got %q", out) + } +} + +func TestEmit_FieldsWorkspaceIDOverridesContext(t *testing.T) { + withTempAuditFile(t) + out := captureLog(t, func() { + ctx := WithWorkspaceID(context.Background(), "ws-ctx") + Emit(ctx, "secret.set", map[string]any{ + "workspace_id": "ws-override", + "key": "K", + }) + }) + if !strings.Contains(out, `"workspace_id":"ws-override"`) { + t.Errorf("fields workspace_id should win over ctx; got %q", out) + } + // Inner fields must NOT carry workspace_id (de-duplicated). + if strings.Contains(out, `"fields":{"workspace_id"`) { + t.Errorf("inner workspace_id should be deleted from fields; got %q", out) + } +} + +func TestEmit_NeverIncludesSecretValues_OnlyHash(t *testing.T) { + // This is a contract test: the package documents that callers must + // hash before emitting. We assert HashValuePrefix gives a stable + // short hex and that the same value never round-trips through Emit. + withTempAuditFile(t) + secret := "sk-very-real-secret" + prefix := HashValuePrefix(secret, 8) + if len(prefix) != 8 { + t.Fatalf("HashValuePrefix length=%d, want 8", len(prefix)) + } + out := captureLog(t, func() { + Emit(context.Background(), "secret.set", map[string]any{ + "key": "TEST", + "value_hash": prefix, + }) + }) + if strings.Contains(out, secret) { + t.Fatalf("audit line MUST NOT contain raw secret; got %q", out) + } + if !strings.Contains(out, prefix) { + t.Errorf("expected value_hash %q in line; got %q", prefix, out) + } +} + +func TestEmit_FileAppendFailureDoesNotBlockStdout(t *testing.T) { + // Point at an unwritable path; stdout transport must still fire. + t.Setenv("MOLECULE_AUDIT_LOG_PATH", "/proc/this/is/not/writable/path.jsonl") + out := captureLog(t, func() { + Emit(context.Background(), "secret.set", map[string]any{"key": "K"}) + }) + if !strings.Contains(out, "audit: ") { + t.Errorf("stdout audit line must fire even when file append fails; got %q", out) + } +} + +func TestEmit_Concurrent_NoInterleavedLines(t *testing.T) { + path := withTempAuditFile(t) + // Capture log to drop stdout noise; we're asserting file integrity. + _ = captureLog(t, func() { + const N = 50 + var wg sync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + i := i + go func() { + defer wg.Done() + Emit(context.Background(), "secret.set", map[string]any{"i": i}) + }() + } + wg.Wait() + }) + b, err := os.ReadFile(path) + if err != nil { + t.Fatalf("audit file unreadable: %v", err) + } + lines := strings.Split(strings.TrimRight(string(b), "\n"), "\n") + if len(lines) != 50 { + t.Fatalf("expected 50 lines, got %d", len(lines)) + } + for i, ln := range lines { + var got map[string]any + if err := json.Unmarshal([]byte(ln), &got); err != nil { + t.Errorf("line %d not valid JSON (interleave bug?): %v", i, err) + } + } +} + +func TestHashValuePrefix_StableAndBounded(t *testing.T) { + if HashValuePrefix("", 8) != "" { + t.Errorf("empty input must return empty") + } + if got := HashValuePrefix("a", 8); len(got) != 8 { + t.Errorf("len mismatch: %q", got) + } + // Clamp lower bound. + if got := HashValuePrefix("a", 1); len(got) != 4 { + t.Errorf("clamp-lo failed: %q", got) + } + // Clamp upper bound. + if got := HashValuePrefix("a", 999); len(got) != 64 { + t.Errorf("clamp-hi failed: %q", got) + } + // Stable across calls (same input → same prefix). + if HashValuePrefix("x", 8) != HashValuePrefix("x", 8) { + t.Errorf("hash not stable") + } +} diff --git a/workspace-server/internal/audit/hash.go b/workspace-server/internal/audit/hash.go new file mode 100644 index 00000000..e63bdc54 --- /dev/null +++ b/workspace-server/internal/audit/hash.go @@ -0,0 +1,15 @@ +package audit + +import ( + "crypto/sha256" + "encoding/hex" +) + +// sha256Hex returns the lowercase hex digest of s. Kept in its own +// file so the import of crypto/sha256 is co-located with its only +// caller (HashValuePrefix in emit.go) — easier to audit when reviewing +// changes to secret handling. +func sha256Hex(s string) string { + sum := sha256.Sum256([]byte(s)) + return hex.EncodeToString(sum[:]) +} diff --git a/workspace-server/internal/handlers/secrets.go b/workspace-server/internal/handlers/secrets.go index d01ed86f..f57bc504 100644 --- a/workspace-server/internal/handlers/secrets.go +++ b/workspace-server/internal/handlers/secrets.go @@ -7,6 +7,7 @@ import ( "net/http" "regexp" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/audit" "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" @@ -262,6 +263,17 @@ func (h *SecretsHandler) Set(c *gin.Context) { return } + // Phase 1 audit: structured event for the security trail. Inline (not + // goroutine) so the event is durable before we ack the user; emit is + // best-effort and never errors out of the request path. + audit.Emit(c.Request.Context(), "secret.set", map[string]any{ + "workspace_id": workspaceID, + "key": body.Key, + "value_hash": audit.HashValuePrefix(body.Value, 8), + "scope": "workspace", + "operation": "set", + }) + // Auto-restart workspace to pick up new secret. // RFC internal#524 Layer 1: route through globalGoAsync so tests can // drain the detached restart goroutine before db.DB is swapped — see @@ -301,6 +313,15 @@ func (h *SecretsHandler) Delete(c *gin.Context) { return } + // Phase 1 audit: structured event for the security trail. Only on + // real deletes (rows>0) — a 404 is not a state change. + audit.Emit(c.Request.Context(), "secret.delete", map[string]any{ + "workspace_id": workspaceID, + "key": key, + "scope": "workspace", + "operation": "delete", + }) + // Auto-restart workspace to pick up removed secret. // RFC internal#524 Layer 1: see Set() above for the drain rationale. if h.restartFunc != nil { @@ -393,6 +414,15 @@ func (h *SecretsHandler) SetGlobal(c *gin.Context) { key := body.Key globalGoAsync(func() { h.restartAllAffectedByGlobalKey(key) }) + // Phase 1 audit: admin-scope secret write — high-value security event. + auditCtx := audit.WithActorKind(c.Request.Context(), audit.ActorAdmin) + audit.Emit(auditCtx, "secret.set", map[string]any{ + "key": body.Key, + "value_hash": audit.HashValuePrefix(body.Value, 8), + "scope": "global", + "operation": "set", + }) + c.JSON(http.StatusOK, gin.H{"status": "saved", "key": body.Key, "scope": "global"}) } @@ -471,6 +501,14 @@ func (h *SecretsHandler) DeleteGlobal(c *gin.Context) { k := key globalGoAsync(func() { h.restartAllAffectedByGlobalKey(k) }) + // Phase 1 audit: admin-scope secret delete. + auditCtx := audit.WithActorKind(c.Request.Context(), audit.ActorAdmin) + audit.Emit(auditCtx, "secret.delete", map[string]any{ + "key": key, + "scope": "global", + "operation": "delete", + }) + c.JSON(http.StatusOK, gin.H{"status": "deleted", "key": key, "scope": "global"}) } -- 2.52.0 From b5b95de19a3303230b5b559454bd59c7841378f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Molecule=20AI=20=C2=B7=20core-devops?= Date: Tue, 19 May 2026 20:00:26 +0000 Subject: [PATCH 2/2] test(audit): bind HashValuePrefix calls to vars to satisfy staticcheck SA4000 Staticcheck SA4000 flagged the stability assertion as tautological (identical expressions on both sides of !=). Bind both calls to local vars to preserve test intent (call-stability) and silence the linter. No functional change. Follow-up to mc#1572 review (core-devops lens). --- workspace-server/internal/audit/emit_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/workspace-server/internal/audit/emit_test.go b/workspace-server/internal/audit/emit_test.go index a0275736..6c8ac9a5 100644 --- a/workspace-server/internal/audit/emit_test.go +++ b/workspace-server/internal/audit/emit_test.go @@ -201,8 +201,13 @@ func TestHashValuePrefix_StableAndBounded(t *testing.T) { if got := HashValuePrefix("a", 999); len(got) != 64 { t.Errorf("clamp-hi failed: %q", got) } - // Stable across calls (same input → same prefix). - if HashValuePrefix("x", 8) != HashValuePrefix("x", 8) { - t.Errorf("hash not stable") + // Stable across calls (same input → same prefix). Bind to vars so + // staticcheck SA4000 does not flag the comparison as tautological; + // the intent is to assert call-stability, which requires invoking + // the function twice with the same input. + a := HashValuePrefix("x", 8) + b := HashValuePrefix("x", 8) + if a != b { + t.Errorf("hash not stable: a=%q b=%q", a, b) } } -- 2.52.0