@@ -494,20 +494,17 @@ def __init__(
494494 interfaces = [],
495495 ** kwargs ,
496496 )
497+ self ._rss_queues = rss_queues
497498 self ._yaf_settings = self ._settings
498499 self ._interfaces = [interface .name for interface in interfaces ]
499- inf_names = "," .join (self ._interfaces )
500- self ._yafzbalance_settings = YafZCBalanceSettings (
501- in_ = inf_names , cluster = cluster , num = num , core = core , time_ = time_ , stats = stats
502- )
503500
504501 self ._yaf_instances : List [Yaf ] = []
505- self ._executors = self ._duplicate_executor (self ._executor , num )
502+ self ._executors = self ._duplicate_executor (self ._executor , rss_queues )
506503 inf_speed = interfaces [0 ].speed
507- for i in range (num ):
508- in_cluster = f"{ cluster } : { i } "
504+ for i in range (rss_queues ):
505+ in_inf = f"{ self . _interfaces [ 0 ] } @ { i } "
509506 instance = Yaf (
510- interfaces = [InterfaceCfg (in_cluster , inf_speed )],
507+ interfaces = [InterfaceCfg (in_inf , inf_speed )],
511508 target = target ,
512509 protocols = protocols ,
513510 executor = self ._executors [i ],
@@ -518,10 +515,6 @@ def __init__(
518515 instance ._local_workdir , f"settings_{ i } .conf"
519516 )
520517 self ._yaf_instances .append (instance )
521- self ._rss_queues = rss_queues
522- self ._cmd = self ._prepare_zc_cmd ()
523- self ._log_file = path .join (self ._local_workdir , "yafzcbalance.log" )
524- assert_tool_is_installed ("yafzcbalance" , self ._executor )
525518
526519 def _duplicate_executor (self , executor : Executor , num : int = 1 ) -> List [Executor ]:
527520 executors = []
@@ -536,68 +529,17 @@ def _duplicate_executor(self, executor: Executor, num: int = 1) -> List[Executor
536529
537530 return executors
538531
539- def _prepare_zc_cmd (self ) -> str :
540- args = ["yafzcbalance" ]
541- for opt in fields (self ._yafzbalance_settings ):
542- key = opt .name
543- val = getattr (self ._yafzbalance_settings , key )
544- if val is None :
545- continue
546- if key .endswith ("_" ):
547- key = key [:- 1 ]
548- args .extend ([f"--{ key } " , f"'{ val } '" ])
549- return " " .join (args )
550-
551532 def _before_start (self ):
552533 for ifc in self ._interfaces :
553534 if ifc .startswith ("zc:" ):
554535 self ._switch_to_zc (ifc .split (":" , 1 )[- 1 ])
555536
556537 def start (self ):
557538 """Start the probe."""
558- logging .getLogger ().info (
559- "Starting yafzcbalance on %s." , "," .join (self ._interfaces )
560- )
561-
562- # check and stop running yaf instance
563- check_running_cmd = "pidof 'yafzcbalance'"
564- running_processes = Tool (
565- check_running_cmd , executor = self ._executor , failure_verbosity = "silent"
566- ).run ()[0 ]
567- if len (running_processes ) > 0 :
568- pids = running_processes .split ()
569- for pid in pids :
570- if not pid :
571- continue
572- running_pid = int (pid )
573- self ._stop_process (running_pid )
574- time .sleep (2 )
575-
576539 self ._before_start ()
577540
578541 self .host_statistics .start ()
579542
580- self ._process = Daemon (self ._cmd , executor = self ._executor , sudo = self ._sudo )
581- # stderr is implicitly redirected to stdout
582- self ._process .set_outputs (self ._log_file )
583- self ._process .start ()
584- time .sleep (1 )
585-
586- if not self ._process .is_running ():
587- res = self ._process .stop ()
588- return_code = self ._process .returncode ()
589- self ._process = None
590-
591- # stderr is redirected to stdout
592- err = res [0 ]
593- logging .getLogger ().error (
594- "Unable to start yafzcbalance on %s. yaf return code: %d, error: %s" ,
595- "," .join (self ._settings .input .inf ),
596- return_code ,
597- err ,
598- )
599- raise ProbeException ("yafzcbalance startup error" )
600-
601543 max_restarts = 10
602544 restarts = 0
603545
@@ -628,9 +570,6 @@ def start(self):
628570
629571 def stop (self ):
630572 """Stop the probe."""
631- # if process not running, method has no effect
632- if self ._process is None :
633- return
634573
635574 for instance in self ._yaf_instances :
636575 instance .stop ()
@@ -643,33 +582,6 @@ def stop(self):
643582 failure_verbosity = "silent" ,
644583 ).run ()
645584
646- logging .getLogger ().info ("Stopping yafzcbalance." )
647-
648- stdout = []
649- try :
650- stdout , _ = self ._process .stop ()
651- except ExecutableProcessError :
652- pass
653-
654- # Wait till yafzcbalance finishes
655- Tool (
656- f"while pidof { self ._cmd .split (' ' , 1 )[0 ]} > /dev/null; do :; done" ,
657- executor = self ._fallback_executor ,
658- sudo = self ._sudo ,
659- failure_verbosity = "silent" ,
660- ).run ()
661-
662- if self ._process .returncode () > 0 :
663- # stderr is redirected to stdout
664- # Since stdout could be filled with normal output, print only last 1 line#
665- err = stdout [- 1 ] if stdout else ""
666- logging .getLogger ().error (
667- "yaf runtime error: %s, error: %s" ,
668- self ._process .returncode (),
669- err ,
670- )
671-
672- self ._process = None
673585 self .host_statistics .stop ()
674586
675587 self ._after_stop ()
@@ -701,6 +613,8 @@ def _get_driver(self, interface_name):
701613 return driver .split (" " )[1 ].strip ()
702614
703615 def _switch_to_zc (self , interface_name : str ):
616+ if "@" in interface_name :
617+ interface_name = interface_name .split ("@" , 1 )[0 ]
704618 driver = self ._get_driver (interface_name )
705619 if not driver :
706620 return
0 commit comments