From df67c19a3084ad3c68c994958724be81c855a3fc Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Tue, 15 Jul 2025 15:39:50 -0500 Subject: [PATCH] search: set WAL mode on sqlite with configurable busy_timeout --- configs/config.default.yml | 5 ++++ internal/search/model_config.go | 12 +++++++- internal/search/service.go | 49 ++++++++++++++++++++++----------- internal/search/service_test.go | 4 ++- 4 files changed, 52 insertions(+), 18 deletions(-) diff --git a/configs/config.default.yml b/configs/config.default.yml index b447f88..3cb022c 100644 --- a/configs/config.default.yml +++ b/configs/config.default.yml @@ -9,4 +9,9 @@ RailMsgSQL: Telemetry: ServiceName: "rail-msg-sql" Search: + Sqlite: + Directory: "." + BusyTimeout: "10s" + MaxOpenConns: 10 + MaxIdleConns: 5 BackgroundPrepare: true diff --git a/internal/search/model_config.go b/internal/search/model_config.go index 2632413..57bf5f7 100644 --- a/internal/search/model_config.go +++ b/internal/search/model_config.go @@ -1,13 +1,23 @@ package search import ( + "time" + "github.com/moov-io/ach/cmd/achcli/describe/mask" ) type Config struct { - SqliteFilepath string + Sqlite *SqliteConfig BackgroundPrepare bool AchMasking mask.Options } + +type SqliteConfig struct { + Directory string + BusyTimeout time.Duration + + MaxOpenConns int + MaxIdleConns int +} diff --git a/internal/search/service.go b/internal/search/service.go index 2d69299..d051e62 100644 --- a/internal/search/service.go +++ b/internal/search/service.go @@ -5,8 +5,8 @@ import ( "database/sql" "errors" "fmt" + "path/filepath" "strings" - "sync" "github.com/moov-io/ach" "github.com/moov-io/ach/cmd/achcli/describe/mask" @@ -35,7 +35,7 @@ type Service interface { func NewService(logger log.Logger, config Config, fileStorage *storage.Repository) (Service, error) { db, err := openSqliteDB(config) if err != nil { - return nil, fmt.Errorf("opening %s (as sqlite) failed: %w", config.SqliteFilepath, err) + return nil, fmt.Errorf("opening sqlite db failed: %w", err) } // Run migrations @@ -57,7 +57,36 @@ func NewService(logger log.Logger, config Config, fileStorage *storage.Repositor } func openSqliteDB(config Config) (*sql.DB, error) { - return sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", config.SqliteFilepath)) + if config.Sqlite == nil { + return nil, errors.New("no sqlite config") + } + + ctx, span := telemetry.StartSpan(context.Background(), "open-sqlite-db") + defer span.End() + + path := filepath.Join(config.Sqlite.Directory, "ach.db") + + db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", path)) + if err != nil { + return nil, fmt.Errorf("opening %s as sqlite db: %w", path, err) + } + + // Set some options for concurrency + _, err = db.ExecContext(ctx, "PRAGMA journal_mode=WAL;") + if err != nil { + return nil, fmt.Errorf("setting WAL journal mode: %w", err) + } + + // Wait time before failing + _, err = db.ExecContext(ctx, fmt.Sprintf("PRAGMA busy_timeout = %d;", config.Sqlite.BusyTimeout.Milliseconds())) + if err != nil { + return nil, fmt.Errorf("setting busy_timeout: %w", err) + } + + db.SetMaxOpenConns(config.Sqlite.MaxOpenConns) + db.SetMaxIdleConns(config.Sqlite.MaxIdleConns) + + return db, nil } type service struct { @@ -66,14 +95,10 @@ type service struct { fileStorage *storage.Repository - mu sync.Mutex db *sql.DB } func (s *service) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - if s != nil && s.db != nil { return s.db.Close() } @@ -196,7 +221,7 @@ func (s *service) insertFile(ctx context.Context, filename string, file *ach.Fil file.Control.TotalCreditEntryDollarAmountInFile, ) if err != nil { - return fmt.Errorf("failed to insert %s into ach_file: %w", filename, err) + return fmt.Errorf("failed to insert %s into ach_files: %w", filename, err) } return nil } @@ -482,11 +507,6 @@ func (s *service) IngestACHFile(ctx context.Context, filename string, file *ach. )) defer span.End() - s.mu.Lock() - defer s.mu.Unlock() - - span.AddEvent("done waiting") - // Mask the file before storage file = mask.File(file, s.config.AchMasking) @@ -568,9 +588,6 @@ func (s *service) IngestACHFile(ctx context.Context, filename string, file *ach. // Search executes a user-provided SQL query with parameters. func (s *service) Search(ctx context.Context, query string, params storage.FilterParams) (*Results, error) { - s.mu.Lock() - defer s.mu.Unlock() - if query == "" { return nil, fmt.Errorf("query cannot be empty") } diff --git a/internal/search/service_test.go b/internal/search/service_test.go index e805cf1..99526d7 100644 --- a/internal/search/service_test.go +++ b/internal/search/service_test.go @@ -37,7 +37,9 @@ func TestSearch_ACH(t *testing.T) { require.NoError(t, err) conf := search.Config{ - SqliteFilepath: filepath.Join(t.TempDir(), "ach.db"), + Sqlite: &search.SqliteConfig{ + Directory: t.TempDir(), + }, } svc, err := search.NewService(logger, conf, fileStorage)