@@ -2195,36 +2195,59 @@ def _family_ascent_point_update(self, fp_id):
21952195 tp_added = self .added [TASK_PROXIES ]
21962196 # Count child family states, set is_held, is_queued, is_runahead
21972197 state_counter = Counter ({})
2198+ active_counter = Counter ({})
21982199 is_held_total = 0
21992200 is_queued_total = 0
22002201 is_runahead_total = 0
22012202 is_retry = False
22022203 is_wallclock = False
22032204 is_xtriggered = False
22042205 graph_depth = self .n_edge_distance
2206+ # Gather the counts and totals of child families.
22052207 for child_id in fam_node .child_families :
22062208 child_node = fp_updated .get (child_id , fp_data .get (child_id ))
22072209 if child_node is not None :
2208- is_held_total += child_node .is_held_total
22092210 is_queued_total += child_node .is_queued_total
22102211 is_runahead_total += child_node .is_runahead_total
2211- state_counter += Counter (dict (child_node .state_totals ))
2212+ if child_node .graph_depth == 0 :
2213+ is_held_total += child_node .is_held_total
2214+ active_counter += Counter (
2215+ dict (child_node .state_totals )
2216+ )
2217+ else :
2218+ state_counter += Counter (dict (child_node .state_totals ))
22122219 if child_node .graph_depth < graph_depth :
22132220 graph_depth = child_node .graph_depth
22142221 # Gather all child task states
22152222 task_states = []
2223+ active_states = []
22162224 for tp_id in fam_node .child_tasks :
2225+ is_active = False
22172226 if all_nodes and tp_id not in all_nodes :
22182227 continue
22192228
22202229 tp_delta = tp_updated .get (tp_id )
22212230 tp_node = tp_added .get (tp_id , tp_data .get (tp_id ))
22222231
2232+ tp_depth = tp_delta
2233+ if tp_depth is None or not tp_depth .HasField ('graph_depth' ):
2234+ tp_depth = tp_node
2235+ if tp_depth .graph_depth < graph_depth :
2236+ graph_depth = tp_depth .graph_depth
2237+
2238+ if tp_id in self .all_task_pool :
2239+ is_active = True
2240+
22232241 tp_state = self .from_delta_or_node (tp_delta , tp_node , 'state' )
22242242 if tp_state :
22252243 task_states .append (tp_state )
2244+ if is_active :
2245+ active_states .append (tp_state )
22262246
2227- if self .from_delta_or_node (tp_delta , tp_node , 'is_held' ):
2247+ if (
2248+ is_active
2249+ and self .from_delta_or_node (tp_delta , tp_node , 'is_held' )
2250+ ):
22282251 is_held_total += 1
22292252
22302253 if self .from_delta_or_node (tp_delta , tp_node , 'is_queued' ):
@@ -2242,18 +2265,15 @@ def _family_ascent_point_update(self, fp_id):
22422265 if self .from_delta_or_node (tp_delta , tp_node , 'is_xtriggered' ):
22432266 is_xtriggered = True
22442267
2245- tp_depth = tp_delta
2246- if tp_depth is None or not tp_depth .HasField ('graph_depth' ):
2247- tp_depth = tp_node
2248- if tp_depth .graph_depth < graph_depth :
2249- graph_depth = tp_depth .graph_depth
2250-
22512268 state_counter += Counter (task_states )
2269+ active_counter += Counter (active_states )
2270+ # if n=0 tasks exist only count those, otherwise count all.
2271+ group_counter = active_counter or state_counter
22522272 # created delta data element
22532273 fp_delta = PbFamilyProxy (
22542274 id = fp_id ,
22552275 stamp = f'{ fp_id } @{ time ()} ' ,
2256- state = extract_group_state (state_counter .keys ()),
2276+ state = extract_group_state (group_counter .keys ()),
22572277 is_held = (is_held_total > 0 ),
22582278 is_held_total = is_held_total ,
22592279 is_queued = (is_queued_total > 0 ),
@@ -2265,10 +2285,10 @@ def _family_ascent_point_update(self, fp_id):
22652285 is_xtriggered = is_xtriggered ,
22662286 graph_depth = graph_depth ,
22672287 )
2268- fp_delta .states [:] = state_counter .keys ()
2288+ fp_delta .states [:] = group_counter .keys ()
22692289 # Use all states to clean up pruned counts
22702290 for state in TASK_STATUSES_ORDERED :
2271- fp_delta .state_totals [state ] = state_counter .get (state , 0 )
2291+ fp_delta .state_totals [state ] = group_counter .get (state , 0 )
22722292 fp_updated .setdefault (fp_id , PbFamilyProxy ()).MergeFrom (fp_delta )
22732293 # mark as updated in case parent family is updated next
22742294 self .updated_state_families .add (fp_id )
@@ -2319,7 +2339,7 @@ def update_workflow(self, reloaded=False):
23192339 root_node = root_node_updated
23202340 else :
23212341 root_node = data [FAMILY_PROXIES ].get (root_id )
2322- if root_node is not None :
2342+ if root_node is not None and root_node . graph_depth == 0 :
23232343 is_held_total += root_node .is_held_total
23242344 is_queued_total += root_node .is_queued_total
23252345 is_runahead_total += root_node .is_runahead_total
0 commit comments