226 changes: 214 additions & 12 deletions cravat/websubmit/websubmit.py
@@ -102,6 +102,11 @@ async def job_dir(self, request, job_id, given_username=None):

async def job_input (self, request, job_id):
job_dir, statusjson = await self.job_status(request, job_id)

# Handle case when job directory does not exist
if job_dir is None or not os.path.exists(job_dir):
return None

orig_input_fname = None
if 'orig_input_fname' in statusjson:
orig_input_fname = statusjson['orig_input_fname']
@@ -119,6 +124,11 @@ async def job_input (self, request, job_id):

async def job_run_name(self, request, job_id, given_username=None):
job_dir, statusjson = await self.job_status(request, job_id, given_username=given_username)

# Handle case when job directory does not exist
if job_dir is None or statusjson is None or not os.path.exists(job_dir):
return None

run_name = statusjson.get('run_name')
if run_name is None:
fns = os.listdir(job_dir)
@@ -175,6 +185,11 @@ async def job_report (self, request, job_id, report_type):
async def job_status (self, request, job_id, given_username=None):
try:
job_dir = await self.job_dir(request, job_id, given_username=given_username)

# Handle case when job directory does not exist
if job_dir is None or not os.path.exists(job_dir):
return None, {'status': 'Error', 'message': f'Job directory not found for job {job_id}'}

fns = os.listdir(job_dir)
statusjson = {}
for fn in fns:
@@ -314,6 +329,18 @@ async def resubmit (request):
if note != '':
run_args.append('--note')
run_args.append(note)

# Control worker count for WebUI execution (for chunk size optimization)
webui_workers = None
if 'webui_max_workers' in system_conf:
webui_workers = system_conf['webui_max_workers']
else:
# Default value: moderate worker count (4-8 range) to maintain larger chunk sizes
webui_workers = 4

if webui_workers and webui_workers > 0:
run_args.extend(['--mp', str(webui_workers)])

run_args.append('--keep-status')
if cc_cohorts_path != '':
run_args.extend(['--module-option',f'casecontrol.cohorts={cc_cohorts_path}'])
@@ -323,7 +350,7 @@ async def resubmit (request):
if job_id not in job_ids:
job_ids.append(job_id)
run_jobs_info['job_ids'] = job_ids
qitem = {'cmd': 'submit', 'job_id': job_id, 'run_args': run_args}
qitem = {'cmd': 'submit', 'job_id': job_id, 'run_args': run_args, 'job_dir': job_dir}
job_queue.put(qitem)
status_json['status'] = 'Submitted'
with open(status_json_path, 'w') as wf:
@@ -454,6 +481,18 @@ async def submit (request):
if servermode:
run_args.append('--writeadmindb')
run_args.extend(['--jobid', job_id])

# Control worker count for WebUI execution (for chunk size optimization)
webui_workers = None
if 'webui_max_workers' in system_conf:
webui_workers = system_conf['webui_max_workers']
else:
# Default value: moderate worker count (4-8 range) to maintain larger chunk sizes
webui_workers = 4

if webui_workers and webui_workers > 0:
run_args.extend(['--mp', str(webui_workers)])

run_args.append('--keep-status')
if cc_cohorts_path is not None:
run_args.extend(['--module-option',f'casecontrol.cohorts={cc_cohorts_path}'])
@@ -462,7 +501,7 @@ async def submit (request):
job_ids = run_jobs_info['job_ids']
job_ids.append(job_id)
run_jobs_info['job_ids'] = job_ids
qitem = {'cmd': 'submit', 'job_id': job_id, 'run_args': run_args}
qitem = {'cmd': 'submit', 'job_id': job_id, 'run_args': run_args, 'job_dir': job_dir}
job_queue.put(qitem)
status = {'status': 'Submitted'}
job.set_info_values(status=status)
@@ -943,24 +982,70 @@ def __init__(self, main_loop):
self.queue = []
global system_conf
self.max_num_concurrent_jobs = int(system_conf['max_num_concurrent_jobs'])
self.use_slurm = system_conf.get('use_slurm', False) # Slurm usage setting
self.run_args = {}
self.job_dirs = {} # Add dictionary to store job directories
self.run_jobs_info = run_jobs_info
self.run_jobs_info['job_ids'] = []
self.loop = main_loop

# Always initialize Slurm-related variables (remain empty if not used)
self.slurm_job_ids = {} # Mapping from OpenCRAVAT job_id to Slurm job_id

# Load Slurm options
self.slurm_options = system_conf.get('slurm_options', {})

def add_job(self, qitem):
    self.queue.append(qitem['job_id'])
    self.run_args[qitem['job_id']] = qitem['run_args']
    # Save job_dir only if it was provided with the queue item
    if 'job_dir' in qitem:
        self.job_dirs[qitem['job_id']] = qitem['job_dir']
    else:
        # If job_dir is missing, infer it from run_args; the last argument
        # is usually the output directory
        run_args = qitem['run_args']
        if len(run_args) > 0 and os.path.exists(run_args[-1]):
            self.job_dirs[qitem['job_id']] = run_args[-1]
        else:
            # Fall back to a directory based on the job_id
            import cravat.admin_util as au
            jobs_dirs = au.get_jobs_dirs()
            if jobs_dirs:
                self.job_dirs[qitem['job_id']] = os.path.join(jobs_dirs[0], qitem['job_id'])

def get_job_info(self, uid):
# Get job information (Slurm job ID when using Slurm, process when running locally)
if self.use_slurm:
return self.slurm_job_ids.get(uid)
else:
return self.running_jobs.get(uid)

async def cancel_job(self, uid):
if self.use_slurm:
await self._cancel_slurm_job(uid)
else:
await self._cancel_local_job(uid)
self.clean_jobs(uid)

async def _cancel_slurm_job(self, uid):
slurm_job_id = self.slurm_job_ids.get(uid)
if slurm_job_id:
try:
# Cancel the Slurm job
result = subprocess.run(['scancel', str(slurm_job_id)],
capture_output=True, text=True)
if result.returncode == 0:
print(f'Cancelled Slurm job {slurm_job_id} for OpenCRAVAT job {uid}')
else:
print(f'Failed to cancel Slurm job {slurm_job_id}: {result.stderr}')
except Exception as e:
print(f'Error cancelling Slurm job {slurm_job_id}: {e}')

async def _cancel_local_job(self, uid):
p = self.running_jobs.get(uid)
if p:
    p.poll()
    pl = platform.platform().lower()
if pl.startswith('windows'):
# proc.kill() doesn't work well on windows
subprocess.Popen("TASKKILL /F /PID {pid} /T".format(pid=p.pid),stdout=subprocess.PIPE,stderr=subprocess.PIPE)
@@ -983,39 +1068,156 @@ async def cancel_job(self, uid):
continue
else:
p.kill()
self.clean_jobs(uid)

def clean_jobs(self, uid):
# Clean up completed jobs
if self.use_slurm:
self._clean_slurm_jobs()
else:
self._clean_local_jobs()

def _clean_slurm_jobs(self):
# Check the status of Slurm jobs and clean up completed jobs
to_del = []
for oc_job_id, slurm_job_id in self.slurm_job_ids.items():
try:
# Check job status with squeue
result = subprocess.run(['squeue', '-j', str(slurm_job_id), '--noheader', '--format=%T'],
capture_output=True, text=True, timeout=10)

if result.returncode != 0 or not result.stdout.strip():
# Job not found or error = completed
to_del.append(oc_job_id)
else:
job_state = result.stdout.strip()
# Check terminal states like COMPLETED, FAILED, CANCELLED, TIMEOUT
if job_state in ['COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT', 'NODE_FAIL']:
to_del.append(oc_job_id)
except Exception as e:  # also covers subprocess.TimeoutExpired
print(f'Error checking Slurm job {slurm_job_id}: {e}')
# Consider as completed when error occurs
to_del.append(oc_job_id)

for oc_job_id in to_del:
if oc_job_id in self.slurm_job_ids:
del self.slurm_job_ids[oc_job_id]
if oc_job_id in self.running_jobs:
del self.running_jobs[oc_job_id]
job_ids = self.run_jobs_info['job_ids']
if oc_job_id in job_ids:
job_ids.remove(oc_job_id)
self.run_jobs_info['job_ids'] = job_ids

def _clean_local_jobs(self):
# Check the status of local processes and clean up completed jobs
to_del = []
for uid, p in self.running_jobs.items():
if p.poll() is not None:
to_del.append(uid)
for uid in to_del:
del self.running_jobs[uid]
job_ids = self.run_jobs_info['job_ids']
if uid in job_ids:
    job_ids.remove(uid)
    self.run_jobs_info['job_ids'] = job_ids

def list_running_jobs(self):
# List currently tracked jobs
return list(self.running_jobs.keys())

def run_available_jobs (self):
# Run jobs in available slots
if self.use_slurm:
self._run_slurm_jobs()
else:
self._run_local_jobs()

def _run_slurm_jobs(self):
# Execute jobs with Slurm
num_available_slot = self.max_num_concurrent_jobs - len(self.running_jobs)
if num_available_slot > 0 and len(self.queue) > 0:
for i in range(num_available_slot):
if len(self.queue) > 0:
job_id = self.queue.pop(0)
run_args = self.run_args[job_id]

# Skip if job_dir does not exist
if job_id not in self.job_dirs:
print(f'Warning: Job directory not found for job {job_id}, skipping Slurm execution')
del self.run_args[job_id]
continue

job_dir = self.job_dirs[job_id]
del self.run_args[job_id]
del self.job_dirs[job_id]

# Generate shell script for Slurm sbatch
batch_script_path = os.path.join(job_dir, 'OC_batch.sh')
oc_command = ' '.join(run_args)

# Get Slurm batch script header and footer from system_conf['slurm_script_header'] and ['slurm_script_footer']
script_header = system_conf.get('slurm_script_header', '#!/bin/bash\n')
script_footer = system_conf.get('slurm_script_footer', 'deactivate\n')
# Concatenate header, command, and footer
script_content = f"{script_header}{oc_command}\n{script_footer}"

with open(batch_script_path, 'w') as f:
f.write(script_content)

# Grant execute permission to the script
os.chmod(batch_script_path, 0o755)
# Execute the script with sbatch and obtain the Slurm JOBID
try:
# Construct sbatch command
sbatch_cmd = ['sbatch']

# Add Slurm configuration options
if hasattr(self, 'slurm_options') and self.slurm_options:
for option, value in self.slurm_options.items():
if value is not None and value != '':
if option.startswith('--'):
# If the option already starts with '--'
sbatch_cmd.extend([option, str(value)])
else:
# Add '--' prefix to the option
sbatch_cmd.extend([f'--{option}', str(value)])

# Add script path
sbatch_cmd.append(batch_script_path)

result = subprocess.run(sbatch_cmd,
capture_output=True, text=True)
if result.returncode == 0:
# Extract JOBID from sbatch output ("Submitted batch job 12345")
output = result.stdout.strip()
slurm_job_id = output.split()[-1] # The last number is the JOBID

self.running_jobs[job_id] = True # Running flag
self.slurm_job_ids[job_id] = slurm_job_id
print(f'Submitted OpenCRAVAT job {job_id} as Slurm job {slurm_job_id} with options: {sbatch_cmd[1:-1]}')
else:
print(f'Failed to submit job {job_id}: {result.stderr}')
except Exception as e:
print(f'Error submitting job {job_id}: {e}')

def _run_local_jobs(self):
# Run jobs locally
num_available_slot = self.max_num_concurrent_jobs - len(self.running_jobs)
if num_available_slot > 0 and len(self.queue) > 0:
for i in range(num_available_slot):
if len(self.queue) > 0:
job_id = self.queue.pop(0)
run_args = self.run_args[job_id]
del self.run_args[job_id]
if job_id in self.job_dirs:
del self.job_dirs[job_id]
p = subprocess.Popen(run_args)
self.running_jobs[job_id] = p

async def delete_job (self, qitem):
global filerouter
job_id = qitem['job_id']
if self.get_job_info(job_id) is not None:
print('\nKilling job {}'.format(job_id))
await self.cancel_job(job_id)
job_dir = qitem['job_dir']
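For reference, the Slurm path above relies on parsing plain-text scheduler output: the Slurm job ID is taken as the last token of sbatch's "Submitted batch job <id>" line, and a job is treated as finished when squeue returns nothing or reports a terminal state. The following is a minimal standalone sketch of that parsing logic, with hard-coded sample strings standing in for real subprocess calls; it is illustrative only and not part of the patch.

# Standalone sketch of the sbatch/squeue output parsing used by
# _run_slurm_jobs and _clean_slurm_jobs above.

TERMINAL_STATES = ['COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT', 'NODE_FAIL']

def parse_sbatch_output(stdout):
    # sbatch prints "Submitted batch job <JOBID>"; the last token is the ID.
    return stdout.strip().split()[-1]

def slurm_job_finished(squeue_stdout, returncode):
    # Empty squeue output (or a non-zero return code) means the job is no
    # longer known to the scheduler; otherwise check for a terminal state.
    if returncode != 0 or not squeue_stdout.strip():
        return True
    return squeue_stdout.strip() in TERMINAL_STATES

if __name__ == '__main__':
    print(parse_sbatch_output('Submitted batch job 12345'))  # 12345
    print(slurm_job_finished('RUNNING\n', 0))                # False
    print(slurm_job_finished('', 1))                         # True
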
64 changes: 64 additions & 0 deletions slurm_config_example.yml
@@ -0,0 +1,64 @@
# OpenCRAVAT Slurm configuration example
#
# Add the following settings to your cravat-system.yml

# Enable Slurm usage
use_slurm: true

# Set the number of workers for WebUI execution.
# Ensures OpenCRAVAT uses only the CPUs allocated by Slurm,
# preventing overuse of node resources and optimizing chunk size.
webui_max_workers: 4 # Recommended: 4-8 (adjust according to your environment)

# Slurm sbatch option settings
# The following settings are passed as arguments to the sbatch command when executed.
# example: sbatch --account=your_account_name --partition=cpu ...
slurm_options:
# Specify account (often required)
account: "your_account_name"

# Specify partition
partition: "cpu"
# Additional Slurm options can be added similarly
# Example:
# nodes: "1"
# ntasks: "1"
# gres: "gpu:1"

# Slurm batch script customization
# Script header section (shebang, environment setup, etc.)
# The following header is prepended to the generated Slurm job script.
# The script is saved as "OC_batch.sh" in the job's directory.
# slurm_script_header and slurm_script_footer sandwich the oc run command.
# The shebang line is required.
slurm_script_header: |
#!/bin/bash
#SBATCH --job-name=opencravat
#SBATCH --output=opencravat-%j.out
#SBATCH --error=opencravat-%j.err

# Environment setup
# Activate OpenCRAVAT environment.
# Replace the following line with the appropriate command for your system:
# - For virtualenv: source "$HOME/oc/bin/activate"
# - For module system: module load opencravat
# - For conda: conda activate opencravat
# - Or use export PATH/other setup as needed
source "$HOME/oc/bin/activate"

# Job start log
echo "Job started at $(date)"
echo "Running on node: $(hostname)"
echo "Working directory: $(pwd)"

# Script footer section (post-processing, log output, etc.)
# slurm_script_footer is optional.
slurm_script_footer: |

# Job end log
echo "Job finished at $(date)"

# Deactivate virtual environment
deactivate
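
To make the header/footer mechanism concrete, the sketch below reproduces the script-assembly step from _run_slurm_jobs in websubmit.py using the example values in this file. The job directory and run_args list are hypothetical placeholders; the WebUI builds the real ones from the submitted job.

import os

# Hypothetical placeholders for what the WebUI would supply for a real job.
job_dir = '/home/user/.opencravat/jobs/210101-000000'
run_args = ['oc', 'run', 'input.vcf', '-d', job_dir, '--mp', '4', '--keep-status']

# Values corresponding to this example configuration.
slurm_options = {'account': 'your_account_name', 'partition': 'cpu'}
script_header = '#!/bin/bash\n#SBATCH --job-name=opencravat\nsource "$HOME/oc/bin/activate"\n'
script_footer = 'deactivate\n'

# OC_batch.sh is header + oc command + footer, as in _run_slurm_jobs.
script_content = f"{script_header}{' '.join(run_args)}\n{script_footer}"
batch_script_path = os.path.join(job_dir, 'OC_batch.sh')

# Each slurm_options entry becomes an "--option value" pair on the sbatch command.
sbatch_cmd = ['sbatch']
for option, value in slurm_options.items():
    sbatch_cmd.extend(['--{}'.format(option), str(value)])
sbatch_cmd.append(batch_script_path)

print(script_content)
print(' '.join(sbatch_cmd))
# -> sbatch --account=your_account_name --partition=cpu /home/user/.opencravat/jobs/210101-000000/OC_batch.sh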