Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions configs/config.default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,9 @@ RailMsgSQL:
Telemetry:
ServiceName: "rail-msg-sql"
Search:
Sqlite:
Directory: "."
BusyTimeout: "10s"
MaxOpenConns: 10
MaxIdleConns: 5
BackgroundPrepare: true
12 changes: 11 additions & 1 deletion internal/search/model_config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package search

import (
"time"

"github.com/moov-io/ach/cmd/achcli/describe/mask"
)

type Config struct {
SqliteFilepath string
Sqlite *SqliteConfig

BackgroundPrepare bool

AchMasking mask.Options
}

type SqliteConfig struct {
Directory string
BusyTimeout time.Duration

MaxOpenConns int
MaxIdleConns int
}
49 changes: 33 additions & 16 deletions internal/search/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"database/sql"
"errors"
"fmt"
"path/filepath"
"strings"
"sync"

"github.com/moov-io/ach"
"github.com/moov-io/ach/cmd/achcli/describe/mask"
Expand Down Expand Up @@ -35,7 +35,7 @@ type Service interface {
func NewService(logger log.Logger, config Config, fileStorage *storage.Repository) (Service, error) {
db, err := openSqliteDB(config)
if err != nil {
return nil, fmt.Errorf("opening %s (as sqlite) failed: %w", config.SqliteFilepath, err)
return nil, fmt.Errorf("opening sqlite db failed: %w", err)
}

// Run migrations
Expand All @@ -57,7 +57,36 @@ func NewService(logger log.Logger, config Config, fileStorage *storage.Repositor
}

func openSqliteDB(config Config) (*sql.DB, error) {
return sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", config.SqliteFilepath))
if config.Sqlite == nil {
return nil, errors.New("no sqlite config")
}

ctx, span := telemetry.StartSpan(context.Background(), "open-sqlite-db")
defer span.End()

path := filepath.Join(config.Sqlite.Directory, "ach.db")

db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_foreign_keys=on", path))
if err != nil {
return nil, fmt.Errorf("opening %s as sqlite db: %w", path, err)
}

// Set some options for concurrency
_, err = db.ExecContext(ctx, "PRAGMA journal_mode=WAL;")
if err != nil {
return nil, fmt.Errorf("setting WAL journal mode: %w", err)
}

// Wait time before failing
_, err = db.ExecContext(ctx, fmt.Sprintf("PRAGMA busy_timeout = %d;", config.Sqlite.BusyTimeout.Milliseconds()))
if err != nil {
return nil, fmt.Errorf("setting busy_timeout: %w", err)
}

db.SetMaxOpenConns(config.Sqlite.MaxOpenConns)
db.SetMaxIdleConns(config.Sqlite.MaxIdleConns)

return db, nil
}

type service struct {
Expand All @@ -66,14 +95,10 @@ type service struct {

fileStorage *storage.Repository

mu sync.Mutex
db *sql.DB
}

func (s *service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

if s != nil && s.db != nil {
return s.db.Close()
}
Expand Down Expand Up @@ -196,7 +221,7 @@ func (s *service) insertFile(ctx context.Context, filename string, file *ach.Fil
file.Control.TotalCreditEntryDollarAmountInFile,
)
if err != nil {
return fmt.Errorf("failed to insert %s into ach_file: %w", filename, err)
return fmt.Errorf("failed to insert %s into ach_files: %w", filename, err)
}
return nil
}
Expand Down Expand Up @@ -482,11 +507,6 @@ func (s *service) IngestACHFile(ctx context.Context, filename string, file *ach.
))
defer span.End()

s.mu.Lock()
defer s.mu.Unlock()

span.AddEvent("done waiting")

// Mask the file before storage
file = mask.File(file, s.config.AchMasking)

Expand Down Expand Up @@ -568,9 +588,6 @@ func (s *service) IngestACHFile(ctx context.Context, filename string, file *ach.

// Search executes a user-provided SQL query with parameters.
func (s *service) Search(ctx context.Context, query string, params storage.FilterParams) (*Results, error) {
s.mu.Lock()
defer s.mu.Unlock()

if query == "" {
return nil, fmt.Errorf("query cannot be empty")
}
Expand Down
4 changes: 3 additions & 1 deletion internal/search/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func TestSearch_ACH(t *testing.T) {
require.NoError(t, err)

conf := search.Config{
SqliteFilepath: filepath.Join(t.TempDir(), "ach.db"),
Sqlite: &search.SqliteConfig{
Directory: t.TempDir(),
},
}

svc, err := search.NewService(logger, conf, fileStorage)
Expand Down
Loading