2525 "scheduler_type" ,
2626 "scheduler_id" ,
2727]
28-
2928EXCLUDED_JOB_STATE_FIELDS = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS + ["job_input" ]
30-
3129OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS = EXCLUDED_JOB_STATE_FIELDS + ["user" , "wsid" ]
3230
3331EXTRA_JOB_STATE_FIELDS = ["batch_id" , "child_jobs" ]
@@ -100,7 +98,7 @@ def __init__(self, ee2_state, extra_data=None, children=None):
10098 if ee2_state .get ("job_id" ) is None :
10199 raise ValueError ("Cannot create a job without a job ID!" )
102100
103- self ._acc_state = ee2_state
101+ self ._update_state ( ee2_state )
104102 self .extra_data = extra_data
105103
106104 # verify parent-children relationship
@@ -325,20 +323,30 @@ def _update_state(self, state: dict) -> None:
325323 """
326324 given a state data structure (as emitted by ee2), update the stored state in the job object
327325 """
328- if state :
326+ if not isinstance (state , dict ):
327+ raise TypeError ("state must be a dict" )
329328
329+ # Check job_id match
330+ if self ._acc_state :
330331 if "job_id" in state and state ["job_id" ] != self .job_id :
331332 raise ValueError (
332333 f"Job ID mismatch in _update_state: job ID: { self .job_id } ; state ID: { state ['job_id' ]} "
333334 )
334335
335- state = copy .deepcopy (state )
336- if self ._acc_state is None :
337- self ._acc_state = state
338- else :
339- self ._acc_state .update (state )
336+ # Check if there would be no change in updating
337+ # i.e., if state <= self._acc_state
338+ if self ._acc_state is not None :
339+ if {** self ._acc_state , ** state } == self ._acc_state :
340+ return
341+
342+ state = copy .deepcopy (state )
343+ if self ._acc_state is None :
344+ self ._acc_state = state
345+ else :
346+ self ._acc_state .update (state )
347+ self .last_updated = time .time_ns ()
340348
341- def state (self , force_refresh = False ):
349+ def state (self , force_refresh = False , exclude = JOB_INIT_EXCLUDED_JOB_STATE_FIELDS ):
342350 """
343351 Queries the job service to see the state of the current job.
344352 """
@@ -347,47 +355,63 @@ def state(self, force_refresh=False):
347355 state = self .query_ee2_state (self .job_id , init = False )
348356 self ._update_state (state )
349357
350- return self ._internal_state (JOB_INIT_EXCLUDED_JOB_STATE_FIELDS )
358+ return self ._internal_state (exclude )
351359
352360 def _internal_state (self , exclude = None ):
353361 """Wrapper for self._acc_state"""
354362 state = copy .deepcopy (self ._acc_state )
355363 self ._trim_ee2_state (state , exclude )
356364 return state
357365
358- def output_state (self , state = None ) -> dict :
366+ def output_state (self , state = None , no_refresh = False ) -> dict :
359367 """
360- :param state: can be queried individually from ee2/cache with self.state(),
361- but sometimes want it to be queried in bulk from ee2 upstream
362- :return: dict, with structure
363-
364- {
365- outputWidgetInfo: (if not finished, None, else...) job.get_viewer_params result
366- jobState: {
367- job_id: string,
368- status: string,
369- created: epoch ms,
370- updated: epoch ms,
371- queued: optional - epoch ms,
372- finished: optional - epoc ms,
373- terminated_code: optional - int,
374- tag: string (release, beta, dev),
375- parent_job_id: optional - string or null,
376- run_id: string,
377- cell_id: string,
378- errormsg: optional - string,
379- error (optional): {
380- code: int,
381- name: string,
382- message: string (should be for the user to read),
383- error: string, (likely a stacktrace)
384- },
385- error_code: optional - int
386- }
387- }
368+ :param state: Supplied when the state is queried beforehand from EE2 in bulk,
369+ or when it is retrieved from a cache. If not supplied, must be
370+ queried with self.state() or self._internal_state()
371+ :return: dict - with structure generally like (not accounting for error modes):
372+ {
373+ "job_id": string,
374+ "jobState": {
375+ "status": string - enum,
376+ "created": epoch ms,
377+ "updated": epoch ms,
378+ "queued": epoch ms,
379+ "running": epoch ms,
380+ "finished": epoch ms,
381+ "batch_job": bool,
382+ "job_output": {
383+ "version": string,
384+ "result": [
385+ {
386+ "obj_ref": string,
387+ "report_name": string,
388+ "report_ref": string,
389+ }
390+ ],
391+ "id": string
392+ },
393+ "batch_id": string,
394+ "child_jobs": list,
395+ "retry_ids": list,
396+ "retry_count": int,
397+ "job_id": string,
398+ "created": epoch ms
399+ },
400+ "outputWidgetInfo": { # None if not finished
401+ "name": string,
402+ "tag": string - (release, beta, dev),
403+ "params": {
404+ "wsName": string,
405+ "obj_ref": string,
406+ "report_name": string,
407+ "report_ref": string
408+ "report_window_line_height": string
409+ }
410+ }
411+ }
388412 """
389413 if not state :
390- state = self .state ()
414+ state = self ._internal_state () if no_refresh else self . state ()
391415 else :
392416 self ._update_state (state )
393417 state = self ._internal_state ()
0 commit comments