Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions electrum/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log"
"net/url"
"sync"
"sync/atomic"
)
Expand Down Expand Up @@ -70,6 +71,28 @@ type Client struct {
nextID uint64
}

// NewClient initializes a new client for remote server and connects to it using
// a transport protocol resolved from the URL's protocol scheme.
// A remote server URL should be provided in the `scheme://hostname:port` format
// (e.g. `tcp://electrum.io:50001`).
func NewClient(ctx context.Context, urlStr string, tlsConfig *tls.Config) (*Client, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("failed to parse url [%s]: [%w]", urlStr, err)
}

switch u.Scheme {
case "tcp":
return NewClientTCP(ctx, u.Host)
case "ssl":
return NewClientSSL(ctx, u.Host, tlsConfig)
case "ws", "wss":
return NewClientWebSocket(ctx, u.String(), tlsConfig)
}

return nil, fmt.Errorf("unsupported protocol scheme: [%s]", u.Scheme)
}

// NewClientTCP initialize a new client for remote server and connects to the remote server using TCP
func NewClientTCP(ctx context.Context, addr string) (*Client, error) {
transport, err := NewTCPTransport(ctx, addr)
Expand Down Expand Up @@ -112,6 +135,30 @@ func NewClientSSL(ctx context.Context, addr string, config *tls.Config) (*Client
return c, nil
}

// NewClientWebSocket initialize a new client for remote server and connects to
// the remote server using WebSocket.
func NewClientWebSocket(ctx context.Context, url string, config *tls.Config) (*Client, error) {
transport, err := NewWebSocketTransport(ctx, url, config)
if err != nil {
return nil, err
}

c := &Client{
handlers: make(map[uint64]chan *container),
pushHandlers: make(map[string][]chan *container),

Error: make(chan error),
quit: make(chan struct{}),
}

c.transport = transport
go c.listen()

return c, nil
}

// JSON-RPC 2.0 Error Object
// See: https://www.jsonrpc.org/specificationJSON#error_object
type apiErr struct {
Code int `json:"code"`
Message string `json:"message"`
Expand All @@ -121,10 +168,39 @@ func (e *apiErr) Error() string {
return fmt.Sprintf("errNo: %d, errMsg: %s", e.Code, e.Message)
}

// UnmarshalJSON defines a workaround for servers that respond with error
// that doesn't follow the JSON-RPC 2.0 Error Object format, i.e. electrs/esplora.
// See: https://github.com/Blockstream/esplora/issues/453
func (e *apiErr) UnmarshalJSON(data []byte) error {
var v interface{}
if err := json.Unmarshal(data, &v); err != nil {
return fmt.Errorf("failed to unmarshal error [%s]: %v", data, err)
}

switch v := v.(type) {
case string:
e.Message = v
case map[string]interface{}:
if _, ok := v["code"]; ok {
e.Code = int(v["code"].(float64))
}

if _, ok := v["message"]; ok {
e.Message = fmt.Sprint(v["message"])
}
default:
return fmt.Errorf("unsupported type: %v", v)
}

return nil
}

// JSON-RPC 2.0 Response Object
// See: https://www.jsonrpc.org/specification#response_object
type response struct {
ID uint64 `json:"id"`
Method string `json:"method"`
Error string `json:"error"`
ID uint64 `json:"id"`
Method string `json:"method"`
Error *apiErr `json:"error"`
}

func (s *Client) listen() {
Expand All @@ -150,11 +226,11 @@ func (s *Client) listen() {
err := json.Unmarshal(bytes, msg)
if err != nil {
if DebugMode {
log.Printf("Unmarshal received message failed: %v", err)
log.Printf("unmarshal received message [%s] failed: [%v]", bytes, err)
}
result.err = fmt.Errorf("Unmarshal received message failed: %v", err)
} else if msg.Error != "" {
result.err = errors.New(msg.Error)
} else if msg.Error != nil {
result.err = msg.Error
}

if len(msg.Method) > 0 {
Expand Down
120 changes: 120 additions & 0 deletions electrum/transport_ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package electrum

import (
"context"
"crypto/tls"
"log"
"time"

"github.com/gorilla/websocket"
)

type WebSocketTransport struct {
conn *websocket.Conn
responses chan []byte
errors chan error
// close is a channel used for graceful connection closure
close chan struct{}
}

const webSocketClosingTimeout = 2 * time.Second

// NewWebSocketTransport initializes new WebSocket transport.
func NewWebSocketTransport(
ctx context.Context,
url string,
tlsConfig *tls.Config,
) (*WebSocketTransport, error) {
dialer := websocket.Dialer{
TLSClientConfig: tlsConfig,
}

conn, response, err := dialer.DialContext(ctx, url, nil)
if err != nil {
if DebugMode {
log.Printf(
"%s [debug] connect -> status: %v, error: %v",
time.Now().Format("2006-01-02 15:04:05"),
response.Status,
err,
)
}
return nil, err
}

ws := &WebSocketTransport{
conn: conn,
responses: make(chan []byte),
errors: make(chan error),
close: make(chan struct{}),
}

go ws.listen()

return ws, nil
}

func (t *WebSocketTransport) listen() {
defer t.conn.Close()
defer close(t.close)

for {
_, msg, err := t.conn.ReadMessage()
if DebugMode {
log.Printf(
"%s [debug] %s -> msg: %s, err: %v",
time.Now().Format("2006-01-02 15:04:05"),
t.conn.RemoteAddr(),
msg,
err,
)
}
if err != nil {
isNormalClose := websocket.IsCloseError(err, websocket.CloseNormalClosure)
if !isNormalClose {
t.errors <- err
}

break
}

t.responses <- msg
}
}

// SendMessage sends a message to the remote server through the WebSocket transport.
func (t *WebSocketTransport) SendMessage(body []byte) error {
if DebugMode {
log.Printf("%s [debug] %s <- %s", time.Now().Format("2006-01-02 15:04:05"), t.conn.RemoteAddr(), body)
}

return t.conn.WriteMessage(websocket.TextMessage, body)
}

// Responses returns chan to WebSocket transport responses.
func (t *WebSocketTransport) Responses() <-chan []byte {
return t.responses
}

// Errors returns chan to WebSocket transport errors.
func (t *WebSocketTransport) Errors() <-chan error {
return t.errors
}

// Close closes WebSocket transport.
func (t *WebSocketTransport) Close() error {
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := t.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Printf("%s [error] %s -> close error: %s", time.Now().Format("2006-01-02 15:04:05"), t.conn.RemoteAddr(), err)
}

select {
case <-t.close:
case <-time.After(webSocketClosingTimeout):
return t.conn.Close()
}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.18
require (
github.com/btcsuite/btcd v0.23.1
github.com/btcsuite/btcd/btcutil v1.1.1
github.com/gorilla/websocket v1.5.0
github.com/stretchr/testify v1.7.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
Expand Down