3737 UpdateJob ,
3838 UpdateJobDefinition ,
3939)
40- from jupyter_scheduler .orm import Job , JobDefinition , Workflow , create_session
40+ from jupyter_scheduler .orm import Job , JobDefinition , Workflow , WorkflowDefinition , create_session
4141from jupyter_scheduler .utils import (
4242 copy_directory ,
4343 create_output_directory ,
4444 create_output_filename ,
4545)
46- from jupyter_scheduler .workflows import CreateWorkflow , DescribeWorkflow , UpdateWorkflow
46+ from jupyter_scheduler .workflows import (
47+ CreateWorkflow ,
48+ CreateWorkflowDefinition ,
49+ DescribeWorkflow ,
50+ DescribeWorkflowDefinition ,
51+ UpdateWorkflow ,
52+ UpdateWorkflowDefinition ,
53+ )
4754
4855
4956class BaseScheduler (LoggingConfigurable ):
@@ -117,6 +124,10 @@ def run_workflow(self, workflow_id: str) -> str:
117124 """Triggers execution of the workflow."""
118125 raise NotImplementedError ("must be implemented by subclass" )
119126
127+ def activate_workflow_definition (self , workflow_definition_id : str ) -> str :
128+ """Activates workflow marking it as ready for execution."""
129+ raise NotImplementedError ("must be implemented by subclass" )
130+
120131 def get_workflow (self , workflow_id : str ) -> DescribeWorkflow :
121132 """Returns workflow record for a single workflow."""
122133 raise NotImplementedError ("must be implemented by subclass" )
@@ -125,6 +136,12 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
125136 """Adds a task to a workflow."""
126137 raise NotImplementedError ("must be implemented by subclass" )
127138
139+ def create_workflow_definition_task (
140+ self , workflow_definition_id : str , model : CreateJobDefinition
141+ ) -> str :
142+ """Adds a task to a workflow definition."""
143+ raise NotImplementedError ("must be implemented by subclass" )
144+
128145 def update_job (self , job_id : str , model : UpdateJob ):
129146 """Updates job metadata in the persistence store,
130147 for example name, status etc. In case of status
@@ -176,6 +193,13 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
176193 """
177194 raise NotImplementedError ("must be implemented by subclass" )
178195
196+ def create_workflow_definition (self , model : CreateWorkflowDefinition ) -> str :
197+ """Creates a new workflow definition record,
198+ consider this as the template for creating
199+ recurring/scheduled workflows.
200+ """
201+ raise NotImplementedError ("must be implemented by subclass" )
202+
179203 def update_job_definition (self , job_definition_id : str , model : UpdateJobDefinition ):
180204 """Updates job definition metadata in the persistence store,
181205 should only impact all future jobs.
@@ -192,6 +216,10 @@ def get_job_definition(self, job_definition_id: str) -> DescribeJobDefinition:
192216 """Returns job definition record for a single job definition"""
193217 raise NotImplementedError ("must be implemented by subclass" )
194218
219+ def get_workflow_definition (self , workflow_definition_id : str ) -> DescribeWorkflowDefinition :
220+ """Returns workflow definition record for a single workflow definition"""
221+ raise NotImplementedError ("must be implemented by subclass" )
222+
195223 def list_job_definitions (self , query : ListJobDefinitionsQuery ) -> ListJobDefinitionsResponse :
196224 """Returns list of all job definitions filtered by query"""
197225 raise NotImplementedError ("must be implemented by subclass" )
@@ -524,6 +552,13 @@ def create_workflow(self, model: CreateWorkflow) -> str:
524552 session .commit ()
525553 return workflow .workflow_id
526554
555+ def create_workflow_definition (self , model : CreateWorkflowDefinition ) -> str :
556+ with self .db_session () as session :
557+ workflow_definition = WorkflowDefinition (** model .dict (exclude_none = True ))
558+ session .add (workflow_definition )
559+ session .commit ()
560+ return workflow_definition .workflow_definition_id
561+
527562 def run_workflow (self , workflow_id : str ) -> str :
528563 execution_manager = self .execution_manager_class (
529564 workflow_id = workflow_id ,
@@ -533,6 +568,15 @@ def run_workflow(self, workflow_id: str) -> str:
533568 execution_manager .process_workflow ()
534569 return workflow_id
535570
571+ def activate_workflow_definition (self , workflow_definition_id : str ) -> str :
572+ execution_manager = self .execution_manager_class (
573+ workflow_definition_id = workflow_definition_id ,
574+ root_dir = self .root_dir ,
575+ db_url = self .db_url ,
576+ )
577+ execution_manager .activate_workflow_definition ()
578+ return workflow_definition_id
579+
536580 def get_workflow (self , workflow_id : str ) -> DescribeWorkflow :
537581 with self .db_session () as session :
538582 workflow_record = (
@@ -541,6 +585,16 @@ def get_workflow(self, workflow_id: str) -> DescribeWorkflow:
541585 model = DescribeWorkflow .from_orm (workflow_record )
542586 return model
543587
588+ def get_workflow_definition (self , workflow_definition_id : str ) -> DescribeWorkflowDefinition :
589+ with self .db_session () as session :
590+ workflow_definition_record = (
591+ session .query (WorkflowDefinition )
592+ .filter (WorkflowDefinition .workflow_definition_id == workflow_definition_id )
593+ .one ()
594+ )
595+ model = DescribeWorkflowDefinition .from_orm (workflow_definition_record )
596+ return model
597+
544598 def create_workflow_task (self , workflow_id : str , model : CreateJob ) -> str :
545599 job_id = self .create_job (model , run = False )
546600 workflow : DescribeWorkflow = self .get_workflow (workflow_id )
@@ -549,13 +603,36 @@ def create_workflow_task(self, workflow_id: str, model: CreateJob) -> str:
549603 self .update_workflow (workflow_id , UpdateWorkflow (tasks = updated_tasks ))
550604 return job_id
551605
606+ def create_workflow_definition_task (
607+ self , workflow_definition_id : str , model : CreateJobDefinition
608+ ) -> str :
609+ job_definition_id = self .create_job_definition (model , add_to_task_runner = False )
610+ workflow_definition : DescribeWorkflowDefinition = self .get_workflow_definition (
611+ workflow_definition_id
612+ )
613+ updated_tasks = (workflow_definition .tasks or [])[:]
614+ updated_tasks .append (job_definition_id )
615+ self .update_workflow_definition (
616+ workflow_definition_id , UpdateWorkflowDefinition (tasks = updated_tasks )
617+ )
618+ return job_definition_id
619+
552620 def update_workflow (self , workflow_id : str , model : UpdateWorkflow ):
553621 with self .db_session () as session :
554622 session .query (Workflow ).filter (Workflow .workflow_id == workflow_id ).update (
555623 model .dict (exclude_none = True )
556624 )
557625 session .commit ()
558626
627+ def update_workflow_definition (
628+ self , workflow_definition_id : str , model : UpdateWorkflowDefinition
629+ ):
630+ with self .db_session () as session :
631+ session .query (WorkflowDefinition ).filter (
632+ WorkflowDefinition .workflow_definition_id == workflow_definition_id
633+ ).update (model .dict (exclude_none = True ))
634+ session .commit ()
635+
559636 def update_job (self , job_id : str , model : UpdateJob ):
560637 with self .db_session () as session :
561638 session .query (Job ).filter (Job .job_id == job_id ).update (model .dict (exclude_none = True ))
@@ -657,7 +734,9 @@ def stop_job(self, job_id):
657734 session .commit ()
658735 break
659736
660- def create_job_definition (self , model : CreateJobDefinition ) -> str :
737+ def create_job_definition (
738+ self , model : CreateJobDefinition , add_to_task_runner : bool = True
739+ ) -> str :
661740 with self .db_session () as session :
662741 if not self .file_exists (model .input_uri ):
663742 raise InputUriError (model .input_uri )
@@ -681,7 +760,7 @@ def create_job_definition(self, model: CreateJobDefinition) -> str:
681760 else :
682761 self .copy_input_file (model .input_uri , staging_paths ["input" ])
683762
684- if self .task_runner and job_definition_schedule :
763+ if add_to_task_runner and self .task_runner and job_definition_schedule :
685764 self .task_runner .add_job_definition (job_definition_id )
686765
687766 return job_definition_id
0 commit comments