From 221d51b9fd2713024062aabc0d128fc6533bf641 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Tue, 16 May 2023 13:56:29 +0200 Subject: [PATCH 1/6] Handle error string returned in the JSON-RPC response Here we provide a workaround for servers that don't follow the JSON-RPC 2.0 specification for error objects. According to the specification, an error should be an object containing `code`, `message`, and `data` properties (see: https://www.jsonrpc.org/specification#error_object). Unfortunately, Electrs returns an error as a string (see: https://github.com/Blockstream/esplora/issues/453). We define an error unmarshaling function to handle both types of errors. --- electrum/network.go | 45 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/electrum/network.go b/electrum/network.go index 41825f9..25eb7aa 100644 --- a/electrum/network.go +++ b/electrum/network.go @@ -112,6 +112,8 @@ func NewClientSSL(ctx context.Context, addr string, config *tls.Config) (*Client return c, nil } +// JSON-RPC 2.0 Error Object +// See: https://www.jsonrpc.org/specificationJSON#error_object type apiErr struct { Code int `json:"code"` Message string `json:"message"` @@ -121,10 +123,43 @@ func (e *apiErr) Error() string { return fmt.Sprintf("errNo: %d, errMsg: %s", e.Code, e.Message) } +// UnmarshalJSON defines a workaround for servers that respond with error +// that doesn't follow the JSON-RPC 2.0 Error Object format, i.e. electrs/esplora. +// See: https://github.com/Blockstream/esplora/issues/453 +func (e *apiErr) UnmarshalJSON(data []byte) error { + var v interface{} + if err := json.Unmarshal(data, &v); err != nil { + return fmt.Errorf("failed to unmarshal error [%s]: %v", data, err) + } + + switch v := v.(type) { + case string: + e.Message = v + case map[string]interface{}: + if _, ok := v["code"]; ok { + e.Code = int(v["code"].(float64)) + } + + if _, ok := v["message"]; ok { + e.Message = fmt.Sprint(v["message"]) + } + + // if _, ok := v["data"]; ok { + // e.Data = fmt.Sprint(v["data"]) + // } + default: + return fmt.Errorf("unsupported type: %v", v) + } + + return nil +} + +// JSON-RPC 2.0 Response Object +// See: https://www.jsonrpc.org/specification#response_object type response struct { - ID uint64 `json:"id"` - Method string `json:"method"` - Error string `json:"error"` + ID uint64 `json:"id"` + Method string `json:"method"` + Error *apiErr `json:"error"` } func (s *Client) listen() { @@ -153,8 +188,8 @@ func (s *Client) listen() { log.Printf("Unmarshal received message failed: %v", err) } result.err = fmt.Errorf("Unmarshal received message failed: %v", err) - } else if msg.Error != "" { - result.err = errors.New(msg.Error) + } else if msg.Error != nil { + result.err = msg.Error } if len(msg.Method) > 0 { From 447e9df5318d3238386348422ce915b3b6e36971 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Tue, 16 May 2023 15:00:36 +0200 Subject: [PATCH 2/6] Add more context in unmarshall failure debug message It would be usefull to include value that failed unmarshalling in the debug mode log message. --- electrum/network.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electrum/network.go b/electrum/network.go index 25eb7aa..64d7c27 100644 --- a/electrum/network.go +++ b/electrum/network.go @@ -185,7 +185,7 @@ func (s *Client) listen() { err := json.Unmarshal(bytes, msg) if err != nil { if DebugMode { - log.Printf("Unmarshal received message failed: %v", err) + log.Printf("unmarshal received message [%s] failed: [%v]", bytes, err) } result.err = fmt.Errorf("Unmarshal received message failed: %v", err) } else if msg.Error != nil { From 94663e52d0628860155d358b151a96a912241dc5 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Tue, 16 May 2023 16:40:50 +0200 Subject: [PATCH 3/6] Remove commented code --- electrum/network.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/electrum/network.go b/electrum/network.go index 64d7c27..69aee93 100644 --- a/electrum/network.go +++ b/electrum/network.go @@ -143,10 +143,6 @@ func (e *apiErr) UnmarshalJSON(data []byte) error { if _, ok := v["message"]; ok { e.Message = fmt.Sprint(v["message"]) } - - // if _, ok := v["data"]; ok { - // e.Data = fmt.Sprint(v["data"]) - // } default: return fmt.Errorf("unsupported type: %v", v) } From 17ac6f247424ab2a412f8113e22887d1b5f92845 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Fri, 19 May 2023 16:40:07 +0200 Subject: [PATCH 4/6] Support WebSocket protocol Here we add support for WebSocket protocol. With this change the client will support all connection types defined by the electrum protocol: `tcp`, `ssl`, `ws` and `wss`. The WebSocket client initialization expects remote server details to be provided as URL including a scheme and host, e.g. `tcp://electrum.io:50001`. --- electrum/network.go | 22 +++++++++ electrum/transport_ws.go | 104 +++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 4 files changed, 129 insertions(+) create mode 100644 electrum/transport_ws.go diff --git a/electrum/network.go b/electrum/network.go index 69aee93..2327cee 100644 --- a/electrum/network.go +++ b/electrum/network.go @@ -112,6 +112,28 @@ func NewClientSSL(ctx context.Context, addr string, config *tls.Config) (*Client return c, nil } +// NewClientWebSocket initialize a new client for remote server and connects to +// the remote server using WebSocket. +func NewClientWebSocket(ctx context.Context, url string, config *tls.Config) (*Client, error) { + transport, err := NewWebSocketTransport(ctx, url, config) + if err != nil { + return nil, err + } + + c := &Client{ + handlers: make(map[uint64]chan *container), + pushHandlers: make(map[string][]chan *container), + + Error: make(chan error), + quit: make(chan struct{}), + } + + c.transport = transport + go c.listen() + + return c, nil +} + // JSON-RPC 2.0 Error Object // See: https://www.jsonrpc.org/specificationJSON#error_object type apiErr struct { diff --git a/electrum/transport_ws.go b/electrum/transport_ws.go new file mode 100644 index 0000000..154b1e9 --- /dev/null +++ b/electrum/transport_ws.go @@ -0,0 +1,104 @@ +package electrum + +import ( + "context" + "crypto/tls" + "log" + "time" + + "github.com/gorilla/websocket" +) + +type WebSocketTransport struct { + conn *websocket.Conn + responses chan []byte + errors chan error +} + +// NewWebSocketTransport initializes new WebSocket transport. +func NewWebSocketTransport( + ctx context.Context, + url string, + tlsConfig *tls.Config, +) (*WebSocketTransport, error) { + dialer := websocket.Dialer{ + TLSClientConfig: tlsConfig, + } + + conn, response, err := dialer.DialContext(ctx, url, nil) + if err != nil { + if DebugMode { + log.Printf( + "%s [debug] connect -> status: %v, error: %v", + time.Now().Format("2006-01-02 15:04:05"), + response.Status, + err, + ) + } + return nil, err + } + + ws := &WebSocketTransport{ + conn: conn, + responses: make(chan []byte), + errors: make(chan error), + } + + go ws.listen() + + return ws, nil +} + +func (t *WebSocketTransport) listen() { + defer t.conn.Close() + + for { + _, msg, err := t.conn.ReadMessage() + if DebugMode { + log.Printf( + "%s [debug] %s -> msg: %s, err: %v", + time.Now().Format("2006-01-02 15:04:05"), + t.conn.RemoteAddr(), + msg, + err, + ) + } + if err != nil { + t.errors <- err + break + } + + t.responses <- msg + } +} + +// SendMessage sends a message to the remote server through the WebSocket transport. +func (t *WebSocketTransport) SendMessage(body []byte) error { + if DebugMode { + log.Printf("%s [debug] %s <- %s", time.Now().Format("2006-01-02 15:04:05"), t.conn.RemoteAddr(), body) + } + + return t.conn.WriteMessage(websocket.TextMessage, body) +} + +// Responses returns chan to WebSocket transport responses. +func (t *WebSocketTransport) Responses() <-chan []byte { + return t.responses +} + +// Errors returns chan to WebSocket transport errors. +func (t *WebSocketTransport) Errors() <-chan error { + return t.errors +} + +// Close closes WebSocket transport. +func (t *WebSocketTransport) Close() error { + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := t.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Printf("%s [error] %s -> close error: %s", time.Now().Format("2006-01-02 15:04:05"), t.conn.RemoteAddr(), err) + } + + return t.conn.Close() +} diff --git a/go.mod b/go.mod index 64f832f..3c67103 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 require ( github.com/btcsuite/btcd v0.23.1 github.com/btcsuite/btcd/btcutil v1.1.1 + github.com/gorilla/websocket v1.5.0 github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index 366c20d..6f95641 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= From 5d10dfb01e1895fcd787935f52d820bce72c3722 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Fri, 19 May 2023 16:47:53 +0200 Subject: [PATCH 5/6] Expose generic client initialization function The `NewClient` function should be used to initialize the client. The function resolves transport protocol from the URL. It supports all four protocols: `tcp`, `ssl`, `ws` and `wss`. --- electrum/network.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/electrum/network.go b/electrum/network.go index 2327cee..77fc7e0 100644 --- a/electrum/network.go +++ b/electrum/network.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log" + "net/url" "sync" "sync/atomic" ) @@ -70,6 +71,28 @@ type Client struct { nextID uint64 } +// NewClient initializes a new client for remote server and connects to it using +// a transport protocol resolved from the URL's protocol scheme. +// A remote server URL should be provided in the `scheme://hostname:port` format +// (e.g. `tcp://electrum.io:50001`). +func NewClient(ctx context.Context, urlStr string, tlsConfig *tls.Config) (*Client, error) { + u, err := url.Parse(urlStr) + if err != nil { + return nil, fmt.Errorf("failed to parse url [%s]: [%w]", urlStr, err) + } + + switch u.Scheme { + case "tcp": + return NewClientTCP(ctx, u.Host) + case "ssl": + return NewClientSSL(ctx, u.Host, tlsConfig) + case "ws", "wss": + return NewClientWebSocket(ctx, u.String(), tlsConfig) + } + + return nil, fmt.Errorf("unsupported protocol scheme: [%s]", u.Scheme) +} + // NewClientTCP initialize a new client for remote server and connects to the remote server using TCP func NewClientTCP(ctx context.Context, addr string) (*Client, error) { transport, err := NewTCPTransport(ctx, addr) From d5f2ae327a241e525c5e300a85ab364c8324ce67 Mon Sep 17 00:00:00 2001 From: Jakub Nowakowski Date: Tue, 23 May 2023 15:39:32 +0200 Subject: [PATCH 6/6] Wait for close message response After sending a close message to the server we need to wait for the response or force the conncetion closure after a timeout. --- electrum/transport_ws.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/electrum/transport_ws.go b/electrum/transport_ws.go index 154b1e9..ef1e128 100644 --- a/electrum/transport_ws.go +++ b/electrum/transport_ws.go @@ -13,8 +13,12 @@ type WebSocketTransport struct { conn *websocket.Conn responses chan []byte errors chan error + // close is a channel used for graceful connection closure + close chan struct{} } +const webSocketClosingTimeout = 2 * time.Second + // NewWebSocketTransport initializes new WebSocket transport. func NewWebSocketTransport( ctx context.Context, @@ -42,6 +46,7 @@ func NewWebSocketTransport( conn: conn, responses: make(chan []byte), errors: make(chan error), + close: make(chan struct{}), } go ws.listen() @@ -51,6 +56,7 @@ func NewWebSocketTransport( func (t *WebSocketTransport) listen() { defer t.conn.Close() + defer close(t.close) for { _, msg, err := t.conn.ReadMessage() @@ -64,7 +70,11 @@ func (t *WebSocketTransport) listen() { ) } if err != nil { - t.errors <- err + isNormalClose := websocket.IsCloseError(err, websocket.CloseNormalClosure) + if !isNormalClose { + t.errors <- err + } + break } @@ -100,5 +110,11 @@ func (t *WebSocketTransport) Close() error { log.Printf("%s [error] %s -> close error: %s", time.Now().Format("2006-01-02 15:04:05"), t.conn.RemoteAddr(), err) } - return t.conn.Close() + select { + case <-t.close: + case <-time.After(webSocketClosingTimeout): + return t.conn.Close() + } + + return nil }