Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions tools/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ var tui = &cobra.Command{
}
}
}
lfs, err := localfiles.NewLocalFileStore(*assetDir)
if err != nil {
log.Fatal(errors.Wrap(err, "creating local file store"))
}
var dex rundex.Reader
var watcher rundex.Watcher
{
Expand All @@ -166,7 +170,11 @@ var tui = &cobra.Command{
log.Fatal(err)
}
} else {
lc := rundex.NewLocalClient(localfiles.Rundex())
rundexFS, err := lfs.Rundex()
if err != nil {
log.Fatal(errors.Wrap(err, "getting rundex filesystem"))
}
lc := rundex.NewLocalClient(rundexFS)
dex = lc
watcher = lc
}
Expand All @@ -180,7 +188,7 @@ var tui = &cobra.Command{
}
} else {
var err error
if buildDefs, err = localfiles.BuildDefs(); err != nil {
if buildDefs, err = lfs.BuildDefs(); err != nil {
log.Fatal(errors.Wrap(err, "failed to create local build def asset store"))
}
}
Expand All @@ -191,7 +199,7 @@ var tui = &cobra.Command{
NPM: npmreg.HTTPRegistry{Client: regclient},
PyPI: pypireg.HTTPRegistry{Client: regclient},
}
butler := localfiles.NewButler(*metadataBucket, *logsBucket, *debugStorage, mux)
butler := localfiles.NewButler(lfs, *metadataBucket, *logsBucket, *debugStorage, mux)
var aiClient *genai.Client
{
aiProject := *project
Expand All @@ -216,7 +224,7 @@ var tui = &cobra.Command{
}
}
benches := benchmark.NewFSRepository(osfs.New(*benchmarkDir))
tapp := ide.NewTuiApp(dex, watcher, rundex.FetchRebuildOpts{Clean: *clean}, benches, buildDefs, butler, aiClient)
tapp := ide.NewTuiApp(dex, watcher, rundex.FetchRebuildOpts{Clean: *clean}, benches, buildDefs, butler, aiClient, lfs)
if err := tapp.Run(cmd.Context()); err != nil {
// TODO: This cleanup will be unnecessary once NewTuiApp does split logging.
log.Default().SetOutput(os.Stdout)
Expand Down Expand Up @@ -313,14 +321,18 @@ var getResults = &cobra.Command{
}
}
case "assets":
lfs, err := localfiles.NewLocalFileStore(*assetDir)
if err != nil {
log.Fatal(errors.Wrap(err, "creating local file store"))
}
regclient := http.DefaultClient
mux := rebuild.RegistryMux{
Debian: debianreg.HTTPRegistry{Client: regclient},
CratesIO: cratesreg.HTTPRegistry{Client: regclient},
NPM: npmreg.HTTPRegistry{Client: regclient},
PyPI: pypireg.HTTPRegistry{Client: regclient},
}
butler := localfiles.NewButler(*metadataBucket, *logsBucket, *debugStorage, mux)
butler := localfiles.NewButler(lfs, *metadataBucket, *logsBucket, *debugStorage, mux)
atype := rebuild.AssetType(*assetType)
ctx := cmd.Context()
for _, r := range rebuilds {
Expand Down Expand Up @@ -368,20 +380,28 @@ var runBenchmark = &cobra.Command{
var executor run.ExecutionService
concurrency := *maxConcurrency
if *buildLocal {
lfs, err := localfiles.NewLocalFileStore(*assetDir)
if err != nil {
log.Fatal(errors.Wrap(err, "creating local file store"))
}
now := time.Now().UTC()
runID = now.Format(time.RFC3339)
if concurrency != 1 {
log.Println("Note: dropping max concurrency to 1 due to local execution")
}
concurrency = 1
store, err := localfiles.AssetStore(runID)
store, err := lfs.AssetStore(runID)
if err != nil {
log.Fatalf("Failed to create temp directory: %v", err)
}
// TODO: Validate this.
prebuildURL := fmt.Sprintf("https://%s.storage.googleapis.com/%s", *prebuildBucket, *prebuildVersion)
executor = run.NewLocalExecutionService(prebuildURL, store, cmd.OutOrStdout())
dex = rundex.NewLocalClient(localfiles.Rundex())
rundexFS, err := lfs.Rundex()
if err != nil {
log.Fatal(errors.Wrap(err, "getting rundex filesystem"))
}
dex = rundex.NewLocalClient(rundexFS)
if err := dex.WriteRun(ctx, rundex.FromRun(schema.Run{
ID: runID,
BenchmarkName: filepath.Base(args[1]),
Expand Down Expand Up @@ -962,6 +982,7 @@ var (
metadataBucket = flag.String("metadata-bucket", "", "the gcs bucket where rebuild output is stored")
useNetworkProxy = flag.Bool("use-network-proxy", false, "request the newtwork proxy")
useSyscallMonitor = flag.Bool("use-syscall-monitor", false, "request syscall monitoring")
assetDir = flag.String("asset-dir", "/tmp/oss-rebuild", "the directory to store local assets")
// run-bench
maxConcurrency = flag.Int("max-concurrency", 90, "maximum number of inflight requests")
buildLocal = flag.Bool("local", false, "true if this request is going direct to build-local (not through API first)")
Expand Down Expand Up @@ -991,6 +1012,8 @@ var (
)

func init() {
rootCmd.PersistentFlags().AddGoFlag(flag.Lookup("asset-dir"))

runBenchmark.Flags().AddGoFlag(flag.Lookup("api"))
runBenchmark.Flags().AddGoFlag(flag.Lookup("max-concurrency"))
runBenchmark.Flags().AddGoFlag(flag.Lookup("local"))
Expand Down Expand Up @@ -1062,6 +1085,7 @@ func init() {
rootCmd.AddCommand(getTrackedPackagesCmd)
}


func main() {
flag.Parse()
if err := rootCmd.Execute(); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions tools/ctl/ide/commands/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func NewRebuildCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn mod
}
}

func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn modal.Fn, butler localfiles.Butler, aiClient *genai.Client, buildDefs rebuild.LocatableAssetStore, dex rundex.Reader, benches benchmark.Repository, cmdReg *commandreg.Registry) []commandreg.RebuildGroupCmd {
func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalFn modal.Fn, butler localfiles.Butler, aiClient *genai.Client, buildDefs rebuild.LocatableAssetStore, dex rundex.Reader, benches benchmark.Repository, cmdReg *commandreg.Registry, lfs *localfiles.LocalFileStore) []commandreg.RebuildGroupCmd {
return []commandreg.RebuildGroupCmd{
{
Short: "Find pattern",
Expand All @@ -238,7 +238,7 @@ func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalF
})
var found int
p = p.Do(func(in rundex.Rebuild, out chan<- rundex.Rebuild) {
assets, err := localfiles.AssetStore(in.RunID)
assets, err := lfs.AssetStore(in.RunID)
if err != nil {
log.Println(errors.Wrapf(err, "creating asset store for runid: %s", in.RunID))
return
Expand Down Expand Up @@ -305,7 +305,7 @@ func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalF
}
p2 := pipe.ParInto(LLMRequestParallelism, p1, func(in rundex.Rebuild, out chan<- summarizedRebuild) {
const uploadBytesLimit = 100_000
assets, err := localfiles.AssetStore(in.RunID)
assets, err := lfs.AssetStore(in.RunID)
if err != nil {
log.Println(errors.Wrapf(err, "creating asset store for runid: %s", in.RunID))
return
Expand Down Expand Up @@ -353,15 +353,15 @@ func NewRebuildGroupCmds(app *tview.Application, rb *rebuilder.Rebuilder, modalF
parts = append([]*genai.Part{{Text: "Based on the following error summaries, please provide 1 to 5 classes of failures you think are happening."}}, parts...)
<-ticker
rawFailureClasses, err := llm.GenerateTextContent(ctx, aiClient, llm.GeminiFlash, config, parts...)
if err != nil {
if err != nil {
log.Println(errors.Wrap(err, "classifying summaries"))
return
}
log.Println(rawFailureClasses)
parts = []*genai.Part{{Text: "Please format this list of summaries into one line per summary. Do not include any of the following: syntax highlighting, numbering, bullet points, or other markdown."}, {Text: rawFailureClasses}}
<-ticker
failuresClasses, err := llm.GenerateTextContent(ctx, aiClient, llm.GeminiFlash, config, parts...)
if err != nil {
if err != nil {
log.Println(errors.Wrap(err, "formatting classes"))
return
}
Expand Down
14 changes: 11 additions & 3 deletions tools/ctl/ide/rebuilder/rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
type Instance struct {
ID string
URL *url.URL
lfs *localfiles.LocalFileStore
cancel func()
state instanceState
}
Expand All @@ -83,7 +84,7 @@ func (in *Instance) Run(ctx context.Context) {
in.state = running
idchan := make(chan string)
go func() {
assetDir := localfiles.AssetsPath()
assetDir := in.lfs.AssetsPath()
err = docker.RunServer(
ctx,
"rebuilder",
Expand Down Expand Up @@ -149,9 +150,15 @@ func (in *Instance) Wait(ctx context.Context) <-chan error {
// Rebuilder manages a local instance of the rebuilder docker container.
type Rebuilder struct {
instance *Instance
lfs *localfiles.LocalFileStore
m sync.Mutex
}

// NewRebuilder creates a new Rebuilder.
func NewRebuilder(lfs *localfiles.LocalFileStore) *Rebuilder {
return &Rebuilder{lfs: lfs}
}

// Kill does a non-blocking shutdown of the rebuilder container.
func (rb *Rebuilder) Kill() {
rb.m.Lock()
Expand All @@ -169,7 +176,7 @@ func (rb *Rebuilder) Instance() *Instance {
rb.m.Lock()
defer rb.m.Unlock()
if rb.instance == nil || rb.instance.Dead() {
rb.instance = &Instance{}
rb.instance = &Instance{lfs: rb.lfs}
}
return rb.instance
}
Expand Down Expand Up @@ -231,7 +238,8 @@ func (rb *Rebuilder) RunLocal(ctx context.Context, r rundex.Rebuild, opts RunLoc
}

// RunBench executes the benchmark against the local rebuilder.
func (rb *Rebuilder) RunBench(ctx context.Context, set benchmark.PackageSet, runID string) (<-chan schema.Verdict, error) {
func (rb *Rebuilder) RunBench(ctx context.Context, set benchmark.PackageSet, runID string) (<-
chan schema.Verdict, error) {
inst, err := rb.runningInstance(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting running instance")
Expand Down
6 changes: 3 additions & 3 deletions tools/ctl/ide/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type TuiApp struct {
}

// NewTuiApp creates a new tuiApp object.
func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.FetchRebuildOpts, benches benchmark.Repository, buildDefs rebuild.LocatableAssetStore, butler localfiles.Butler, aiClient *genai.Client) *TuiApp {
func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.FetchRebuildOpts, benches benchmark.Repository, buildDefs rebuild.LocatableAssetStore, butler localfiles.Butler, aiClient *genai.Client, lfs *localfiles.LocalFileStore) *TuiApp {
var t *TuiApp
{
app := tview.NewApplication()
Expand All @@ -48,7 +48,7 @@ func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.Fetc
log.Default().SetFlags(0)
logs.SetBorder(true).SetTitle("Logs")
logs.ScrollToEnd()
rb := &rebuilder.Rebuilder{}
rb := rebuilder.NewRebuilder(lfs)
t = &TuiApp{
app: app,
// When the widgets are updated, we should refresh the application.
Expand All @@ -69,7 +69,7 @@ func NewTuiApp(dex rundex.Reader, watcher rundex.Watcher, rundexOpts rundex.Fetc
if err := cmdReg.AddBenchmarks(commands.NewBenchmarkCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
log.Fatal(err)
}
if err := cmdReg.AddRebuildGroups(commands.NewRebuildGroupCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
if err := cmdReg.AddRebuildGroups(commands.NewRebuildGroupCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg, lfs)...); err != nil {
log.Fatal(err)
}
if err := cmdReg.AddRebuilds(commands.NewRebuildCmds(t.app, t.rb, modalFn, butler, aiClient, buildDefs, dex, benches, cmdReg)...); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions tools/ctl/localfiles/butler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ type Butler interface {
}

type butler struct {
lfs *LocalFileStore
metaAssetstore assetlocator.MetaAssetStore
}

func NewButler(metadataBucket, logsBucket, debugBucket string, mux rebuild.RegistryMux) Butler {
func NewButler(lfs *LocalFileStore, metadataBucket, logsBucket, debugBucket string, mux rebuild.RegistryMux) Butler {
return &butler{
lfs: lfs,
metaAssetstore: assetlocator.MetaAssetStore{
MetadataBucket: metadataBucket,
LogsBucket: logsBucket,
Expand All @@ -33,7 +35,7 @@ func NewButler(metadataBucket, logsBucket, debugBucket string, mux rebuild.Regis
}

func (b *butler) Fetch(ctx context.Context, runID string, wasSmoketest bool, want rebuild.Asset) (path string, err error) {
localAssets, err := AssetStore(runID)
localAssets, err := b.lfs.AssetStore(runID)
if err != nil {
return "", err
}
Expand Down
38 changes: 25 additions & 13 deletions tools/ctl/localfiles/localfiles.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
// Copyright 2025 Google LLC
// SPDX-License-Identifier: Apache-2.0

package localfiles

import (
"os"
"path"
"path/filepath"

"github.com/go-git/go-billy/v5"
Expand All @@ -14,24 +10,39 @@ import (
"github.com/pkg/errors"
)

// LocalFileStore provides access to local file-based resources.
type LocalFileStore struct {
root billy.Filesystem
}

// NewLocalFileStore creates a new LocalFileStore rooted at the given path.
func NewLocalFileStore(rootPath string) (*LocalFileStore, error) {
if err := os.MkdirAll(rootPath, 0755); err != nil {
return nil, errors.Wrapf(err, "failed to create directory %s", rootPath)
}
return &LocalFileStore{root: osfs.New(rootPath)}, nil
}

const (
tempRoot = "/tmp/oss-rebuild/" // The directory where all the local files are stored
// Subdirectories
assets = "assets" // AssetStore used to cache logs, artifacts, and other assets manipulated by ctl.
rundex = "rundex" // The local metadata about runs and attempts, normally generated by local smoketets benchmarks.
buildDefs = "build-defs" // Working copy of build definitions.
)

func Rundex() billy.Filesystem {
return osfs.New(path.Join(tempRoot, rundex))
// Rundex returns a filesystem for the rundex subdirectory.
func (lfs *LocalFileStore) Rundex() (billy.Filesystem, error) {
return lfs.root.Chroot(rundex)
}

func AssetsPath() string {
return filepath.Join(tempRoot, assets)
// AssetsPath returns the path to the assets subdirectory.
func (lfs *LocalFileStore) AssetsPath() string {
return filepath.Join(lfs.root.Root(), assets)
}

func BuildDefs() (*rebuild.FilesystemAssetStore, error) {
dir := filepath.Join(tempRoot, assets)
// BuildDefs returns a filesystem asset store for build definitions.
func (lfs *LocalFileStore) BuildDefs() (*rebuild.FilesystemAssetStore, error) {
dir := filepath.Join(lfs.root.Root(), assets)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, errors.Wrapf(err, "failed to create directory %s", dir)
}
Expand All @@ -42,8 +53,9 @@ func BuildDefs() (*rebuild.FilesystemAssetStore, error) {
return rebuild.NewFilesystemAssetStore(assetsFS), nil
}

func AssetStore(runID string) (*rebuild.FilesystemAssetStore, error) {
dir := filepath.Join(tempRoot, assets, runID)
// AssetStore returns a filesystem asset store for a specific run.
func (lfs *LocalFileStore) AssetStore(runID string) (*rebuild.FilesystemAssetStore, error) {
dir := filepath.Join(lfs.root.Root(), assets, runID)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, errors.Wrapf(err, "failed to create directory %s", dir)
}
Expand Down
Loading