Skip to content

Commit 38c18f0

Browse files
Merge pull request #141 from gofr-dev/FE/migrations_clickhouse
Add migration support for clickhouse | Enhance example using migrations
2 parents 38a6949 + 8e9e41e commit 38c18f0

File tree

8 files changed

+578
-8
lines changed

8 files changed

+578
-8
lines changed

.github/workflows/go.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,17 @@ jobs:
224224
- 9041:9042
225225
options: --health-cmd "cqlsh --debug"
226226

227+
clickhouse:
228+
image: clickhouse/clickhouse-server
229+
ports:
230+
- "8123:8123"
231+
- "9000:9000"
232+
env:
233+
CLICKHOUSE_DB: "default"
234+
CLICKHOUSE_USER: "root"
235+
CLICKHOUSE_PASSWORD: "password"
236+
CLICKHOUSE_HTTP_PORT: "8123"
237+
227238
steps:
228239
- name: Checkout code into go module directory
229240
uses: actions/checkout@v4
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package dbmigration
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"strings"
8+
"time"
9+
10+
"gofr.dev/pkg/datastore"
11+
"gofr.dev/pkg/errors"
12+
"gofr.dev/pkg/log"
13+
)
14+
15+
type Clickhouse struct {
16+
datastore.ClickHouseDB
17+
existingMigration []gofrMigration
18+
}
19+
20+
func NewClickhouse(c datastore.ClickHouseDB) *Clickhouse {
21+
return &Clickhouse{c, make([]gofrMigration, 0)}
22+
}
23+
24+
// Run executes a migration
25+
func (c *Clickhouse) Run(m Migrator, app, name, methods string, logger log.Logger) error {
26+
if c.Conn == nil {
27+
return errors.DataStoreNotInitialized{DBName: datastore.ClickHouse}
28+
}
29+
30+
err := c.preRun(app, methods, name)
31+
if err != nil {
32+
return err
33+
}
34+
35+
ds := &datastore.DataStore{ClickHouse: c.ClickHouseDB}
36+
37+
if methods == UP {
38+
err = m.Up(ds, logger)
39+
} else {
40+
err = m.Down(ds, logger)
41+
}
42+
43+
if err != nil {
44+
return &errors.Response{Reason: "error encountered in running the migration", Detail: err}
45+
}
46+
47+
err = c.postRun(app, methods, name)
48+
if err != nil {
49+
return err
50+
}
51+
52+
return nil
53+
}
54+
55+
func (c *Clickhouse) preRun(app, method, name string) error {
56+
const migrationTableSchema = `CREATE TABLE IF NOT EXISTS gofr_migrations (
57+
app String,
58+
version Int64,
59+
start_time String DEFAULT now(),
60+
end_time Nullable(String),
61+
method String
62+
) ENGINE = MergeTree()
63+
ORDER BY (app, version, method);
64+
`
65+
66+
err := c.ClickHouseDB.Exec(context.Background(), migrationTableSchema)
67+
if err != nil {
68+
return &errors.Response{Reason: "unable to create table", Detail: err.Error()}
69+
}
70+
71+
ver, _ := strconv.Atoi(name)
72+
73+
c.existingMigration = append(c.existingMigration, gofrMigration{
74+
App: app,
75+
Version: int64(ver),
76+
StartTime: time.Now(),
77+
Method: method,
78+
})
79+
80+
return nil
81+
}
82+
83+
func (c *Clickhouse) postRun(app, method, name string) error {
84+
ver, _ := strconv.Atoi(name)
85+
86+
for i, v := range c.existingMigration {
87+
if v.App == app && v.Method == method && v.Version == int64(ver) {
88+
c.existingMigration[i].EndTime = time.Now()
89+
}
90+
}
91+
92+
return nil
93+
}
94+
95+
// LastRunVersion retrieves the last run migration version
96+
func (c *Clickhouse) LastRunVersion(app, method string) (lv int) {
97+
if c.Conn == nil {
98+
return -1
99+
}
100+
101+
query := fmt.Sprintf(`
102+
SELECT MAX(version) AS version
103+
FROM gofr_migrations
104+
WHERE app = '%s' AND method = '%s'
105+
`, app, method)
106+
107+
row := c.Conn.QueryRow(context.Background(), query)
108+
109+
var version int64
110+
_ = row.Scan(&version)
111+
112+
return int(version)
113+
}
114+
115+
// GetAllMigrations retrieves all migrations
116+
func (c *Clickhouse) GetAllMigrations(app string) (upMigration, downMigration []int) {
117+
if c.Conn == nil {
118+
return []int{-1}, nil
119+
}
120+
121+
rows, err := c.Conn.Query(context.Background(), fmt.Sprintf(`
122+
SELECT version, method
123+
FROM gofr_migrations
124+
WHERE app = '%s'
125+
`, app))
126+
127+
if err != nil {
128+
return nil, nil
129+
}
130+
131+
defer rows.Close()
132+
133+
for rows.Next() {
134+
var (
135+
version int64
136+
method string
137+
)
138+
139+
if err := rows.Scan(&version, &method); err != nil {
140+
return nil, nil
141+
}
142+
143+
if method == UP {
144+
upMigration = append(upMigration, int(version))
145+
} else {
146+
downMigration = append(downMigration, int(version))
147+
}
148+
}
149+
150+
if err := rows.Err(); err != nil {
151+
return nil, nil
152+
}
153+
154+
return upMigration, downMigration
155+
}
156+
157+
// FinishMigration completes the migration
158+
func (c *Clickhouse) FinishMigration() error {
159+
if c.Conn == nil {
160+
return errors.DataStoreNotInitialized{DBName: datastore.ClickHouse}
161+
}
162+
163+
if len(c.existingMigration) != 0 {
164+
query := "INSERT INTO gofr_migrations (app, version, start_time, end_time, method) VALUES"
165+
166+
for _, v := range c.existingMigration {
167+
query += fmt.Sprintf(" ('%s', %d, '%s', '%s', '%s'),", v.App, v.Version, v.StartTime, v.EndTime, v.Method)
168+
}
169+
170+
query = strings.TrimSuffix(query, ",")
171+
172+
err := c.Conn.Exec(context.Background(), query)
173+
if err != nil {
174+
return err
175+
}
176+
}
177+
178+
return nil
179+
}

0 commit comments

Comments
 (0)