Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions rocketpool/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rocket-pool/smartnode/rocketpool/node/collectors"
"github.com/rocket-pool/smartnode/shared/services"
"github.com/rocket-pool/smartnode/shared/services/alerting"
"github.com/rocket-pool/smartnode/shared/services/connectivity"
"github.com/rocket-pool/smartnode/shared/services/state"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/lighthouse"
"github.com/rocket-pool/smartnode/shared/services/wallet/keystore/nimbus"
Expand Down Expand Up @@ -189,7 +190,8 @@ func run(c *cli.Context) error {
if err != nil {
return err
}
checkPorts, err := newCheckPortConnectivity(c, log.NewColorLogger(CheckPortConnectivityColor))
var checkPorts *connectivity.CheckPortConnectivity
checkPorts, err = connectivity.NewCheckPortConnectivity(c, cfg, log.NewColorLogger(CheckPortConnectivityColor))
if err != nil {
return err
}
Expand Down Expand Up @@ -326,7 +328,7 @@ func run(c *cli.Context) error {
time.Sleep(taskCooldown)

// Run the port connectivity check
if err := checkPorts.run(); err != nil {
if err := checkPorts.Run(); err != nil {
errorLog.Println(err)
}
time.Sleep(taskCooldown)
Expand Down
1 change: 1 addition & 0 deletions shared/services/config/alertmanager-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func (cfg *AlertmanagerConfig) UpdateConfigurationFiles(configPath string) error
if err != nil {
return fmt.Errorf("error processing alerting rules template: %w", err)
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package connectivity

import (
"context"
Expand All @@ -13,9 +13,9 @@ import (

"github.com/urfave/cli"

"github.com/rocket-pool/smartnode/shared/services"
"github.com/rocket-pool/smartnode/shared/services/alerting"
"github.com/rocket-pool/smartnode/shared/services/config"
cfg "github.com/rocket-pool/smartnode/shared/services/config"
cfgtypes "github.com/rocket-pool/smartnode/shared/types/config"
"github.com/rocket-pool/smartnode/shared/utils/log"
)

Expand All @@ -36,36 +36,45 @@ var publicIPResolvers = []struct {
}

// Check port connectivity task
type checkPortConnectivity struct {
type CheckPortConnectivity struct {
c *cli.Context
log log.ColorLogger
cfg *config.RocketPoolConfig
cfg *cfg.RocketPoolConfig

// Track previous state to avoid flooding with repeated alerts
wasEth1PortOpen bool
wasBeaconP2POpen bool

// Function pointers for network and alerting actions (to facilitate testing)
getPublicIP func() (string, error)
isPortReachableNATReflection func(string, uint16) bool
isPortReachableExternalService func(uint16) (bool, string, error)
alertEth1P2PPortNotOpen func(*cfg.RocketPoolConfig, uint16) error
alertBeaconP2PPortNotOpen func(*cfg.RocketPoolConfig, uint16) error
}

// Create check port connectivity task
func newCheckPortConnectivity(c *cli.Context, logger log.ColorLogger) (*checkPortConnectivity, error) {
cfg, err := services.GetConfig(c)
if err != nil {
return nil, err
}

return &checkPortConnectivity{
func NewCheckPortConnectivity(c *cli.Context, config *cfg.RocketPoolConfig, logger log.ColorLogger) (*CheckPortConnectivity, error) {
return &CheckPortConnectivity{
c: c,
log: logger,
cfg: cfg,
cfg: config,
// Assume ports are open at startup to avoid spurious alerts on first cycle
wasEth1PortOpen: true,
wasBeaconP2POpen: true,

// Default implementations
getPublicIP: getPublicIP,
isPortReachableNATReflection: isPortReachableNATReflection,
isPortReachableExternalService: isPortReachableExternalService,
alertEth1P2PPortNotOpen: alerting.AlertEth1P2PPortNotOpen,
alertBeaconP2PPortNotOpen: alerting.AlertBeaconP2PPortNotOpen,
}, nil
}

// Check whether the configured execution/consensus client P2P ports are
// reachable from the internet. Sends an alert the first time either port is detected as closed.
func (t *checkPortConnectivity) run() error {
func (t *CheckPortConnectivity) Run() error {
if t.cfg.Alertmanager.EnableAlerting.Value != true {
return nil
}
Expand All @@ -74,18 +83,27 @@ func (t *checkPortConnectivity) run() error {
}
t.log.Print("Checking port connectivity...")

// EC and CC are always both local or both external - mixed configurations are rejected during validation.
// We don't check external clients because the node operator is responsible for their
// connectivity, and they may be behind a different firewall or NAT entirely.
isLocalMode := t.cfg.ExecutionClientMode.Value.(cfgtypes.Mode) == cfgtypes.Mode_Local
if !isLocalMode {
return nil
}

ecOpen := false
ccOpen := false
ecP2PPort := t.cfg.ExecutionCommon.P2pPort.Value.(uint16)
ccP2PPort := t.cfg.ConsensusCommon.P2pPort.Value.(uint16)
publicIP, err := getPublicIP()
publicIP, err := t.getPublicIP()
if err == nil {
ecOpen = isPortReachableNATReflection(publicIP, ecP2PPort)
ccOpen = isPortReachableNATReflection(publicIP, ccP2PPort)
ecOpen = t.isPortReachableNATReflection(publicIP, ecP2PPort)
ccOpen = t.isPortReachableNATReflection(publicIP, ccP2PPort)
}

if !ecOpen {
// Fallback to using an external service
ecOpen, _, err = isPortReachableExternalService(ecP2PPort)
ecOpen, _, err = t.isPortReachableExternalService(ecP2PPort)
if err != nil {
return fmt.Errorf("error checking port connectivity: %w", err)
}
Expand All @@ -99,14 +117,15 @@ func (t *checkPortConnectivity) run() error {
if t.wasEth1PortOpen {
t.log.Printlnf("WARNING: Execution client P2P port %d is not accessible from the internet.", ecP2PPort)
}
if err := alerting.AlertEth1P2PPortNotOpen(t.cfg, ecP2PPort); err != nil {
if err := t.alertEth1P2PPortNotOpen(t.cfg, ecP2PPort); err != nil {
t.log.Printlnf("WARNING: Could not send Eth1P2PPortNotOpen alert: %s", err.Error())
}
}
t.wasEth1PortOpen = ecOpen

if !ccOpen {
// Fallback to using an external service
ccOpen, _, err = isPortReachableExternalService(ccP2PPort)
ccOpen, _, err = t.isPortReachableExternalService(ccP2PPort)
if err != nil {
return fmt.Errorf("error checking port connectivity: %w", err)
}
Expand All @@ -120,7 +139,7 @@ func (t *checkPortConnectivity) run() error {
if t.wasBeaconP2POpen {
t.log.Printlnf("WARNING: Consensus client P2P port %d is not accessible from the internet.", ccP2PPort)
}
if err := alerting.AlertBeaconP2PPortNotOpen(t.cfg, ccP2PPort); err != nil {
if err := t.alertBeaconP2PPortNotOpen(t.cfg, ccP2PPort); err != nil {
t.log.Printlnf("WARNING: Could not send BeaconP2PPortNotOpen alert: %s", err.Error())
}
}
Expand Down
95 changes: 95 additions & 0 deletions shared/services/connectivity/check-port-connectivity_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package connectivity

import (
"testing"

cfg "github.com/rocket-pool/smartnode/shared/services/config"
cfgtypes "github.com/rocket-pool/smartnode/shared/types/config"
log "github.com/rocket-pool/smartnode/shared/utils/log"
)

// TestCheckPortConnectivity_Run verifies that the port connectivity task correctly
// decides whether to perform network checks based on the client modes and global
// alerting settings.
func TestCheckPortConnectivity_Run(t *testing.T) {
logger := log.NewColorLogger(0)
tests := []struct {
name string
ecMode cfgtypes.Mode
ccMode cfgtypes.Mode
enableAlerting bool
portAlertingEnabled bool
expectNetCalls bool
}{
{
name: "local EC + local CC -> performs checks",
ecMode: cfgtypes.Mode_Local,
ccMode: cfgtypes.Mode_Local,
enableAlerting: true,
portAlertingEnabled: true,
expectNetCalls: true,
},
{
name: "external EC + external CC -> skips checks",
ecMode: cfgtypes.Mode_External,
ccMode: cfgtypes.Mode_External,
enableAlerting: true,
portAlertingEnabled: true,
expectNetCalls: false,
},
{
name: "alerting disabled -> skips checks",
ecMode: cfgtypes.Mode_Local,
ccMode: cfgtypes.Mode_Local,
enableAlerting: false,
portAlertingEnabled: true,
expectNetCalls: false,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
config := cfg.NewRocketPoolConfig("", false)
config.ExecutionClientMode.Value = tc.ecMode
config.ConsensusClientMode.Value = tc.ccMode
config.Alertmanager.EnableAlerting.Value = tc.enableAlerting
config.Alertmanager.AlertEnabled_PortConnectivityCheck.Value = tc.portAlertingEnabled

netCallsMade := false
mockGetPublicIP := func() (string, error) {
netCallsMade = true
return "1.2.3.4", nil
}
mockIsPortReachable := func(host string, port uint16) bool {
netCallsMade = true
return true
}
mockExternalCheck := func(port uint16) (bool, string, error) {
netCallsMade = true
return true, "Success", nil
}
mockAlert := func(cfg *cfg.RocketPoolConfig, port uint16) error {
return nil
}

task := &CheckPortConnectivity{
cfg: config,
log: logger,
getPublicIP: mockGetPublicIP,
isPortReachableNATReflection: mockIsPortReachable,
isPortReachableExternalService: mockExternalCheck,
alertEth1P2PPortNotOpen: mockAlert,
alertBeaconP2PPortNotOpen: mockAlert,
}

err := task.Run()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if netCallsMade != tc.expectNetCalls {
t.Errorf("expected network calls: %v, got: %v", tc.expectNetCalls, netCallsMade)
}
})
}
}
Loading