Skip to content

Commit 60cfc86

Browse files
committed
feat: sync to libovsdb with reconnection
1 parent a9175ec commit 60cfc86

File tree

6 files changed

+44
-183
lines changed

6 files changed

+44
-183
lines changed

cmd/everoute-agent/main.go

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ package main
1919
import (
2020
"flag"
2121
"net"
22-
"os"
2322
"time"
2423

25-
"github.com/contiv/libovsdb"
26-
"gopkg.in/fsnotify.v1"
2724
corev1 "k8s.io/api/core/v1"
2825
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2926
"k8s.io/apimachinery/pkg/util/wait"
@@ -78,9 +75,6 @@ func main() {
7875
if err != nil {
7976
klog.Fatalf("unable to create ovsdb monitor: %s", err.Error())
8077
}
81-
82-
go watchFile(libovsdb.DEFAULT_SOCK, stopChan, datapathManager.OvsdbReconnectChan, ovsdbMonitor.OvsdbReconnChan)
83-
8478
ovsdbMonitor.RegisterOvsdbEventHandler(monitor.OvsdbEventHandlerFuncs{
8579
LocalEndpointAddFunc: func(endpoint *datapath.Endpoint) {
8680
err := datapathManager.AddLocalEndpoint(endpoint)
@@ -181,85 +175,3 @@ func startManager(mgr manager.Manager, datapathManager *datapath.DpManager, stop
181175

182176
return nil
183177
}
184-
185-
func watchFile(fileName string, stopChan <-chan struct{}, recoveryEventChan ...chan struct{}) {
186-
watcher, err := fsnotify.NewWatcher()
187-
if err != nil {
188-
klog.Fatalf("Failed to watch file: %v", fileName)
189-
}
190-
191-
if err := addWatchFile(watcher, fileName); err != nil {
192-
klog.Fatalf("Failed to add file to watcher, error: %v", err)
193-
}
194-
195-
createChan := make(chan bool)
196-
removeChan := make(chan bool)
197-
go func() {
198-
for {
199-
select {
200-
case event, ok := <-watcher.Events:
201-
if !ok {
202-
continue
203-
}
204-
if event.Op&fsnotify.Create == fsnotify.Create {
205-
createChan <- true
206-
}
207-
if event.Op&fsnotify.Remove == fsnotify.Remove {
208-
removeChan <- true
209-
}
210-
case err := <-watcher.Errors:
211-
// Error chan need handle
212-
klog.Errorf("File watcher error: %v", err)
213-
}
214-
}
215-
}()
216-
217-
go func(watcher *fsnotify.Watcher) {
218-
for {
219-
select {
220-
case <-removeChan:
221-
klog.Infof("Deleted unix domain sock : %v", fileName)
222-
223-
// wait for watched file recovery (e.g ovsdb/vswitchd failover). we need timeout constant, 10s is temporary value. FIXME
224-
if err := waitUntilFileCreate(fileName, 10*time.Second); err != nil {
225-
klog.Infof("Time out for wait file restore")
226-
}
227-
// watch vswitchd doamain socket again
228-
if err := addWatchFile(watcher, fileName); err != nil {
229-
klog.Fatalf("Failed to watch file after removed file was re-added")
230-
}
231-
232-
// trigger datapathManager faileover event (e.g flow replay or ovsdb connection reset)
233-
for _, c := range recoveryEventChan {
234-
select {
235-
case c <- struct{}{}:
236-
default:
237-
}
238-
}
239-
case <-createChan:
240-
klog.Infof("Created unix domain sock : %v", fileName)
241-
}
242-
}
243-
}(watcher)
244-
245-
<-stopChan
246-
watcher.Close()
247-
}
248-
249-
func addWatchFile(watcher *fsnotify.Watcher, filepath string) error {
250-
if err := watcher.Add(filepath); err != nil {
251-
return err
252-
}
253-
klog.Infof("Add file watcher for file: %v", filepath)
254-
255-
return nil
256-
}
257-
258-
func waitUntilFileCreate(fileName string, timeout time.Duration) error {
259-
return wait.PollImmediate(10*time.Millisecond, timeout, func() (do bool, err error) {
260-
if _, err := os.Stat(fileName); os.IsNotExist(err) {
261-
return false, nil
262-
}
263-
return true, nil
264-
})
265-
}

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ require (
3434
golang.org/x/sys v0.1.0
3535
google.golang.org/grpc v1.38.0
3636
google.golang.org/protobuf v1.26.0
37-
gopkg.in/fsnotify.v1 v1.4.7
3837
gopkg.in/yaml.v2 v2.4.0
3938
k8s.io/api v0.22.2
4039
k8s.io/apimachinery v0.22.2
@@ -103,6 +102,7 @@ require (
103102
gomodules.xyz/jsonpatch/v2 v2.0.1 // indirect
104103
google.golang.org/appengine v1.6.5 // indirect
105104
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a // indirect
105+
gopkg.in/fsnotify.v1 v1.4.7 // indirect
106106
gopkg.in/inf.v0 v0.9.1 // indirect
107107
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
108108
k8s.io/apiextensions-apiserver v0.18.2 // indirect
@@ -114,8 +114,8 @@ require (
114114

115115
replace (
116116
github.com/contiv/libOpenflow => github.com/everoute/libOpenflow v0.0.0-20221227081521-581066a8b4dc
117-
github.com/contiv/libovsdb => github.com/everoute/libovsdb v0.0.0-20210326110222-6c508538aa65
118-
github.com/contiv/ofnet => github.com/everoute/ofnet v0.0.0-20230109035650-66800da0a8a2
117+
github.com/contiv/libovsdb => github.com/everoute/libovsdb v0.0.0-20230109020235-5be40f26b455
118+
github.com/contiv/ofnet => github.com/everoute/ofnet v0.0.0-20230113044907-b82060d6fd13
119119
github.com/osrg/gobgp => github.com/everoute/gobgp v0.0.0-20210127101833-12edfc1f4514
120120
github.com/vishvananda/netlink => github.com/everoute/netlink v0.0.0-20221013091203-83dd58b9dc66
121121
k8s.io/api v0.22.2 => k8s.io/api v0.20.6

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,12 @@ github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQo
300300
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
301301
github.com/everoute/libOpenflow v0.0.0-20221227081521-581066a8b4dc h1:eiZtTE7sDy4Jx3IAYzKlXPqagddqmagyMYMurPn+u/k=
302302
github.com/everoute/libOpenflow v0.0.0-20221227081521-581066a8b4dc/go.mod h1:xsbTUkWYuDqVv4jMxwinRGdjlYwj0aZS8Rzf6HoNLdg=
303-
github.com/everoute/libovsdb v0.0.0-20210326110222-6c508538aa65 h1:a3dwYQt/Y9Yfz1XKpUelQKS5YDgmG4NtVpBrRguQY9k=
304-
github.com/everoute/libovsdb v0.0.0-20210326110222-6c508538aa65/go.mod h1:N8fR1bYMqi4pQ7jL17QlBlpJ0hHmqRAXVKkBwuIjClw=
303+
github.com/everoute/libovsdb v0.0.0-20230109020235-5be40f26b455 h1:x6peVyYZSmva1OJJZ2CCNPs4m+tvWEHJ1s5tX5j629s=
304+
github.com/everoute/libovsdb v0.0.0-20230109020235-5be40f26b455/go.mod h1:qV7bn/XogGqNiuk52aU4x8WqOb2JEU1CYDpf8D1jkRg=
305305
github.com/everoute/netlink v0.0.0-20221013091203-83dd58b9dc66 h1:/QretgciXKji8tf5ZqaqHL17hkAk+EiNmwc3nnrB5ok=
306306
github.com/everoute/netlink v0.0.0-20221013091203-83dd58b9dc66/go.mod h1:cAAsePK2e15YDAMJNyOpGYEWNe4sIghTY7gpz4cX/Ik=
307-
github.com/everoute/ofnet v0.0.0-20230109035650-66800da0a8a2 h1:iJd7EQmGPDsAwnVhIWwR3IjmgzqaDO/Us1kTllGBDj4=
308-
github.com/everoute/ofnet v0.0.0-20230109035650-66800da0a8a2/go.mod h1:1ykhSTKsn+UGs4MnSco+pwbM1pOLvjPfKp9oMysxPz0=
307+
github.com/everoute/ofnet v0.0.0-20230113044907-b82060d6fd13 h1:mcIDzBIJPA3W8azMO1jZKa5XlS+ivE16q2+DUPbvevk=
308+
github.com/everoute/ofnet v0.0.0-20230113044907-b82060d6fd13/go.mod h1:LYMapa6E2tzSkOR9Ebrg7f2fFl11KnUp/chB4gvr1Z0=
309309
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
310310
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
311311
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=

pkg/agent/datapath/multiBridgeDatapath.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,6 @@ type DpManager struct {
190190
FlowIDToRules map[uint64]*EveroutePolicyRuleEntry
191191
flowReplayChan chan struct{}
192192
flowReplayMutex sync.RWMutex
193-
OvsdbReconnectChan chan struct{}
194193
cleanConntrackChan chan EveroutePolicyRule // clean conntrack entries for rule in chan
195194

196195
ArpChan chan ArpInfo
@@ -309,7 +308,6 @@ func NewDatapathManager(datapathConfig *DpManagerConfig, ofPortIPAddressUpdateCh
309308
datapathManager.Info = new(DpManagerInfo)
310309
datapathManager.flowReplayChan = make(chan struct{})
311310
datapathManager.flowReplayMutex = sync.RWMutex{}
312-
datapathManager.OvsdbReconnectChan = make(chan struct{})
313311
datapathManager.cleanConntrackChan = make(chan EveroutePolicyRule, MaxCleanConntrackChanSize)
314312
datapathManager.ArpChan = make(chan ArpInfo, MaxArpChanCache)
315313

@@ -352,14 +350,6 @@ func (datapathManager *DpManager) InitializeDatapath(stopChan <-chan struct{}) {
352350
go datapathManager.syncIntenalIPs(stopChan)
353351
}
354352

355-
go func() {
356-
for range datapathManager.OvsdbReconnectChan {
357-
if err := datapathManager.ovsdbConnectionReset(); err != nil {
358-
log.Fatalf("Failed to reset ovsbd connection while ovsdb recovery")
359-
}
360-
}
361-
}()
362-
363353
for i := 0; i < MaxCleanConntrackWorkerNum; i++ {
364354
go wait.Until(datapathManager.cleanConntrackWorker, time.Second, stopChan)
365355
}
@@ -655,18 +645,6 @@ func InitializeVDS(datapathManager *DpManager, vdsID string, ovsbrName string, s
655645
}(vdsID)
656646
}
657647

658-
func (datapathManager *DpManager) ovsdbConnectionReset() error {
659-
for vdsID := range datapathManager.Config.ManagedVDSMap {
660-
for brKeyword := range datapathManager.OvsdbDriverMap[vdsID] {
661-
if err := datapathManager.OvsdbDriverMap[vdsID][brKeyword].ReConnectOvsdb(); err != nil {
662-
return fmt.Errorf("failed to reconnect vds %v localBridge ovsdb, error: %v", vdsID, err)
663-
}
664-
}
665-
}
666-
667-
return nil
668-
}
669-
670648
func (datapathManager *DpManager) replayVDSFlow(vdsID, bridgeKeyword string) error {
671649
datapathManager.flowReplayMutex.Lock()
672650
defer datapathManager.flowReplayMutex.Unlock()

pkg/monitor/ovsdb.go

Lines changed: 30 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ type OVSDBMonitor struct {
7979

8080
// syncQueue used to notify ovsdb update
8181
syncQueue workqueue.RateLimitingInterface
82-
83-
OvsdbReconnChan chan struct{}
8482
}
8583

8684
// NewOVSDBMonitor create a new instance of OVSDBMonitor
@@ -97,7 +95,6 @@ func NewOVSDBMonitor() (*OVSDBMonitor, error) {
9795
localEndpointHardwareAddrCacheLock: sync.RWMutex{},
9896
localEndpointHardwareAddrCache: make(map[string]uint32),
9997
syncQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
100-
OvsdbReconnChan: make(chan struct{}),
10198
}
10299

103100
return monitor, nil
@@ -130,32 +127,17 @@ func (monitor *OVSDBMonitor) Run(stopChan <-chan struct{}) {
130127
klog.Infof("start ovsdb monitor")
131128
defer klog.Infof("shutting down ovsdb monitor")
132129

133-
err := monitor.startOvsdbMonitor(false)
130+
err := monitor.startOvsdbMonitor()
134131
if err != nil {
135132
klog.Fatalf("unable start ovsdb monitor: %s", err)
136133
}
137-
for {
138-
select {
139-
case <-monitor.OvsdbReconnChan:
140-
klog.Infof("reconnect ovsdb monitor")
141-
monitor.ovsClient.Disconnect()
142-
monitor.ovsClient, err = ovsdb.ConnectUnix(ovsdb.DEFAULT_SOCK)
143-
if err != nil {
144-
klog.Fatalf("unable reconnect ovsdb socket: %s", err)
145-
}
146-
err := monitor.startOvsdbMonitor(true)
147-
if err != nil {
148-
klog.Fatalf("unable start ovsdb monitor: %s", err)
149-
}
150-
case <-stopChan:
151-
return
152-
}
153-
}
134+
135+
<-stopChan
154136
}
155137

156-
func (monitor *OVSDBMonitor) startOvsdbMonitor(isReconnect bool) error {
138+
func (monitor *OVSDBMonitor) startOvsdbMonitor() error {
157139
klog.Infof("start monitor ovsdb %s", "Open_vSwitch")
158-
monitor.ovsClient.Register(ovsUpdateHandlerFunc(monitor.handleOvsUpdates(false)))
140+
monitor.ovsClient.Register(ovsUpdateHandlerFunc(monitor.handleOvsUpdates))
159141

160142
selectAll := ovsdb.MonitorSelect{
161143
Initial: true,
@@ -170,11 +152,10 @@ func (monitor *OVSDBMonitor) startOvsdbMonitor(isReconnect bool) error {
170152
"Open_vSwitch": {Select: selectAll, Columns: []string{"ovs_version"}},
171153
}
172154

173-
initial, err := monitor.ovsClient.Monitor("Open_vSwitch", nil, requests)
155+
err := monitor.ovsClient.Monitor("Open_vSwitch", nil, requests)
174156
if err != nil {
175157
return fmt.Errorf("monitor ovsdb %s: %s", "Open_vSwitch", err)
176158
}
177-
monitor.handleOvsUpdates(isReconnect)(*initial)
178159

179160
return nil
180161
}
@@ -586,50 +567,34 @@ func (monitor *OVSDBMonitor) processEndpointDel(rowupdate ovsdb.RowUpdate) {
586567
}
587568
}
588569

589-
func (monitor *OVSDBMonitor) handleOvsUpdates(isReconn bool) func(updates ovsdb.TableUpdates) {
590-
return func(updates ovsdb.TableUpdates) {
591-
monitor.cacheLock.Lock()
592-
defer monitor.cacheLock.Unlock()
593-
594-
if isReconn {
595-
for table := range monitor.ovsdbCache {
596-
for uuid, row := range monitor.ovsdbCache[table] {
597-
if _, ok := updates.Updates[table].Rows[uuid]; !ok {
598-
updates.Updates[table].Rows[uuid] = ovsdb.RowUpdate{
599-
Uuid: ovsdb.UUID{GoUuid: uuid},
600-
Old: row,
601-
New: ovsdb.Row{},
602-
}
603-
}
604-
}
605-
}
606-
}
570+
func (monitor *OVSDBMonitor) handleOvsUpdates(updates ovsdb.TableUpdates) {
571+
monitor.cacheLock.Lock()
572+
defer monitor.cacheLock.Unlock()
607573

608-
for table, tableUpdate := range updates.Updates {
609-
if _, ok := monitor.ovsdbCache[table]; !ok {
610-
monitor.ovsdbCache[table] = make(map[string]ovsdb.Row)
611-
}
612-
for uuid, row := range tableUpdate.Rows {
613-
empty := ovsdb.Row{}
614-
if !reflect.DeepEqual(row.New, empty) {
615-
if table == "Interface" {
616-
go monitor.processEndpointUpdate(row)
617-
}
618-
if table == "Port" {
619-
go monitor.processPortUpdate(row)
620-
}
621-
622-
monitor.ovsdbCache[table][uuid] = row.New
623-
} else {
624-
if table == "Interface" {
625-
go monitor.processEndpointDel(row)
626-
}
574+
for table, tableUpdate := range updates.Updates {
575+
if _, ok := monitor.ovsdbCache[table]; !ok {
576+
monitor.ovsdbCache[table] = make(map[string]ovsdb.Row)
577+
}
578+
for uuid, row := range tableUpdate.Rows {
579+
empty := ovsdb.Row{}
580+
if !reflect.DeepEqual(row.New, empty) {
581+
if table == "Interface" {
582+
go monitor.processEndpointUpdate(row)
583+
}
584+
if table == "Port" {
585+
go monitor.processPortUpdate(row)
586+
}
627587

628-
delete(monitor.ovsdbCache[table], uuid)
588+
monitor.ovsdbCache[table][uuid] = row.New
589+
} else {
590+
if table == "Interface" {
591+
go monitor.processEndpointDel(row)
629592
}
593+
594+
delete(monitor.ovsdbCache[table], uuid)
630595
}
631596
}
632-
633-
monitor.syncQueue.Add("ovsdb-event")
634597
}
598+
599+
monitor.syncQueue.Add("ovsdb-event")
635600
}

tests/e2e/framework/node/agent.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strconv"
2323
"strings"
2424

25+
"k8s.io/apimachinery/pkg/util/rand"
2526
"k8s.io/klog"
2627
)
2728

@@ -31,10 +32,15 @@ type Agent struct {
3132

3233
const (
3334
agentBinaryName = "everoute-agent"
35+
ovsRestart = "systemctl restart openvswitch"
3436
)
3537

3638
func (n *Agent) Restart() error {
37-
return n.reRunProcess(agentBinaryName)
39+
if rand.Intn(2) == 0 {
40+
return n.reRunProcess(agentBinaryName)
41+
}
42+
_, _, err := n.runCommand(ovsRestart)
43+
return err
3844
}
3945

4046
func (n *Agent) FetchLog() ([]byte, error) {

0 commit comments

Comments
 (0)