Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions ws/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ import (

type CheckClientHandler func(id string, r *http.Request) bool

type DuplicateConnectionBehavior uint8

const (
DuplicateConnectionBehaviorKeepCurrent DuplicateConnectionBehavior = iota
DuplicateConnectionBehaviorKeepNew
)

// Server defines a websocket server, which passively listens for incoming connections on ws or wss protocol.
// The offered API are of asynchronous nature, and each incoming connection/message is handled using callbacks.
//
Expand Down Expand Up @@ -117,6 +124,13 @@ type Server interface {
//
// Changes to the http request at runtime may lead to undefined behavior.
SetCheckClientHandler(handler CheckClientHandler)
// SetDuplicateConnectionBehavior sets the behavior for how duplicate connections from the same charge point ID should be
// handled. The default is to not allow the new connection, as long as the charge point ID is already connected, but this can
// be overridden to instead close the current connection and allow the new connection.
//
// This has some important security considerations; it could allow malicious parties from forcefully disconnecting valid
// chargers from the server, especially if not running with aut
SetDuplicateConnectionBehavior(behavior DuplicateConnectionBehavior)
// Addr gives the address on which the server is listening, useful if, for
// example, the port is system-defined (set to 0).
Addr() *net.TCPAddr
Expand All @@ -130,22 +144,23 @@ type Server interface {
//
// Use the NewServer function to create a new server.
type server struct {
connections map[string]*webSocket
httpServer *http.Server
messageHandler func(ws Channel, data []byte) error
chargePointIdResolver func(*http.Request) (string, error)
checkClientHandler CheckClientHandler
newClientHandler func(ws Channel)
disconnectedHandler func(ws Channel)
basicAuthHandler func(username string, password string) bool
tlsCertificatePath string
tlsCertificateKey string
timeoutConfig ServerTimeoutConfig
upgrader websocket.Upgrader
errC chan error
connMutex sync.RWMutex
addr *net.TCPAddr
httpHandler *mux.Router
connections map[string]*webSocket
httpServer *http.Server
messageHandler func(ws Channel, data []byte) error
chargePointIdResolver func(*http.Request) (string, error)
checkClientHandler CheckClientHandler
duplicateConnectionBehavior DuplicateConnectionBehavior
newClientHandler func(ws Channel)
disconnectedHandler func(ws Channel)
basicAuthHandler func(username string, password string) bool
tlsCertificatePath string
tlsCertificateKey string
timeoutConfig ServerTimeoutConfig
upgrader websocket.Upgrader
errC chan error
connMutex sync.RWMutex
addr *net.TCPAddr
httpHandler *mux.Router
}

// ServerOpt is a function that can be used to set options on a server during creation.
Expand Down Expand Up @@ -183,10 +198,11 @@ func WithServerTLSConfig(certificatePath string, certificateKey string, tlsConfi
func NewServer(opts ...ServerOpt) Server {
router := mux.NewRouter()
s := &server{
httpServer: &http.Server{},
timeoutConfig: NewServerTimeoutConfig(),
upgrader: websocket.Upgrader{Subprotocols: []string{}},
httpHandler: router,
httpServer: &http.Server{},
timeoutConfig: NewServerTimeoutConfig(),
upgrader: websocket.Upgrader{Subprotocols: []string{}},
httpHandler: router,
duplicateConnectionBehavior: DuplicateConnectionBehaviorKeepCurrent,
chargePointIdResolver: func(r *http.Request) (string, error) {
url := r.URL
return path.Base(url.Path), nil
Expand All @@ -206,6 +222,10 @@ func (s *server) SetCheckClientHandler(handler CheckClientHandler) {
s.checkClientHandler = handler
}

func (s *server) SetDuplicateConnectionBehavior(behavior DuplicateConnectionBehavior) {
s.duplicateConnectionBehavior = behavior
}

func (s *server) SetNewClientHandler(handler ConnectedHandler) {
s.newClientHandler = handler
}
Expand Down Expand Up @@ -425,15 +445,28 @@ out:
}
// Check whether client exists
s.connMutex.Lock()
// There is already a connection with the same ID. Close the new one immediately with a PolicyViolation.
if _, exists := s.connections[id]; exists {
s.connMutex.Unlock()
s.error(fmt.Errorf("client %s already exists, closing duplicate client", id))
_ = conn.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "a connection with this ID already exists"),
time.Now().Add(s.timeoutConfig.WriteWait))
_ = conn.Close()
return
switch s.duplicateConnectionBehavior {
case DuplicateConnectionBehaviorKeepNew:
// There is already a connection with the same ID. Close the old one, and allow the new connection. This has security
// implications, see the note on the SetDuplicateConnectionBehavior func.
if currentConn, exists := s.connections[id]; exists {
s.error(fmt.Errorf("client %s already exists, closing existing client", id))
_ = currentConn.connection.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "a connection with this ID has reconnected"),
time.Now().Add(s.timeoutConfig.WriteWait))
_ = currentConn.connection.Close()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quick question; am I understanding the codeflow correctly here when I assume that the entry from s.connections for the chargepoint ID will be removed from the map as a result of me simply closing the connection here, so the new entry will be added correctly after passing this switch statement?

Or is that onClosed callback on the somehow not synchronous, risking that the entry from s.connections for that charge point ID is removed from the map after the new connection has been accepted?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm afraid it's the latter:

  1. currentCon.connection.Close() tells the underlying websocket to close
  2. the cleanup closure is invoked asynchronously
  3. the callback eventually deletes the element from the map in server.go/handleDisconnect

So the new element will (in most cases) be deleted right after being added. To verify add these lines to the end of the unit test:

s.Equal(1, len(s.server.connections))
testWS, ok := s.server.connections["testws"]
s.True(ok, "expected testws connection to exist")
s.NotNil(testWS, "expected testws connection to be non-nil")

I guess we would need a dedicated method to force-close a websocket without triggering any callback

}
default:
// There is already a connection with the same ID. Close the new one immediately with a PolicyViolation.
if _, exists := s.connections[id]; exists {
s.connMutex.Unlock()
s.error(fmt.Errorf("client %s already exists, closing duplicate client", id))
_ = conn.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "a connection with this ID already exists"),
time.Now().Add(s.timeoutConfig.WriteWait))
_ = conn.Close()
return
}
}
// Create web socket for client, state is automatically set to connected
ws := newWebSocket(
Expand Down
54 changes: 50 additions & 4 deletions ws/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func (s *WebSocketSuite) TestServerStartErrors() {
s.NotNil(r)
}

func (s *WebSocketSuite) TestClientDuplicateConnection() {
func (s *WebSocketSuite) TestClientDuplicateConnectionWithKeepCurrentConnectionBehavior() {
s.server = newWebsocketServer(s.T(), nil)
s.server.SetNewClientHandler(func(ws Channel) {
})
Expand Down Expand Up @@ -531,9 +531,55 @@ func (s *WebSocketSuite) TestClientDuplicateConnection() {
})
err = wsClient2.Start(u.String())
s.NoError(err)
// Expect connection to be closed immediately
_, ok := <-disconnectC
s.True(ok)
// Expect new connection to be closed immediately
select {
case _, ok := <-disconnectC:
s.True(ok, "expected new client to have been disconnected")
case <-time.After(1 * time.Second):
s.Fail("timeout waiting for new client to disconnect")
}
}

func (s *WebSocketSuite) TestClientDuplicateConnectionWithKeepNewConnectionBehavior() {
s.server = newWebsocketServer(s.T(), nil)
s.server.SetNewClientHandler(func(ws Channel) {
})
s.server.SetDuplicateConnectionBehavior(DuplicateConnectionBehaviorKeepNew)
// Start server
go s.server.Start(serverPort, serverPath)
time.Sleep(100 * time.Millisecond)
// Connect client 1
disconnectC := make(chan struct{})
s.client = newWebsocketClient(s.T(), func(data []byte) ([]byte, error) {
return nil, nil
})
s.client.SetDisconnectedHandler(func(err error) {
s.IsType(&websocket.CloseError{}, err)
var wsErr *websocket.CloseError
ok := errors.As(err, &wsErr)
s.True(ok)
s.Equal(websocket.ClosePolicyViolation, wsErr.Code)
s.Equal("a connection with this ID has reconnected", wsErr.Text)
s.client.SetDisconnectedHandler(nil)
disconnectC <- struct{}{}
})
host := fmt.Sprintf("localhost:%v", serverPort)
u := url.URL{Scheme: "ws", Host: host, Path: testPath}
err := s.client.Start(u.String())
s.NoError(err)
// Connect client 2
wsClient2 := newWebsocketClient(s.T(), func(data []byte) ([]byte, error) {
return nil, nil
})
err = wsClient2.Start(u.String())
s.NoError(err)
// Expect current connection to be closed immediately
select {
case _, ok := <-disconnectC:
s.True(ok, "expected current client to have been disconnected")
case <-time.After(1 * time.Second):
s.Fail("timeout waiting for current client to disconnect")
}
}

func (s *WebSocketSuite) TestServerStopConnection() {
Expand Down
Loading