Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ services:
postgresql:
condition: service_healthy # Wait for postgres to be healthy
temporal:
condition: service_started # Or service_healthy if temporal has a healthcheck
condition: service_healthy # Or service_healthy if temporal has a healthcheck
restart: unless-stopped
healthcheck: # Updated healthcheck for olake-ui
test: ["CMD-SHELL", "nc -z localhost 8000"] # Check if port 8000 is listening
Expand Down Expand Up @@ -101,7 +101,7 @@ services:
PERSISTENT_DIR: *hostPersistencePath
depends_on:
temporal:
condition: service_started # Or service_healthy if temporal has a healthcheck
condition: service_healthy # Or service_healthy if temporal has a healthcheck
olake-ui:
condition: service_healthy
restart: unless-stopped
Expand Down Expand Up @@ -142,9 +142,16 @@ services:
- ES_VERSION=v7
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
- BIND_ON_IP=0.0.0.0
- TEMPORAL_BROADCAST_ADDRESS=127.0.0.1
networks:
- olake-network
restart: unless-stopped
healthcheck:
test: ['CMD', 'tctl', '--address', '127.0.0.1:7233', 'cluster', 'health']
interval: 1s
timeout: 5s
start_period: 2s

temporal-ui:
container_name: temporal-ui
Expand Down
19 changes: 13 additions & 6 deletions server/cmd/temporal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,31 @@ func main() {
err := database.Init(postgresDB)
if err != nil {
logs.Critical("Failed to initialize database: %s", err)
os.Exit(1)
return
}

logs.Info("Starting Olake Temporal worker...")

// Create a new worker
worker, err := temporal.NewWorker()
// create temporal client
tClient, err := temporal.NewClient()
if err != nil {
logs.Critical("Failed to create worker: %v", err)
os.Exit(1)
logs.Critical("Failed to create Temporal client: %v", err)
return
}
defer tClient.Close()
// create temporal worker
worker, err := temporal.NewWorker(tClient)
if err != nil {
logs.Critical("Failed to create Temporal worker: %v", err)
return
}

// Start the worker in a goroutine
go func() {
err := worker.Start()
if err != nil {
logs.Critical("Failed to start worker: %v", err)
os.Exit(1)
return
}
}()

Expand Down
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.31.0
github.com/aws/aws-sdk-go-v2/service/ecr v1.49.0
github.com/aws/aws-sdk-go-v2/service/kms v1.41.1
github.com/cenkalti/backoff/v4 v4.3.0
github.com/lib/pq v1.10.9
github.com/oklog/ulid v1.3.1
github.com/spf13/viper v1.20.1
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
1 change: 1 addition & 0 deletions server/internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var (
TableNameMap = map[TableType]string{}
DefaultConfigDir = "/tmp/olake-config"
DefaultLogRetentionPeriod = 30
MaxRetries = 4
)

var RequiredConfigVariable = []string{"postgresdb", "copyrequestbody", "logsdir"}
Expand Down
19 changes: 13 additions & 6 deletions server/internal/handlers/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,19 @@ func (c *DestHandler) TestConnection() {
utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, "Failed to encrypt destination config: "+err.Error())
return
}

result, err := c.tempClient.TestConnection(c.Ctx.Request.Context(), "destination", driver, version, encryptedConfig)
if result == nil {
result = map[string]interface{}{
"message": err.Error(),
"status": "failed",
var result map[string]interface{}
if c.tempClient != nil {
result, err = c.tempClient.TestConnection(c.Ctx.Request.Context(), "destination", driver, version, encryptedConfig)

if result == nil {
result = map[string]interface{}{
"message": err.Error(),
"status": "failed",
}
}
if err != nil {
utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, fmt.Sprintf("Failed to test connection: %s", err))
return
}
}
utils.SuccessResponse(&c.Controller, result)
Expand Down
4 changes: 4 additions & 0 deletions server/internal/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,10 @@ func (c *JobHandler) GetJobTasks() {
// Construct a query for workflows related to this project and job
query := fmt.Sprintf("WorkflowId between 'sync-%s-%d' and 'sync-%s-%d-~'", projectIDStr, job.ID, projectIDStr, job.ID)
// List workflows using the direct query
if c.tempClient == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In destination.go and source.go we are using

if c.tempClient != nil { 

}

Here we are checking

if c.tempClient == nil {
		utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, "Temporal client is not initialized")
		return
	}

Can we make it consistent

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid extra lines of code, at some places we not using response from function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if c.tempClient == nil {  
        utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, "Temporal client is not initialized")  
        return  
    }

I mean we are throwing this error only in job, and not in source and destination. Just to make it consistent.

utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, "Temporal client is not initialized")
return
}
resp, err := c.tempClient.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{
Query: query,
})
Expand Down
18 changes: 13 additions & 5 deletions server/internal/handlers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,21 @@ func (c *SourceHandler) TestConnection() {
utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, "Failed to encrypt config")
return
}
result, err := c.tempClient.TestConnection(context.Background(), "config", req.Type, req.Version, encryptedConfig)
if result == nil {
result = map[string]interface{}{
"message": err.Error(),
"status": "failed",
var result map[string]interface{}
if c.tempClient != nil {
result, err = c.tempClient.TestConnection(context.Background(), "config", req.Type, req.Version, encryptedConfig)
if result == nil {
result = map[string]interface{}{
"message": err.Error(),
"status": "failed",
}
}
if err != nil {
utils.ErrorResponse(&c.Controller, http.StatusInternalServerError, fmt.Sprintf("Failed to test connection: %s", err))
return
}
}

utils.SuccessResponse(&c.Controller, result)
}

Expand Down
33 changes: 29 additions & 4 deletions server/internal/temporal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"time"

"github.com/beego/beego/v2/core/logs"
"github.com/beego/beego/v2/server/web"
"github.com/datazip/olake-frontend/server/internal/constants"
"github.com/datazip/olake-frontend/server/internal/docker"
"github.com/datazip/olake-frontend/server/utils"
"go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -56,13 +58,31 @@ type Client struct {

// NewClient creates a new Temporal client
func NewClient() (*Client, error) {
c, err := client.Dial(client.Options{
HostPort: TemporalAddress,
})
var c client.Client

err := utils.RetryWithBackoff(
func() error {
var err error
c, err = client.Dial(client.Options{
HostPort: TemporalAddress,
})
if err != nil {
return fmt.Errorf("failed to connect to Temporal: %w", err)
}
return nil
},
constants.MaxRetries, // max retries
1*time.Second, // initial delay
2*time.Minute, // max delay
nil, // use default notify
)

if err != nil {
return nil, fmt.Errorf("failed to create Temporal client: %v", err)
return nil, fmt.Errorf("❌ Temporal connection failed after %d retries: %s", constants.MaxRetries, err)
}

logs.Info("✅ Successfully connected to Temporal:", TemporalAddress)

return &Client{
temporalClient: c,
}, nil
Expand All @@ -75,6 +95,11 @@ func (c *Client) Close() {
}
}

// GetClient returns the Temporal client
func (c *Client) GetClient() client.Client {
return c.temporalClient
}

// GetCatalog runs a workflow to discover catalog data
func (c *Client) GetCatalog(ctx context.Context, sourceType, version, config, streamsConfig string) (map[string]interface{}, error) {
params := &ActivityParams{
Expand Down
25 changes: 7 additions & 18 deletions server/internal/temporal/worker.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,18 @@
package temporal

import (
"fmt"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

// Worker handles Temporal worker functionality
type Worker struct {
temporalClient client.Client
worker worker.Worker
client *Client
worker worker.Worker
}

// NewWorker creates a new Temporal worker
func NewWorker() (*Worker, error) {
c, err := client.Dial(client.Options{
HostPort: TemporalAddress,
})
if err != nil {
return nil, fmt.Errorf("failed to create Temporal client: %v", err)
}

// Create a worker
w := worker.New(c, TaskQueue, worker.Options{})
func NewWorker(c *Client) (*Worker, error) {
w := worker.New(c.GetClient(), TaskQueue, worker.Options{})

// Register workflows
w.RegisterWorkflow(DiscoverCatalogWorkflow)
Expand All @@ -36,8 +25,8 @@ func NewWorker() (*Worker, error) {
w.RegisterActivity(SyncActivity)

return &Worker{
temporalClient: c,
worker: w,
client: c,
worker: w,
}, nil
}

Expand All @@ -49,5 +38,5 @@ func (w *Worker) Start() error {
// Stop stops the worker
func (w *Worker) Stop() {
w.worker.Stop()
w.temporalClient.Close()
w.client.Close()
}
41 changes: 41 additions & 0 deletions server/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/beego/beego/v2/core/logs"
"github.com/beego/beego/v2/server/web"
"github.com/cenkalti/backoff/v4"
"github.com/oklog/ulid"
"github.com/robfig/cron"

Expand Down Expand Up @@ -321,3 +322,43 @@ func GetLogRetentionPeriod() int {
}
return constants.DefaultLogRetentionPeriod
}

// RetryWithBackoff retries the given operation with exponential backoff.
//
// Params:
// - operation: function to retry (return error to trigger retry)
// - maxRetries: maximum number of retries (0 = unlimited)
// - initialInterval: initial delay before retrying (e.g., 1s)
// - maxInterval: maximum delay between retries (e.g., 2m)
// - notify: optional callback called on every retry
//
// Returns:
// - error: last error if retries failed, or nil if successful.
func RetryWithBackoff(
operation func() error,
maxRetries int,
initialInterval, maxInterval time.Duration,
notify func(err error, next time.Duration),
) error {
// Configure exponential backoff
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = initialInterval
expBackoff.MaxInterval = maxInterval

// If maxRetries > 0, we cap the elapsed time
if maxRetries > 0 {
expBackoff.MaxElapsedTime = time.Duration(maxRetries) * maxInterval
} else {
expBackoff.MaxElapsedTime = 0 // unlimited
}

// Default notify if not provided
if notify == nil {
notify = func(err error, next time.Duration) {
logs.Info("Operation failed: %s. Retrying in %s...\n", err, next)
}
}

// Execute the operation with retry
return backoff.RetryNotify(operation, expBackoff, notify)
}
Loading