Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e2291051b7 | |||
| 3c0d00b43f | |||
| 360321db53 | |||
| 1a9168d632 | |||
| 03689e3d9a | |||
| 67840629eb |
@@ -180,7 +180,7 @@ jobs:
|
||||
# environment pypi-publish. The action mints a short-lived OIDC
|
||||
# token and exchanges it for a PyPI upload credential — no static
|
||||
# API token in this repo's secrets.
|
||||
uses: pypa/gh-action-pypi-publish@release/v1
|
||||
uses: pypa/gh-action-pypi-publish@cef221092ed1bacb1cc03d23a2d87d1d172e277b # release/v1
|
||||
with:
|
||||
packages-dir: ${{ runner.temp }}/runtime-build/dist/
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 5
|
||||
steps:
|
||||
- uses: actions/checkout@v6
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
|
||||
with:
|
||||
|
||||
@@ -332,13 +332,18 @@ func main() {
|
||||
cronSched.SetChannels(channelMgr)
|
||||
|
||||
// Router
|
||||
// Plugin registry — created before Setup so the same registry is shared
|
||||
// between the PluginsHandler (for installs) and the drift sweeper (for
|
||||
// drift detection). github:// sources always work; local:// sources
|
||||
// require a plugins/ dir on disk (nil in CP/SaaS mode).
|
||||
// Plugin resolver + registry — the github:// resolver is shared between
|
||||
// the PluginsHandler (for installs, via router.Setup → WithSourceResolver)
|
||||
// and a local registry that the drift sweeper consumes (for drift
|
||||
// detection). Sharing the resolver instance preserves the documented
|
||||
// intent of unified resolver state (e.g. rate limiters, in-process
|
||||
// caches) across both surfaces. local:// sources require a plugins/
|
||||
// dir on disk and live entirely inside PluginsHandler's internal
|
||||
// registry, which is fine — drift detection is github-only by design.
|
||||
githubResolver := plugins.NewGithubResolver()
|
||||
pluginRegistry := plugins.NewRegistry()
|
||||
pluginRegistry.Register(plugins.NewGithubResolver())
|
||||
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, pluginRegistry)
|
||||
pluginRegistry.Register(githubResolver)
|
||||
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, githubResolver)
|
||||
|
||||
// Plugin drift sweeper — periodic detection of upstream plugin version drift
|
||||
// (core#123). Scans workspace_plugins rows where tracked_ref != 'none',
|
||||
|
||||
@@ -4,7 +4,6 @@ go 1.25.0
|
||||
|
||||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
|
||||
github.com/alicebob/miniredis/v2 v2.37.0
|
||||
github.com/creack/pty v1.1.24
|
||||
github.com/docker/docker v28.5.2+incompatible
|
||||
@@ -19,6 +18,7 @@ require (
|
||||
github.com/opencontainers/image-spec v1.1.1
|
||||
github.com/redis/go-redis/v9 v9.19.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce
|
||||
golang.org/x/crypto v0.50.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -4,8 +4,6 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||
github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f h1:YkLRhUg+9qr9OV9N8dG1Hj0Ml7TThHlRwh5F//oUJVs=
|
||||
github.com/Molecule-AI/molecule-ai-plugin-gh-identity v0.0.0-20260424033845-4fd5ac7be30f/go.mod h1:NqdtlWZDJvpXNJRHnMkPhTKHdA1LZTNH+63TB66JSOU=
|
||||
github.com/alicebob/miniredis/v2 v2.37.0 h1:RheObYW32G1aiJIj81XVt78ZHJpHonHLHW7OLIshq68=
|
||||
github.com/alicebob/miniredis/v2 v2.37.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
@@ -154,6 +152,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
|
||||
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
|
||||
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
|
||||
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce h1:ftm0ba0ukLlfqeFes+/jWnXH8XULXmRpMy3fOCZ83/U=
|
||||
go.moleculesai.app/plugin/gh-identity v0.0.0-20260509010445-788988195fce/go.mod h1:0aAqoDle2V7Cywso94MXdv1DH/HEe/0oZmcbqWYMK7g=
|
||||
go.mongodb.org/mongo-driver/v2 v2.5.0 h1:yXUhImUjjAInNcpTcAlPHiT7bIXhshCTL3jVBkF3xaE=
|
||||
go.mongodb.org/mongo-driver/v2 v2.5.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||
|
||||
@@ -8,7 +8,6 @@ package handlers
|
||||
// POST /admin/plugin-updates/:id/apply — apply a queued drift update
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
@@ -1262,4 +1262,3 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,7 +112,15 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH
|
||||
|
||||
// Sources returns the underlying plugin source registry. Used by main.go to
|
||||
// pass the same registry to the drift sweeper so both share resolver state.
|
||||
func (h *PluginsHandler) Sources() plugins.SourceResolver {
|
||||
//
|
||||
// Returns plugins.RegistryResolver (the slim Schemes()-only interface the
|
||||
// drift sweeper consumes), not plugins.SourceResolver — `h.sources` is the
|
||||
// registry shape (Register/Resolve/Schemes), not the per-scheme fetcher
|
||||
// shape (Scheme/Fetch). The previous return type was a leftover from
|
||||
// before #1814 narrowed `h.sources` to the `pluginSources` interface and
|
||||
// caused a compile error once the masking duplicate `SourceResolver` in
|
||||
// `internal/plugins/drift_sweeper.go` was removed.
|
||||
func (h *PluginsHandler) Sources() plugins.RegistryResolver {
|
||||
return h.sources
|
||||
}
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
|
||||
// Try Redis cache first.
|
||||
agentURL, err := db.GetCachedURL(ctx, workspaceID)
|
||||
if err == nil && agentURL != "" {
|
||||
return rewriteForDocker(agentURL, workspaceID), nil
|
||||
return h.rewriteForDocker(agentURL, workspaceID), nil
|
||||
}
|
||||
|
||||
// Cache miss — fall back to DB.
|
||||
@@ -136,13 +136,18 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
|
||||
}
|
||||
agentURL = *urlNullable
|
||||
_ = db.CacheURL(ctx, workspaceID, agentURL)
|
||||
return rewriteForDocker(agentURL, workspaceID), nil
|
||||
return h.rewriteForDocker(agentURL, workspaceID), nil
|
||||
}
|
||||
|
||||
// rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
|
||||
// when the platform is running inside a Docker container. When platform is
|
||||
// on the host (non-Docker), 127.0.0.1 IS the host and the original URL works.
|
||||
func rewriteForDocker(agentURL, workspaceID string) string {
|
||||
//
|
||||
// Method receiver `h` is required to access h.provisioner; was previously
|
||||
// declared as a package-level function which referred to an undefined `h`
|
||||
// — compile error masked by the upstream `SourceResolver` redeclaration in
|
||||
// internal/plugins/drift_sweeper.go.
|
||||
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
|
||||
if platformInDocker && h.provisioner != nil {
|
||||
// Only rewrite if the URL points to localhost (the ephemeral port
|
||||
// binding the container published to the host). Internal Docker
|
||||
|
||||
@@ -324,7 +324,6 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
|
||||
return mr
|
||||
}
|
||||
|
||||
// rewriteForDocker is exported from restart_signals.go so it can be tested here.
|
||||
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
|
||||
return rewriteForDocker(agentURL, workspaceID)
|
||||
}
|
||||
// (the previous test-only shim wrapping a package-level rewriteForDocker
|
||||
// has been removed: production rewriteForDocker is now a *WorkspaceHandler
|
||||
// method directly — see internal/handlers/restart_signals.go.)
|
||||
|
||||
@@ -9,7 +9,8 @@ package plugins
|
||||
// 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
|
||||
// AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
|
||||
// 2. For each row, resolves the tracked ref to its current upstream SHA
|
||||
// using the appropriate SourceResolver.
|
||||
// using the appropriate per-scheme SourceResolver (via the
|
||||
// RegistryResolver passed in at sweeper start).
|
||||
// 3. If the resolved SHA differs from installed_sha → drift detected.
|
||||
// 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
|
||||
// a re-drift while a row is still pending is a no-op).
|
||||
@@ -61,10 +62,21 @@ const DriftSweepInterval = 1 * time.Hour
|
||||
// that handles Gitea instances on high-latency links.
|
||||
const ResolveRefDeadline = 60 * time.Second
|
||||
|
||||
// SourceResolver resolves plugin sources to installable directories.
|
||||
// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
|
||||
type SourceResolver interface {
|
||||
Resolve(source Source) (SourceResolver, error)
|
||||
// RegistryResolver is the slim shape of a plugin source-scheme registry that
|
||||
// the drift sweeper depends on. It exists distinct from `SourceResolver` (the
|
||||
// per-scheme fetcher interface in source.go) so the two type names don't
|
||||
// collide inside this package — historically both were named `SourceResolver`,
|
||||
// which broke `go build` (issue: docker build fails with "SourceResolver
|
||||
// redeclared in this block"). Satisfied by *Registry, which holds the set of
|
||||
// per-scheme resolvers and exposes the list of registered scheme prefixes.
|
||||
//
|
||||
// Only `Schemes()` is used by the sweeper today (to strip the scheme prefix
|
||||
// from `source_raw` before handing the spec to GithubResolver). If the
|
||||
// sweeper ever needs to dispatch via the registry (e.g. to support
|
||||
// non-github schemes for drift detection), add the resolution method back
|
||||
// here — but keep it returning `SourceResolver` so it stays compatible with
|
||||
// `*Registry.Resolve`.
|
||||
type RegistryResolver interface {
|
||||
Schemes() []string
|
||||
}
|
||||
|
||||
@@ -74,7 +86,7 @@ type SourceResolver interface {
|
||||
//
|
||||
// Registers itself via atexits in cmd/server/main.go so the process
|
||||
// shuts down cleanly on SIGTERM.
|
||||
func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
|
||||
func StartPluginDriftSweeper(ctx context.Context, resolver RegistryResolver) {
|
||||
if resolver == nil {
|
||||
log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
|
||||
return
|
||||
@@ -107,7 +119,7 @@ func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
|
||||
// sweepDriftOnce runs one full drift-detection cycle.
|
||||
// Errors are non-fatal — each row is handled independently so a single
|
||||
// slow row doesn't block the rest of the sweep.
|
||||
func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
|
||||
func sweepDriftOnce(parent context.Context, resolver RegistryResolver) {
|
||||
ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
@@ -170,7 +182,7 @@ func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
|
||||
// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
|
||||
// Handles both github:// and local:// sources; local sources are skipped
|
||||
// (no meaningful upstream to drift against).
|
||||
func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
|
||||
func resolveLatestSHA(ctx context.Context, resolver RegistryResolver, sourceRaw, trackedRef string) (string, error) {
|
||||
// Strip the scheme prefix to get the raw spec.
|
||||
// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
|
||||
spec := sourceRaw
|
||||
@@ -231,7 +243,7 @@ func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, c
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
|
||||
func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
|
||||
func SweepDriftOnceForTest(parent context.Context, resolver RegistryResolver) {
|
||||
sweepDriftOnce(parent, resolver)
|
||||
}
|
||||
|
||||
|
||||
@@ -2,20 +2,17 @@ package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// stubResolver is a SourceResolver that always returns a stub github resolver.
|
||||
// stubResolver satisfies plugins.RegistryResolver — the slim shape the
|
||||
// drift sweeper consumes (just `Schemes()`). Tests that dispatch by scheme
|
||||
// build a per-scheme SourceResolver directly via NewGithubResolver().
|
||||
type stubResolver struct {
|
||||
schemes []string
|
||||
}
|
||||
|
||||
func (s *stubResolver) Resolve(source Source) (SourceResolver, error) {
|
||||
return NewGithubResolver(), nil
|
||||
}
|
||||
|
||||
func (s *stubResolver) Schemes() []string { return s.schemes }
|
||||
|
||||
func TestResolveRef_RejectsBareSpec(t *testing.T) {
|
||||
@@ -156,8 +153,10 @@ func TestPluginUpdateQueueRow_Struct(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSourceResolverInterface_StubResolver verifies that a stub resolver
|
||||
// satisfies the SourceResolver interface.
|
||||
func TestSourceResolverInterface_StubResolver(t *testing.T) {
|
||||
var _ SourceResolver = (*stubResolver)(nil)
|
||||
// TestRegistryResolverInterface_StubResolver verifies that a stub resolver
|
||||
// satisfies the RegistryResolver interface — the slim shape the drift
|
||||
// sweeper consumes. (The per-scheme SourceResolver interface lives in
|
||||
// source.go and is exercised by GithubResolver / LocalResolver tests.)
|
||||
func TestRegistryResolverInterface_StubResolver(t *testing.T) {
|
||||
var _ RegistryResolver = (*stubResolver)(nil)
|
||||
}
|
||||
|
||||
@@ -501,12 +501,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
|
||||
// Admin — plugin version-subscription drift queue (core#123).
|
||||
// List pending drift entries and apply approved updates.
|
||||
{
|
||||
driftH := handlers.NewAdminPluginDriftHandler(plgh)
|
||||
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
|
||||
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
|
||||
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
|
||||
}
|
||||
// Moved below plgh declaration — was previously here but referenced
|
||||
// `plgh` before its `:=` at the same function level, which started
|
||||
// failing once the upstream `SourceResolver` redeclaration was fixed
|
||||
// (the prior compile-time block was masking the forward reference).
|
||||
|
||||
// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
|
||||
// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
|
||||
@@ -646,6 +644,19 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// unpack locally instead of going through Docker exec.
|
||||
wsAuth.GET("/plugins/:name/download", plgh.Download)
|
||||
|
||||
// Admin — plugin version-subscription drift queue (core#123).
|
||||
// List pending drift entries and apply approved updates.
|
||||
// Wired here (after plgh) rather than in the admin block above so the
|
||||
// `plgh` reference resolves — the previous placement was a forward
|
||||
// reference, masked by the upstream `SourceResolver` redeclaration in
|
||||
// internal/plugins/drift_sweeper.go.
|
||||
{
|
||||
driftH := handlers.NewAdminPluginDriftHandler(plgh)
|
||||
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
|
||||
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
|
||||
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
|
||||
}
|
||||
|
||||
// Bundles — #164 + #165: both gated behind AdminAuth.
|
||||
// POST /bundles/import — CRITICAL: anon creation of arbitrary workspaces
|
||||
// with user-supplied config (system prompts,
|
||||
|
||||
Reference in New Issue
Block a user