From 9f031efd615abb60476a4214dcfa922e3a7cb8d0 Mon Sep 17 00:00:00 2001
From: William Burton
Date: Thu, 10 Jul 2025 17:26:55 -0400
Subject: [PATCH] Change localfiles to be configurable

Previously the localfiles helpers hard-coded /tmp/oss-rebuild as the root for
cached assets, rundex data, and build definitions. Introduce a LocalFileStore
type rooted at a caller-supplied directory, thread it through the TUI, butler,
and rebuilder, and expose the root via a new --asset-dir flag (defaulting to
/tmp/oss-rebuild).
---
 tools/ctl/ctl.go                     | 37 +++++++++++++++++++++++------
 tools/ctl/ide/commands/commands.go   |  6 +++---
 tools/ctl/ide/rebuilder/rebuilder.go | 11 +++++++++--
 tools/ctl/ide/ui.go                  |  6 +++---
 tools/ctl/localfiles/butler.go       |  6 ++++--
 tools/ctl/localfiles/localfiles.go   | 35 ++++++++++++++++----------
 6 files changed, 74 insertions(+), 27 deletions(-)

diff --git a/tools/ctl/ctl.go b/tools/ctl/ctl.go
index 76aba91b..64357db0 100644
--- a/tools/ctl/ctl.go
+++ b/tools/ctl/ctl.go
@@ -154,6 +154,10 @@ var tui = &cobra.Command{
 				}
 			}
 		}
+		lfs, err := localfiles.NewLocalFileStore(*assetDir)
+		if err != nil {
+			log.Fatal(errors.Wrap(err, "creating local file store"))
+		}
 		var dex rundex.Reader
 		var watcher rundex.Watcher
 		{
@@ -166,7 +170,11 @@ var tui = &cobra.Command{
 					log.Fatal(err)
 				}
 			} else {
-				lc := rundex.NewLocalClient(localfiles.Rundex())
+				rundexFS, err := lfs.Rundex()
+				if err != nil {
+					log.Fatal(errors.Wrap(err, "getting rundex filesystem"))
+				}
+				lc := rundex.NewLocalClient(rundexFS)
 				dex = lc
 				watcher = lc
 			}
@@ -180,7 +188,7 @@ var tui = &cobra.Command{
 			}
 		} else {
 			var err error
-			if buildDefs, err = localfiles.BuildDefs(); err != nil {
+			if buildDefs, err = lfs.BuildDefs(); err != nil {
 				log.Fatal(errors.Wrap(err, "failed to create local build def asset store"))
 			}
 		}
@@ -191,7 +199,7 @@ var tui = &cobra.Command{
 			NPM:      npmreg.HTTPRegistry{Client: regclient},
 			PyPI:     pypireg.HTTPRegistry{Client: regclient},
 		}
-		butler := localfiles.NewButler(*metadataBucket, *logsBucket, *debugStorage, mux)
+		butler := localfiles.NewButler(lfs, *metadataBucket, *logsBucket, *debugStorage, mux)
 		var aiClient *genai.Client
 		{
 			aiProject := *project
@@ -216,7 +224,7 @@ var tui = &cobra.Command{
 			}
 		}
 		benches := benchmark.NewFSRepository(osfs.New(*benchmarkDir))
-		tapp := ide.NewTuiApp(dex, watcher, rundex.FetchRebuildOpts{Clean: *clean}, benches, buildDefs, butler, aiClient)
+		tapp := ide.NewTuiApp(dex, watcher, rundex.FetchRebuildOpts{Clean: *clean}, benches, buildDefs, butler, aiClient, lfs)
 		if err := tapp.Run(cmd.Context()); err != nil {
 			// TODO: This cleanup will be unnecessary once NewTuiApp does split logging.
 			log.Default().SetOutput(os.Stdout)
@@ -313,6 +321,10 @@ var getResults = &cobra.Command{
 			}
 		}
 	case "assets":
+		lfs, err := localfiles.NewLocalFileStore(*assetDir)
+		if err != nil {
+			log.Fatal(errors.Wrap(err, "creating local file store"))
+		}
 		regclient := http.DefaultClient
 		mux := rebuild.RegistryMux{
 			Debian:   debianreg.HTTPRegistry{Client: regclient},
@@ -320,7 +332,7 @@ var getResults = &cobra.Command{
 			NPM:      npmreg.HTTPRegistry{Client: regclient},
 			PyPI:     pypireg.HTTPRegistry{Client: regclient},
 		}
-		butler := localfiles.NewButler(*metadataBucket, *logsBucket, *debugStorage, mux)
+		butler := localfiles.NewButler(lfs, *metadataBucket, *logsBucket, *debugStorage, mux)
 		atype := rebuild.AssetType(*assetType)
 		ctx := cmd.Context()
 		for _, r := range rebuilds {
@@ -368,20 +380,28 @@ var runBenchmark = &cobra.Command{
 		var executor run.ExecutionService
 		concurrency := *maxConcurrency
 		if *buildLocal {
+			lfs, err := localfiles.NewLocalFileStore(*assetDir)
+			if err != nil {
+				log.Fatal(errors.Wrap(err, "creating local file store"))
+			}
 			now := time.Now().UTC()
 			runID = now.Format(time.RFC3339)
 			if concurrency != 1 {
 				log.Println("Note: dropping max concurrency to 1 due to local execution")
 			}
 			concurrency = 1
-			store, err := localfiles.AssetStore(runID)
+			store, err := lfs.AssetStore(runID)
 			if err != nil {
 				log.Fatalf("Failed to create temp directory: %v", err)
 			}
 			// TODO: Validate this.
 			prebuildURL := fmt.Sprintf("https://%s.storage.googleapis.com/%s", *prebuildBucket, *prebuildVersion)
 			executor = run.NewLocalExecutionService(prebuildURL, store, cmd.OutOrStdout())
-			dex = rundex.NewLocalClient(localfiles.Rundex())
+			rundexFS, err := lfs.Rundex()
+			if err != nil {
+				log.Fatal(errors.Wrap(err, "getting rundex filesystem"))
+			}
+			dex = rundex.NewLocalClient(rundexFS)
 			if err := dex.WriteRun(ctx, rundex.FromRun(schema.Run{
 				ID:            runID,
 				BenchmarkName: filepath.Base(args[1]),
@@ -962,6 +982,7 @@ var (
 	metadataBucket    = flag.String("metadata-bucket", "", "the gcs bucket where rebuild output is stored")
 	useNetworkProxy   = flag.Bool("use-network-proxy", false, "request the newtwork proxy")
 	useSyscallMonitor = flag.Bool("use-syscall-monitor", false, "request syscall monitoring")
+	assetDir          = flag.String("asset-dir", "/tmp/oss-rebuild", "the directory to store local assets")
 	// run-bench
 	maxConcurrency = flag.Int("max-concurrency", 90, "maximum number of inflight requests")
 	buildLocal     = flag.Bool("local", false, "true if this request is going direct to build-local (not through API first)")
@@ -991,6 +1012,8 @@ var (
 )
 
 func init() {
+	rootCmd.PersistentFlags().AddGoFlag(flag.Lookup("asset-dir"))
+
 	runBenchmark.Flags().AddGoFlag(flag.Lookup("api"))
 	runBenchmark.Flags().AddGoFlag(flag.Lookup("max-concurrency"))
 	runBenchmark.Flags().AddGoFlag(flag.Lookup("local"))
diff --git a/tools/ctl/ide/commands/commands.go b/tools/ctl/ide/commands/commands.go
index 3d8f29c2..4cade5f9 100644
--- a/tools/ctl/ide/commands/commands.go
+++ b/tools/ctl/ide/commands/commands.go
@@ -212,7 +212,7 @@ func NewRebuildCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn mod
 	}
 }
 
-func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn modal.Fn, butler localfiles.Butler, aiClient *genai.Client, buildDefs rebuild.LocatableAssetStore, dex rundex.Reader, benches benchmark.Repository, cmdReg *commandreg.Registry) []commandreg.RebuildGroupCmd {
+func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn modal.Fn, butler localfiles.Butler, aiClient *genai.Client, buildDefs rebuild.LocatableAssetStore, dex rundex.Reader, benches benchmark.Repository, cmdReg *commandreg.Registry, lfs *localfiles.LocalFileStore) []commandreg.RebuildGroupCmd {
 	return []commandreg.RebuildGroupCmd{
 		{
 			Short: "Find pattern",
@@ -238,7 +238,7 @@ func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalF
 				})
 				var found int
 				p = p.Do(func(in rundex.Rebuild, out chan<- rundex.Rebuild) {
-					assets, err := localfiles.AssetStore(in.RunID)
+					assets, err := lfs.AssetStore(in.RunID)
 					if err != nil {
 						log.Println(errors.Wrapf(err, "creating asset store for runid: %s", in.RunID))
 						return
@@ -305,7 +305,7 @@ func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalF
 				}
 				p2 := pipe.ParInto(LLMRequestParallelism, p1, func(in rundex.Rebuild, out chan<- summarizedRebuild) {
 					const uploadBytesLimit = 100_000
-					assets, err := localfiles.AssetStore(in.RunID)
+					assets, err := lfs.AssetStore(in.RunID)
 					if err != nil {
 						log.Println(errors.Wrapf(err, "creating asset store for runid: %s", in.RunID))
 						return
diff --git a/tools/ctl/ide/rebuilder/rebuilder.go b/tools/ctl/ide/rebuilder/rebuilder.go
index cbb27a70..72b8cc7c 100644
--- a/tools/ctl/ide/rebuilder/rebuilder.go
+++ b/tools/ctl/ide/rebuilder/rebuilder.go
@@ -58,6 +58,7 @@ const (
 type Instance struct {
 	ID     string
 	URL    *url.URL
+	lfs    *localfiles.LocalFileStore
 	cancel func()
 	state  instanceState
 }
@@ -83,7 +84,7 @@ func (in *Instance) Run(ctx context.Context) {
 	in.state = running
 	idchan := make(chan string)
 	go func() {
-		assetDir := localfiles.AssetsPath()
+		assetDir := in.lfs.AssetsPath()
 		err = docker.RunServer(
 			ctx,
 			"rebuilder",
@@ -149,9 +150,15 @@ func (in *Instance) Wait(ctx context.Context) <-chan error {
 // Rebuilder manages a local instance of the rebuilder docker container.
 type Rebuilder struct {
 	instance *Instance
+	lfs      *localfiles.LocalFileStore
 	m        sync.Mutex
 }
 
+// NewRebuilder creates a new Rebuilder.
+func NewRebuilder(lfs *localfiles.LocalFileStore) *Rebuilder {
+	return &Rebuilder{lfs: lfs}
+}
+
 // Kill does a non-blocking shutdown of the rebuilder container.
 func (rb *Rebuilder) Kill() {
 	rb.m.Lock()
@@ -169,7 +176,7 @@ func (rb *Rebuilder) Instance() *Instance {
 	rb.m.Lock()
 	defer rb.m.Unlock()
 	if rb.instance == nil || rb.instance.Dead() {
-		rb.instance = &Instance{}
+		rb.instance = &Instance{lfs: rb.lfs}
 	}
 	return rb.instance
 }
diff --git a/tools/ctl/ide/ui.go b/tools/ctl/ide/ui.go
index c03cc21d..0aa3b4d7 100644
--- a/tools/ctl/ide/ui.go
+++ b/tools/ctl/ide/ui.go
@@ -36,7 +36,7 @@ type TuiApp struct {
 }
 
 // NewTuiApp creates a new tuiApp object.
-func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.FetchRebuildOpts, benches benchmark.Repository, buildDefs rebuild.LocatableAssetStore, butler localfiles.Butler, aiClient *genai.Client) *TuiApp {
+func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.FetchRebuildOpts, benches benchmark.Repository, buildDefs rebuild.LocatableAssetStore, butler localfiles.Butler, aiClient *genai.Client, lfs *localfiles.LocalFileStore) *TuiApp {
 	var t *TuiApp
 	{
 		app := tview.NewApplication()
@@ -48,7 +48,7 @@ func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.Fetc
 		log.Default().SetFlags(0)
 		logs.SetBorder(true).SetTitle("Logs")
 		logs.ScrollToEnd()
-		rb := &rebuilder.Rebuilder{}
+		rb := rebuilder.NewRebuilder(lfs)
 		t = &TuiApp{
 			app: app,
 			// When the widgets are updated, we should refresh the application.
@@ -69,7 +69,7 @@ func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.Fetc
 		if err := cmdReg.AddBenchmarks(commands.NewBenchmarkCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
 			log.Fatal(err)
 		}
-		if err := cmdReg.AddRebuildGroups(commands.NewRebuildGroupCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
+		if err := cmdReg.AddRebuildGroups(commands.NewRebuildGroupCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg, lfs)...); err != nil {
 			log.Fatal(err)
 		}
 		if err := cmdReg.AddRebuilds(commands.NewRebuildCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
diff --git a/tools/ctl/localfiles/butler.go b/tools/ctl/localfiles/butler.go
index 014e9570..a0ffcefc 100644
--- a/tools/ctl/localfiles/butler.go
+++ b/tools/ctl/localfiles/butler.go
@@ -18,11 +18,13 @@ type Butler interface {
 }
 
 type butler struct {
+	lfs            *LocalFileStore
 	metaAssetstore assetlocator.MetaAssetStore
 }
 
-func NewButler(metadataBucket, logsBucket, debugBucket string, mux rebuild.RegistryMux) Butler {
+func NewButler(lfs *LocalFileStore, metadataBucket, logsBucket, debugBucket string, mux rebuild.RegistryMux) Butler {
 	return &butler{
+		lfs: lfs,
 		metaAssetstore: assetlocator.MetaAssetStore{
 			MetadataBucket: metadataBucket,
 			LogsBucket:     logsBucket,
@@ -33,7 +35,7 @@ func NewButler(metadataBucket, logsBucket, debugBucket string, mux rebuild.Regis
 }
 
 func (b *butler) Fetch(ctx context.Context, runID string, wasSmoketest bool, want rebuild.Asset) (path string, err error) {
-	localAssets, err := AssetStore(runID)
+	localAssets, err := b.lfs.AssetStore(runID)
 	if err != nil {
 		return "", err
 	}
diff --git a/tools/ctl/localfiles/localfiles.go b/tools/ctl/localfiles/localfiles.go
index e5b1d590..3105e756 100644
--- a/tools/ctl/localfiles/localfiles.go
+++ b/tools/ctl/localfiles/localfiles.go
@@ -1,11 +1,10 @@
 // Copyright 2025 Google LLC
 // SPDX-License-Identifier: Apache-2.0
 
 package localfiles
 
 import (
 	"os"
-	"path"
 	"path/filepath"
 
 	"github.com/go-git/go-billy/v5"
@@ -14,24 +13,39 @@ import (
 	"github.com/pkg/errors"
 )
 
+// LocalFileStore provides access to local file-based resources.
+type LocalFileStore struct {
+	root billy.Filesystem
+}
+
+// NewLocalFileStore creates a new LocalFileStore rooted at the given path.
+func NewLocalFileStore(rootPath string) (*LocalFileStore, error) {
+	if err := os.MkdirAll(rootPath, 0755); err != nil {
+		return nil, errors.Wrapf(err, "failed to create directory %s", rootPath)
+	}
+	return &LocalFileStore{root: osfs.New(rootPath)}, nil
+}
+
 const (
-	tempRoot = "/tmp/oss-rebuild/" // The directory where all the local files are stored
 	// Subdirectories
 	assets    = "assets"     // AssetStore used to cache logs, artifacts, and other assets manipulated by ctl.
 	rundex    = "rundex"     // The local metadata about runs and attempts, normally generated by local smoketets benchmarks.
 	buildDefs = "build-defs" // Working copy of build definitions.
 )
 
-func Rundex() billy.Filesystem {
-	return osfs.New(path.Join(tempRoot, rundex))
+// Rundex returns a filesystem for the rundex subdirectory.
+func (lfs *LocalFileStore) Rundex() (billy.Filesystem, error) {
+	return lfs.root.Chroot(rundex)
 }
 
-func AssetsPath() string {
-	return filepath.Join(tempRoot, assets)
+// AssetsPath returns the path to the assets subdirectory.
+func (lfs *LocalFileStore) AssetsPath() string {
+	return filepath.Join(lfs.root.Root(), assets)
 }
 
-func BuildDefs() (*rebuild.FilesystemAssetStore, error) {
-	dir := filepath.Join(tempRoot, assets)
+// BuildDefs returns a filesystem asset store for build definitions.
+func (lfs *LocalFileStore) BuildDefs() (*rebuild.FilesystemAssetStore, error) {
+	dir := filepath.Join(lfs.root.Root(), assets)
 	if err := os.MkdirAll(dir, 0755); err != nil {
 		return nil, errors.Wrapf(err, "failed to create directory %s", dir)
 	}
@@ -42,8 +56,9 @@ func BuildDefs() (*rebuild.FilesystemAssetStore, error) {
 	return rebuild.NewFilesystemAssetStore(assetsFS), nil
 }
 
-func AssetStore(runID string) (*rebuild.FilesystemAssetStore, error) {
-	dir := filepath.Join(tempRoot, assets, runID)
+// AssetStore returns a filesystem asset store for a specific run.
+func (lfs *LocalFileStore) AssetStore(runID string) (*rebuild.FilesystemAssetStore, error) {
+	dir := filepath.Join(lfs.root.Root(), assets, runID)
 	if err := os.MkdirAll(dir, 0755); err != nil {
 		return nil, errors.Wrapf(err, "failed to create directory %s", dir)
 	}
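-- 
A minimal usage sketch of the LocalFileStore API introduced above (commentary
appended after the diff; not part of the commit message or the patch itself).
The import path is inferred from the repository layout and the run ID is a
placeholder; only NewLocalFileStore, Rundex, and AssetStore come from this
change.

	package main

	import (
		"log"

		"github.com/google/oss-rebuild/tools/ctl/localfiles"
		"github.com/pkg/errors"
	)

	func main() {
		// Root all local ctl state under a caller-chosen directory rather than
		// the previously hardcoded /tmp/oss-rebuild.
		lfs, err := localfiles.NewLocalFileStore("/tmp/oss-rebuild")
		if err != nil {
			log.Fatal(errors.Wrap(err, "creating local file store"))
		}
		// Rundex data is served from a chroot of the configured root.
		rundexFS, err := lfs.Rundex()
		if err != nil {
			log.Fatal(errors.Wrap(err, "getting rundex filesystem"))
		}
		_ = rundexFS
		// Per-run assets are cached under <root>/assets/<runID>.
		store, err := lfs.AssetStore("example-run-id")
		if err != nil {
			log.Fatal(errors.Wrap(err, "creating asset store"))
		}
		_ = store
	}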