audit: phase 1 structured audit-log — emit pkg + secrets wire-in #1572

Merged
core-devops merged 3 commits from infra-sre/audit-log-phase1-emit-secrets into main 2026-05-19 20:30:26 +00:00
4 changed files with 534 additions and 0 deletions
+268
View File
@@ -0,0 +1,268 @@
// Package audit emits structured, single-line JSON audit-log records for
// user-initiated actions on a workspace (secret set/delete, file
// create/delete, A2A send, chat turn, …). Records ship to Loki via the
// tenant Vector pipeline using two transports, in this order:
//
// 1. An `audit:`-prefixed line on the standard logger. This is the
// primary transport — tenant Vector already tails the
// molecule-tenant container's stdout (see
// /usr/local/bin/tenant-vector.yaml.tmpl on operator-host), so the
// event reaches Loki with no Vector-side change.
//
// 2. A best-effort append to /var/log/molecule-audit.jsonl on the
// tenant container's writable rootfs. This is the durable local
// artifact for forensic queries when Loki is unreachable, and is
// the future file-source target for Phase 2 (RFC internal#562 Step
// 1, dedicated audit shipping channel).
//
// Both transports are best-effort and run on the request goroutine.
// Per RFC: emit MUST NOT fail the user's request. Any I/O error is
// dropped to a single log.Printf line so an operator can detect the
// outage during a forensic search. The handler caller is decoupled —
// Emit returns nothing.
//
// # Event schema (stable contract — extend by appending; never rename)
//
// {
// "ts": "2026-05-19T20:00:00Z", // RFC3339Nano UTC
// "event_type": "secret.set", // <noun>.<verb>; low-cardinality
// "workspace_id": "<uuid>", // bounded ~1000
// "user_id": "<uuid|empty>", // unbounded — NOT a label
// "actor_kind": "user|admin|agent|cron",
// "correlation_id": "<req-id|empty>", // upstream request id
// "fields": { … } // event-specific payload
// }
//
// `fields` MUST NEVER contain secret values. The convention for
// secret-touching events is to record `value_hash` (sha256(value), hex
// prefix of 8 chars) only.
//
// # Loki labels (cardinality budget — see RFC internal/rfcs/audit-log-to-loki.md §4)
//
// - tenant (already set by Vector) ~10
// - service ("molecule-tenant") 1
// - container ("molecule-tenant") 1
// - source ("audit") 1
// - event_type (low-cardinality, top-20) ~20
//
// workspace_id, user_id, correlation_id stay INSIDE the JSON body —
// they are queryable via `| json` LogQL but are NOT labels. This keeps
// per-stream cardinality under Loki's 100k/stream chunk limit.
package audit
import (
"context"
"encoding/json"
"log"
"os"
"sync"
"time"
)
// defaultAuditLogPath is where the durable JSONL trail is written when
// the MOLECULE_AUDIT_LOG_PATH env var is unset or empty (see
// auditLogPath). The env override is useful for tests + for the future
// Phase 2 file-source target.
const defaultAuditLogPath = "/var/log/molecule-audit.jsonl"
// ActorKind enumerates the categories of actor we tag every event
// with. Strings are stable wire values; do not rename.
type ActorKind string
// Known actor categories. The string literal on each line is the exact
// value serialized into the record's "actor_kind" JSON field. ActorUser
// is also the fallback used by resolveActor when the context carries no
// (or an empty) actor kind.
const (
ActorUser ActorKind = "user"
ActorAdmin ActorKind = "admin"
ActorAgent ActorKind = "agent"
ActorCron ActorKind = "cron"
)
// ctxKey is the type of this package's context keys. It is unexported
// so callers must use the package-local setters (WithUserID,
// WithActorKind, WithCorrelationID, WithWorkspaceID) and cannot collide
// with string keys used elsewhere in the binary.
type ctxKey int
// Context keys, one per value the package stores on a context. The
// trailing comment on each line is the dynamic type of the stored value,
// which is what the matching resolve* helper type-asserts to.
const (
ctxKeyUserID ctxKey = iota // string
ctxKeyActorKind // ActorKind
ctxKeyCorrelationID // string
ctxKeyWorkspaceID // string
)
// WithUserID derives a child context that carries userID under this
// package's private user-id key. Authentication middleware is expected
// to call it once so that handlers can simply call Emit(ctx, ...)
// without re-discovering who the caller is.
func WithUserID(ctx context.Context, userID string) context.Context {
	tagged := context.WithValue(ctx, ctxKeyUserID, userID)
	return tagged
}
// WithActorKind derives a child context tagged with the actor category
// k. When no kind (or an empty kind) is present on the context, events
// are attributed to ActorUser — see resolveActor.
func WithActorKind(ctx context.Context, k ActorKind) context.Context {
	tagged := context.WithValue(ctx, ctxKeyActorKind, k)
	return tagged
}
// WithCorrelationID derives a child context carrying an upstream
// request id (X-Request-Id or similar). Passing the empty string is
// allowed; downstream readers interpret empty as "no upstream id
// provided".
func WithCorrelationID(ctx context.Context, id string) context.Context {
	tagged := context.WithValue(ctx, ctxKeyCorrelationID, id)
	return tagged
}
// WithWorkspaceID derives a child context carrying the workspace UUID —
// typically taken from the gin URL parameter. A handler can either set
// it here or pass "workspace_id" in the Fields map given to Emit; when
// both are present the Fields value takes precedence, which lets a
// caller override the workspace for a single event.
func WithWorkspaceID(ctx context.Context, id string) context.Context {
	tagged := context.WithValue(ctx, ctxKeyWorkspaceID, id)
	return tagged
}
// resolveUserID returns the user id stored on ctx by WithUserID, or ""
// when none is present (or the stored value is not a string).
func resolveUserID(ctx context.Context) string {
	// A failed assertion leaves userID at its zero value, "".
	userID, _ := ctx.Value(ctxKeyUserID).(string)
	return userID
}
// resolveActor returns the actor kind stored on ctx by WithActorKind.
// A missing, wrongly-typed, or empty value falls back to ActorUser.
func resolveActor(ctx context.Context) ActorKind {
	kind, found := ctx.Value(ctxKeyActorKind).(ActorKind)
	if !found || kind == "" {
		return ActorUser
	}
	return kind
}
// resolveCorrelationID returns the correlation id stored on ctx by
// WithCorrelationID, or "" when none is present (or the stored value is
// not a string).
func resolveCorrelationID(ctx context.Context) string {
	// A failed assertion leaves correlationID at its zero value, "".
	correlationID, _ := ctx.Value(ctxKeyCorrelationID).(string)
	return correlationID
}
// resolveWorkspaceID returns the workspace id stored on ctx by
// WithWorkspaceID, or "" when none is present (or the stored value is
// not a string).
func resolveWorkspaceID(ctx context.Context) string {
	// A failed assertion leaves workspaceID at its zero value, "".
	workspaceID, _ := ctx.Value(ctxKeyWorkspaceID).(string)
	return workspaceID
}
// record is the on-wire shape. Keep field order stable so Loki
// `| json` queries against `event_type` etc. are predictable.
// Emit fills every field: TS from nowRFC3339Nano, EventType from its
// argument, the identity fields from the context (WorkspaceID may be
// overridden by the caller's fields map), and Fields from the caller.
type record struct {
TS string `json:"ts"`
EventType string `json:"event_type"`
WorkspaceID string `json:"workspace_id"`
UserID string `json:"user_id"`
ActorKind ActorKind `json:"actor_kind"`
CorrelationID string `json:"correlation_id"`
Fields map[string]any `json:"fields"`
}
// fileMu serializes JSONL appends so two goroutines can't interleave
// half-lines. Cheap; audit events are rare relative to request volume.
// appendJSONL holds it across the whole open → write → close sequence.
var fileMu sync.Mutex
// auditLogPath reports where the JSONL trail should be appended. A
// non-empty MOLECULE_AUDIT_LOG_PATH env var takes precedence, so tests
// and future shipping changes can redirect the file without a
// recompile; otherwise the built-in default path is used. The env var
// is re-read on every call.
func auditLogPath() string {
	override := os.Getenv("MOLECULE_AUDIT_LOG_PATH")
	if override == "" {
		return defaultAuditLogPath
	}
	return override
}
// nowRFC3339Nano yields the current instant, converted to UTC and
// rendered in RFC3339Nano layout. It is a package-level variable rather
// than a plain function so tests can swap in a fixed clock.
var nowRFC3339Nano = func() string {
	utcNow := time.Now().UTC()
	return utcNow.Format(time.RFC3339Nano)
}
// Emit writes one audit record for eventType. Identity, actor, and
// correlation are pulled from ctx; workspaceID falls back to the ctx
// value if absent from fields. Emission is best-effort:
//
//   - The `audit:` log line (Loki transport) is written even if the
//     file append fails.
//   - The file append is wrapped in its own error branch; on failure
//     we drop a single warning and continue.
//
// Parameters:
//   - ctx: carries user id, actor kind, correlation id and (optionally)
//     workspace id, as attached by the With* setters.
//   - eventType: the "<noun>.<verb>" event name.
//   - fields: event-specific payload; may be nil. A non-empty string
//     under "workspace_id" is promoted to the record's top-level
//     workspace_id and omitted from the serialized fields. The caller's
//     map is never modified.
//
// This function MUST NOT panic and MUST NOT return an error — handlers
// in the request path call it inline. Any panic raised while building
// or shipping the record is recovered and reduced to a single log line.
func Emit(ctx context.Context, eventType string, fields map[string]any) {
	// Enforce the no-panic contract: a nil ctx or a panicking
	// json.Marshaler inside fields must not take the request down.
	defer func() {
		if r := recover(); r != nil {
			log.Printf("audit: emit %q panicked (event dropped): %v", eventType, r)
		}
	}()

	var wsID string
	payloadFields := fields
	if v, ok := fields["workspace_id"].(string); ok && v != "" {
		// Fields-supplied workspace_id wins (per-event override). The
		// top level is its canonical position, so leave it out of the
		// inner fields — but build a copy rather than deleting from the
		// caller's map, which they may reuse after this call.
		wsID = v
		payloadFields = make(map[string]any, len(fields))
		for k, val := range fields {
			if k != "workspace_id" {
				payloadFields[k] = val
			}
		}
	} else {
		wsID = resolveWorkspaceID(ctx)
	}
	if payloadFields == nil {
		// Serialize as {} rather than null so `fields` is always an object.
		payloadFields = map[string]any{}
	}

	rec := record{
		TS:            nowRFC3339Nano(),
		EventType:     eventType,
		WorkspaceID:   wsID,
		UserID:        resolveUserID(ctx),
		ActorKind:     resolveActor(ctx),
		CorrelationID: resolveCorrelationID(ctx),
		Fields:        payloadFields,
	}

	payload, err := json.Marshal(rec)
	if err != nil {
		// Marshal failure → emit a degraded marker so the event boundary
		// is still visible in Loki. Never lose the fact that *something*
		// happened. The marker is itself produced by json.Marshal so the
		// line keeps the `audit: <json>` framing and stays parseable
		// whatever characters the error text or event type contain.
		marker, merr := json.Marshal(map[string]string{
			"_marshal_err": err.Error(),
			"event_type":   eventType,
		})
		if merr != nil {
			// Not expected for a map of plain strings; stay well-formed anyway.
			marker = []byte(`{"_marshal_err":"unavailable"}`)
		}
		log.Printf("audit: %s", marker)
		return
	}

	// Transport 1: stdout (Loki via tenant Vector docker-logs source).
	log.Printf("audit: %s", payload)

	// Transport 2: durable JSONL (forensic local copy, Phase-2
	// file-source target). Best effort.
	appendJSONL(payload)
}
// appendJSONL opens the audit file, appends payload as one line, and
// closes it again. The open-per-write pattern is acceptable at
// audit-event rates (≪100/s); it survives log rotation without the
// package having to handle SIGHUP.
//
// payload is one serialized record without a trailing newline; it is not
// modified. The function is best-effort: any open, write or close
// failure is reported with exactly one log line and otherwise ignored,
// because the stdout transport has already carried the event.
func appendJSONL(payload []byte) {
	// Hold the lock across open → write → close so concurrent emitters
	// cannot interleave half-lines.
	fileMu.Lock()
	defer fileMu.Unlock()

	path := auditLogPath()
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o640)
	if err != nil {
		// Don't spam: one warning per emit failure. The Loki transport
		// already captured the event so we are not losing observability.
		log.Printf("audit: append %s failed (event still in stdout): %v", path, err)
		return
	}

	// Build payload + newline in a private buffer so the line goes out in
	// a single Write (keeping the JSONL invariant) without appending into
	// the caller's backing array.
	line := make([]byte, 0, len(payload)+1)
	line = append(line, payload...)
	line = append(line, '\n')

	_, werr := f.Write(line)
	// Close explicitly (not deferred) so its error is observable: on some
	// filesystems a failed write only surfaces at close time.
	cerr := f.Close()

	switch {
	case werr != nil:
		log.Printf("audit: write %s failed (event still in stdout): %v", path, werr)
	case cerr != nil:
		log.Printf("audit: close %s failed (event still in stdout): %v", path, cerr)
	}
}
// HashValuePrefix returns the first n characters of the lowercase hex
// SHA-256 digest of v. Use it when an event field has to identify a
// secret value without exposing the value itself.
//
// An empty v yields "". n is clamped to the range [4, 64] before
// slicing.
func HashValuePrefix(v string, n int) string {
	if v == "" {
		return ""
	}
	width := n
	switch {
	case width < 4:
		width = 4
	case width > 64:
		width = 64
	}
	digest := sha256Hex(v)
	return digest[:width]
}
@@ -0,0 +1,213 @@
package audit
import (
"bytes"
"context"
"encoding/json"
"log"
"os"
"path/filepath"
"strings"
"sync"
"testing"
)
// captureLog redirects the std logger to a buffer for the duration of fn
// and returns everything that was logged while fn ran. Logger flags are
// set to 0 during the capture so the returned text carries no timestamp
// prefix.
//
// The previous writer and flags are restored as soon as fn finishes —
// via defer, so this also holds when fn exits through t.Fatal or a
// panic. Restoring immediately (rather than at test cleanup) means log
// output produced later in the same test is not silently swallowed by
// the discarded buffer. The std logger is process-global, so tests using
// this helper must not run in parallel with each other.
func captureLog(t *testing.T, fn func()) string {
	t.Helper()
	var buf bytes.Buffer
	prevW, prevF := log.Writer(), log.Flags()
	log.SetOutput(&buf)
	log.SetFlags(0)
	defer func() {
		log.SetOutput(prevW)
		log.SetFlags(prevF)
	}()
	fn()
	return buf.String()
}
// withTempAuditFile sets MOLECULE_AUDIT_LOG_PATH to "audit.jsonl" inside
// a fresh per-test temp directory and returns that path. The file itself
// is not created here. Both the env var and the directory are scoped to
// t through t.Setenv and t.TempDir.
func withTempAuditFile(t *testing.T) string {
	t.Helper()
	auditFile := filepath.Join(t.TempDir(), "audit.jsonl")
	t.Setenv("MOLECULE_AUDIT_LOG_PATH", auditFile)
	return auditFile
}
// TestEmit_WritesAuditPrefixedLineToStdout checks the primary transport:
// one Emit call produces a log line of the form `audit: <json>` whose
// top-level event_type, workspace_id, user_id and actor_kind reflect the
// arguments and the values attached to the context.
func TestEmit_WritesAuditPrefixedLineToStdout(t *testing.T) {
	withTempAuditFile(t)

	logged := captureLog(t, func() {
		ctx := WithWorkspaceID(context.Background(), "ws-abc")
		ctx = WithActorKind(WithUserID(ctx, "u-1"), ActorUser)
		Emit(ctx, "secret.set", map[string]any{"key": "ANTHROPIC_API_KEY"})
	})
	logged = strings.TrimSpace(logged)

	const prefix = "audit: "
	if !strings.HasPrefix(logged, prefix) {
		t.Fatalf("expected 'audit: ' prefix, got %q", logged)
	}
	body := logged[len(prefix):]

	var got map[string]any
	if err := json.Unmarshal([]byte(body), &got); err != nil {
		t.Fatalf("payload not JSON: %v (raw=%q)", err, body)
	}

	// Top-level fields that must be present with exactly these values.
	expectations := []struct{ field, want string }{
		{"event_type", "secret.set"},
		{"workspace_id", "ws-abc"},
		{"user_id", "u-1"},
		{"actor_kind", "user"},
	}
	for _, e := range expectations {
		if got[e.field] != e.want {
			t.Errorf("%s mismatch: %+v", e.field, got)
		}
	}
}
// TestEmit_AppendsToJSONLFile checks the secondary transport: two Emit
// calls leave exactly two newline-terminated lines in the audit file,
// and each line parses as JSON on its own.
func TestEmit_AppendsToJSONLFile(t *testing.T) {
	auditFile := withTempAuditFile(t)

	// Log output is irrelevant here; capture it only to keep it quiet.
	captureLog(t, func() {
		bg := context.Background()
		Emit(bg, "secret.set", map[string]any{"key": "X"})
		Emit(bg, "secret.delete", map[string]any{"key": "Y"})
	})

	raw, err := os.ReadFile(auditFile)
	if err != nil {
		t.Fatalf("audit file unreadable: %v", err)
	}

	trimmed := strings.TrimRight(string(raw), "\n")
	lines := strings.Split(trimmed, "\n")
	if count := len(lines); count != 2 {
		t.Fatalf("expected 2 lines, got %d (raw=%q)", count, raw)
	}

	for idx := range lines {
		var parsed map[string]any
		if err := json.Unmarshal([]byte(lines[idx]), &parsed); err != nil {
			t.Errorf("line %d not valid JSON: %v (%q)", idx, err, lines[idx])
		}
	}
}
// TestEmit_DefaultsActorToUserWhenUnset checks that an event emitted
// from a context with no actor kind attached (and nil fields) is
// recorded with actor_kind "user".
func TestEmit_DefaultsActorToUserWhenUnset(t *testing.T) {
	withTempAuditFile(t)

	logged := captureLog(t, func() {
		Emit(context.Background(), "secret.set", nil)
	})

	const want = `"actor_kind":"user"`
	if strings.Contains(logged, want) {
		return
	}
	t.Errorf("expected actor_kind=user default, got %q", logged)
}
// TestEmit_FieldsWorkspaceIDOverridesContext checks that a
// "workspace_id" passed in the fields map beats the one on the context,
// is promoted to the record's top level, and is NOT repeated inside the
// serialized `fields` object.
//
// The record is decoded rather than substring-matched: encoding/json
// writes map keys in sorted order, so "key" always precedes
// "workspace_id" inside `fields` and a check for the literal text
// `"fields":{"workspace_id"` could never fire, even if de-duplication
// were broken.
func TestEmit_FieldsWorkspaceIDOverridesContext(t *testing.T) {
	withTempAuditFile(t)
	out := captureLog(t, func() {
		ctx := WithWorkspaceID(context.Background(), "ws-ctx")
		Emit(ctx, "secret.set", map[string]any{
			"workspace_id": "ws-override",
			"key":          "K",
		})
	})

	raw := strings.TrimPrefix(strings.TrimSpace(out), "audit: ")
	var got struct {
		WorkspaceID string         `json:"workspace_id"`
		Fields      map[string]any `json:"fields"`
	}
	if err := json.Unmarshal([]byte(raw), &got); err != nil {
		t.Fatalf("payload not JSON: %v (raw=%q)", err, raw)
	}

	if got.WorkspaceID != "ws-override" {
		t.Errorf("fields workspace_id should win over ctx; got %q", out)
	}
	// Inner fields must NOT carry workspace_id (de-duplicated).
	if _, dup := got.Fields["workspace_id"]; dup {
		t.Errorf("inner workspace_id should be deleted from fields; got %q", out)
	}
	// De-duplication must not drop unrelated payload keys.
	if got.Fields["key"] != "K" {
		t.Errorf("unrelated field %q should survive; got %q", "key", out)
	}
}
// TestEmit_NeverIncludesSecretValues_OnlyHash is a contract test for the
// documented convention that callers hash a secret before emitting. It
// asserts that HashValuePrefix yields a short hex string of the
// requested length, that the emitted line carries that prefix, and that
// the raw secret text does not appear in the line.
func TestEmit_NeverIncludesSecretValues_OnlyHash(t *testing.T) {
	withTempAuditFile(t)

	const secret = "sk-very-real-secret"
	hashPrefix := HashValuePrefix(secret, 8)
	if got := len(hashPrefix); got != 8 {
		t.Fatalf("HashValuePrefix length=%d, want 8", got)
	}

	eventFields := map[string]any{
		"key":        "TEST",
		"value_hash": hashPrefix,
	}
	logged := captureLog(t, func() {
		Emit(context.Background(), "secret.set", eventFields)
	})

	switch {
	case strings.Contains(logged, secret):
		t.Fatalf("audit line MUST NOT contain raw secret; got %q", logged)
	case !strings.Contains(logged, hashPrefix):
		t.Errorf("expected value_hash %q in line; got %q", hashPrefix, logged)
	}
}
// TestEmit_FileAppendFailureDoesNotBlockStdout checks that the stdout
// transport still fires when the JSONL append cannot succeed.
//
// The audit path points into a directory that does not exist. Opening
// with O_CREATE does not create parent directories, so the append fails
// on every platform (no dependency on /proc). The test looks for the
// event's own JSON rather than the bare "audit: " prefix, because the
// append-failure warning carries that prefix too and would satisfy a
// prefix-only check even if the event line had never been written.
func TestEmit_FileAppendFailureDoesNotBlockStdout(t *testing.T) {
	unwritable := filepath.Join(t.TempDir(), "no-such-dir", "path.jsonl")
	t.Setenv("MOLECULE_AUDIT_LOG_PATH", unwritable)

	out := captureLog(t, func() {
		Emit(context.Background(), "secret.set", map[string]any{"key": "K"})
	})

	if !strings.Contains(out, `"event_type":"secret.set"`) {
		t.Errorf("stdout audit line must fire even when file append fails; got %q", out)
	}
	// Guard against a vacuous pass: the failure path must have been hit.
	if !strings.Contains(out, "failed (event still in stdout)") {
		t.Errorf("expected a file-append failure warning; got %q", out)
	}
}
// TestEmit_Concurrent_NoInterleavedLines fires 50 Emit calls from
// separate goroutines and then checks file integrity: the audit file
// must hold exactly 50 lines and every line must parse as JSON on its
// own, i.e. no two writers interleaved partial lines.
func TestEmit_Concurrent_NoInterleavedLines(t *testing.T) {
	const emitters = 50
	auditFile := withTempAuditFile(t)

	// Capture log to drop stdout noise; we're asserting file integrity.
	captureLog(t, func() {
		var wg sync.WaitGroup
		for n := 0; n < emitters; n++ {
			wg.Add(1)
			// Pass the index as an argument so each goroutine sees its own value.
			go func(idx int) {
				defer wg.Done()
				Emit(context.Background(), "secret.set", map[string]any{"i": idx})
			}(n)
		}
		wg.Wait()
	})

	raw, err := os.ReadFile(auditFile)
	if err != nil {
		t.Fatalf("audit file unreadable: %v", err)
	}

	lines := strings.Split(strings.TrimRight(string(raw), "\n"), "\n")
	if len(lines) != emitters {
		t.Fatalf("expected 50 lines, got %d", len(lines))
	}
	for idx := range lines {
		var parsed map[string]any
		if err := json.Unmarshal([]byte(lines[idx]), &parsed); err != nil {
			t.Errorf("line %d not valid JSON (interleave bug?): %v", idx, err)
		}
	}
}
// TestHashValuePrefix_StableAndBounded covers HashValuePrefix's contract:
// empty input yields "", the requested length is honoured, out-of-range
// lengths are clamped to 4 and 64, and repeated calls with the same
// input agree.
func TestHashValuePrefix_StableAndBounded(t *testing.T) {
	if got := HashValuePrefix("", 8); got != "" {
		t.Errorf("empty input must return empty")
	}

	// Requested length → expected length, with the failure label to report.
	lengthCases := []struct {
		n, wantLen int
		label      string
	}{
		{8, 8, "len mismatch"},
		{1, 4, "clamp-lo failed"},    // Clamp lower bound.
		{999, 64, "clamp-hi failed"}, // Clamp upper bound.
	}
	for _, tc := range lengthCases {
		if got := HashValuePrefix("a", tc.n); len(got) != tc.wantLen {
			t.Errorf("%s: %q", tc.label, got)
		}
	}

	// Stable across calls (same input → same prefix). The two results are
	// bound to separate variables so staticcheck SA4000 does not flag the
	// comparison as tautological; asserting call-stability requires
	// invoking the function twice with the same input.
	first, second := HashValuePrefix("x", 8), HashValuePrefix("x", 8)
	if first != second {
		t.Errorf("hash not stable: a=%q b=%q", first, second)
	}
}
+15
View File
@@ -0,0 +1,15 @@
package audit
import (
"crypto/sha256"
"encoding/hex"
)
// sha256Hex computes the SHA-256 digest of s and returns it as a
// 64-character lowercase hex string. It lives in its own file so the
// crypto/sha256 import sits next to its only caller (HashValuePrefix in
// emit.go), which keeps secret-handling changes easy to audit.
func sha256Hex(s string) string {
	hasher := sha256.New()
	// hash.Hash.Write is documented to never return an error.
	_, _ = hasher.Write([]byte(s))
	return hex.EncodeToString(hasher.Sum(nil))
}
@@ -7,6 +7,7 @@ import (
"net/http"
"regexp"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/audit"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
@@ -262,6 +263,17 @@ func (h *SecretsHandler) Set(c *gin.Context) {
return
}
// Phase 1 audit: structured event for the security trail. Inline (not
// goroutine) so the event is durable before we ack the user; emit is
// best-effort and never errors out of the request path.
audit.Emit(c.Request.Context(), "secret.set", map[string]any{
"workspace_id": workspaceID,
"key": body.Key,
"value_hash": audit.HashValuePrefix(body.Value, 8),
"scope": "workspace",
"operation": "set",
})
// Auto-restart workspace to pick up new secret.
// RFC internal#524 Layer 1: route through globalGoAsync so tests can
// drain the detached restart goroutine before db.DB is swapped — see
@@ -301,6 +313,15 @@ func (h *SecretsHandler) Delete(c *gin.Context) {
return
}
// Phase 1 audit: structured event for the security trail. Only on
// real deletes (rows>0) — a 404 is not a state change.
audit.Emit(c.Request.Context(), "secret.delete", map[string]any{
"workspace_id": workspaceID,
"key": key,
"scope": "workspace",
"operation": "delete",
})
// Auto-restart workspace to pick up removed secret.
// RFC internal#524 Layer 1: see Set() above for the drain rationale.
if h.restartFunc != nil {
@@ -393,6 +414,15 @@ func (h *SecretsHandler) SetGlobal(c *gin.Context) {
key := body.Key
globalGoAsync(func() { h.restartAllAffectedByGlobalKey(key) })
// Phase 1 audit: admin-scope secret write — high-value security event.
auditCtx := audit.WithActorKind(c.Request.Context(), audit.ActorAdmin)
audit.Emit(auditCtx, "secret.set", map[string]any{
"key": body.Key,
"value_hash": audit.HashValuePrefix(body.Value, 8),
"scope": "global",
"operation": "set",
})
c.JSON(http.StatusOK, gin.H{"status": "saved", "key": body.Key, "scope": "global"})
}
@@ -471,6 +501,14 @@ func (h *SecretsHandler) DeleteGlobal(c *gin.Context) {
k := key
globalGoAsync(func() { h.restartAllAffectedByGlobalKey(k) })
// Phase 1 audit: admin-scope secret delete.
auditCtx := audit.WithActorKind(c.Request.Context(), audit.ActorAdmin)
audit.Emit(auditCtx, "secret.delete", map[string]any{
"key": key,
"scope": "global",
"operation": "delete",
})
c.JSON(http.StatusOK, gin.H{"status": "deleted", "key": key, "scope": "global"})
}