Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"sync"
"time"

aa_log "github.com/aaronland/go-log/v2"
"github.com/aaronland/go-http-server"
server "github.com/aaronland/go-http-server"
Comment thread
thisisaaronland marked this conversation as resolved.
aa_log "github.com/aaronland/go-log/v2"
"github.com/whosonfirst/go-webhookd/v3"
"github.com/whosonfirst/go-webhookd/v3/config"
"github.com/whosonfirst/go-webhookd/v3/dispatcher"
"github.com/whosonfirst/go-webhookd/v3/receiver"
"github.com/whosonfirst/go-webhookd/v3/transformation"
"github.com/whosonfirst/go-webhookd/v3/webhook"
"github.com/whosonfirst/go-webhookd/v3/webhook"
)

// type WebhookDaemon is a struct that implements a long-running daemon to listen for and process webhooks.
// type WebhookDaemon is a struct that implements a long-running daemon to listen for and process webhooks.
type WebhookDaemon struct {
// server is a `aaronland/go-http-server.Server` instance that handles HTTP requests and responses.
server server.Server
Expand Down Expand Up @@ -198,7 +198,7 @@ func (d *WebhookDaemon) AddWebhook(ctx context.Context, wh webhook.Webhook) erro
_, ok := d.webhooks[endpoint]

if ok {
return fmt.Errorf("endpoint already configured")
return fmt.Errorf("Endpoint already configured")
}

d.webhooks[endpoint] = wh
Expand Down Expand Up @@ -357,8 +357,8 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
aa_log.Debug(logger, "Time to receive: %v", ttr)
aa_log.Debug(logger, "Time to transform: %v", ttt)
aa_log.Debug(logger, "Time to dispatch: %v", ttd)
aa_log.Debug(logger, "Time to process: %v", t2)
aa_log.Debug(logger, "Time to process: %v", t2)

rsp.Header().Set("X-Webhookd-Time-To-Receive", fmt.Sprintf("%v", ttr))
rsp.Header().Set("X-Webhookd-Time-To-Transform", fmt.Sprintf("%v", ttt))
rsp.Header().Set("X-Webhookd-Time-To-Dispatch", fmt.Sprintf("%v", ttd))
Expand All @@ -375,8 +375,6 @@ func (d *WebhookDaemon) HandlerFuncWithLogger(logger *log.Logger) (http.HandlerF
rsp.Write(body)
}
}

return
}

return http.HandlerFunc(handler), nil
Expand All @@ -402,7 +400,7 @@ func (d *WebhookDaemon) StartWithLogger(ctx context.Context, logger *log.Logger)

svr := d.server

aa_log.Info(logger, "webhookd listening for requests on %s\n", svr.Address())
aa_log.Info(logger, "Webhookd listening for requests on %s\n", svr.Address())

err = svr.ListenAndServe(ctx, mux)

Expand Down
5 changes: 3 additions & 2 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package dispatcher
import (
"context"
"fmt"
"github.com/aaronland/go-roster"
"github.com/whosonfirst/go-webhookd/v3"
"net/url"
"sort"

"github.com/aaronland/go-roster"
"github.com/whosonfirst/go-webhookd/v3"
)

// dispatcher is a `aaronland/go-roster.Roster` instance used to maintain a list of registered `webhookd.WebhookDispatcher` initialization functions.
Expand Down
146 changes: 146 additions & 0 deletions dispatcher/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package dispatcher

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"

"github.com/whosonfirst/go-webhookd/v3"
)

func init() {

ctx := context.Background()
err := RegisterDispatcher(ctx, "http", NewHTTPDispatcher)
Comment thread
thisisaaronland marked this conversation as resolved.
if err != nil {
panic(err)
}

err = RegisterDispatcher(ctx, "https", NewHTTPDispatcher)
if err != nil {
panic(err)
}
}

// GET and POST are the only supported HTTP methods
const GET = "GET"
const POST = "POST"

// HTTPDispatcher implements the `webhookd.WebhookDispatcher` interface for dispatching messages to a `log.Logger` instance `http.get` or `http.post`.
type HTTPDispatcher struct {
webhookd.WebhookDispatcher
// logger is the `log.Logger` instance associated with the dispatcher.
logger *log.Logger
// url to send the message to
url url.URL
// method to use when sending the message
method string
// client to use when sending the message
client HTTPClient
}

// HTTPClient is an interface for `http.Client` to allow for mocking in tests.
type HTTPClient interface {
Comment thread
thisisaaronland marked this conversation as resolved.
Get(url string) (*http.Response, error)
Post(url string, contentType string, body io.Reader) (*http.Response, error)
}

// HTTPDispatcherOptions is a struct containing the options for `NewHTTPDispatcherWithOptions`.
type HTTPDispatcherOptions struct {
// Logger is the `log.Logger` instance associated with the dispatcher.
Logger *log.Logger
// URL to send the message to
URL url.URL
// Client to use when sending the message
Client HTTPClient
}

// NewHTTPDispatcher returns a new `HTTPDispatcher` instance configured by 'uri' in the form of:
//
// http://
// https://
//
// Messages are dispatched to the default `log.Default()` instance along with the uri parsed.
func NewHTTPDispatcher(ctx context.Context, uri string) (webhookd.WebhookDispatcher, error) {
logger := log.Default()

parsed, err := url.Parse(uri)

if err != nil {
return nil, fmt.Errorf("Failed to parse URI, %w", err)
}

opts := HTTPDispatcherOptions{
Logger: logger,
URL: *parsed,
Client: &http.Client{},
}

return NewHTTPDispatcherWithOptions(ctx, &opts)
}

// NewHTTPDispatcher returns a new `HTTPDispatcher` instance that dispatches messages to `http.Get` or `http.Post`.
func NewHTTPDispatcherWithOptions(ctx context.Context, opts *HTTPDispatcherOptions) (webhookd.WebhookDispatcher, error) {

opts.Logger.Print("Parsed dispatcher URL: ", opts.URL.String())

// check the method and default to POST
method := opts.URL.Query().Get("method")
if method != GET {
method = POST
}

d := HTTPDispatcher{
logger: opts.Logger,
url: opts.URL,
method: method,
client: opts.Client,
}

return &d, nil
}

// Dispatch sends 'body' to the `log.Logger` and `http.Get`/`http.Post` that 'd' has been instantiated with.
func (d *HTTPDispatcher) Dispatch(ctx context.Context, body []byte) *webhookd.WebhookError {
var resp *http.Response
var err error

if d.method == GET {
d.logger.Println("Dispatching GET:", d.url.String(), "not forwarding body: ", string(body))
resp, err = d.client.Get(d.url.String())
} else {
d.logger.Println("Dispatching POST:", d.url.String(), "forwarding body: ", string(body))
resp, err = d.client.Post(d.url.String(), "application/json", bytes.NewBuffer(body))
}

// if we get a nil response the destination is unreachable
if resp == nil {
code := http.StatusRequestTimeout
message := "Timeout likely destination unreachable"
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

defer resp.Body.Close()

// if we get any other status code than 200
if resp.StatusCode != http.StatusOK {
code := resp.StatusCode
message := fmt.Sprintf("Failed to dispatch message: %s", resp.Status)
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

if err != nil {
code := http.StatusInternalServerError
message := err.Error()
whErr := &webhookd.WebhookError{Code: code, Message: message}
return whErr
}

return nil
}
71 changes: 71 additions & 0 deletions dispatcher/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package dispatcher

import (
"bytes"
"context"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"testing"
)

type MockHTTPClient struct {
Resp *http.Response
Error error
}

func (m *MockHTTPClient) Get(url string) (*http.Response, error) {
return m.Resp, m.Error
}

func (m *MockHTTPClient) Post(url string, contentType string, body io.Reader) (*http.Response, error) {
return m.Resp, m.Error
}

func TestNewHTTPDispatcherWithOptions(t *testing.T) {

ctx := context.Background()

var buf bytes.Buffer

logger := log.New(&buf, "testing ", log.Lshortfile)

parsed, err := url.Parse("http://testing?method=GET")
if err != nil {
t.Fatalf("Failed to parse url, %v", err)
}

// Create a mock response
mockResponse := &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("Mock response body")),
}

// Create a mock HTTP client with the desired behavior
mockClient := &MockHTTPClient{
Resp: mockResponse,
Error: nil,
}

d, err := NewHTTPDispatcherWithOptions(ctx, &HTTPDispatcherOptions{logger, *parsed, mockClient})

if err != nil {
t.Fatalf("Failed to create new http dispatcher with logger, %v", err)
}

err2 := d.Dispatch(ctx, []byte("hello world"))

if err2 != nil {
t.Fatalf("Failed to dispatch message, %v", err2)
}

expected := "testing http.go:89: Parsed dispatcher URL: http://testing?method=GET\ntesting http.go:113: Dispatching GET: http://testing?method=GET not forwarding body: hello world"
output := strings.TrimSpace(buf.String())

if output != expected {
t.Fatalf("Unexpected output from custom writer: '%s'", output)
}
}
3 changes: 2 additions & 1 deletion dispatcher/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package dispatcher

import (
"context"
"github.com/whosonfirst/go-webhookd/v3"
"log"

"github.com/whosonfirst/go-webhookd/v3"
)

func init() {
Expand Down
1 change: 1 addition & 0 deletions staticcheck.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
checks = ["all", "-ST1005", "-ST1003", "-ST1020","-ST1021"]