Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "transformerlab"
version = "0.0.12"
version = "0.0.13"
description = "Python SDK for Transformer Lab"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
3 changes: 2 additions & 1 deletion src/lab/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from .experiment import Experiment
from .model import Model
from .dataset import Dataset
from .task import Task

from .lab_facade import Lab

# Provide a convenient singleton facade for simple usage
lab = Lab()

__all__ = ["WORKSPACE_DIR", "HOME_DIR", Job, Experiment, Model, Dataset, "lab", "Lab"]
__all__ = ["WORKSPACE_DIR", "HOME_DIR", Job, Experiment, Model, Dataset, Task, "lab", "Lab"]
4 changes: 4 additions & 0 deletions src/lab/dirs.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ def plugin_dir_by_name(plugin_name: str) -> str:
DATASETS_DIR = os.path.join(get_workspace_dir(), "datasets")
os.makedirs(name=DATASETS_DIR, exist_ok=True)

# TASKS_DIR
TASKS_DIR = os.path.join(WORKSPACE_DIR, "tasks")
os.makedirs(name=TASKS_DIR, exist_ok=True)


def dataset_dir_by_id(dataset_id: str) -> str:
return os.path.join(DATASETS_DIR, dataset_id)
Expand Down
117 changes: 117 additions & 0 deletions src/lab/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import os
from datetime import datetime
from werkzeug.utils import secure_filename

from .dirs import TASKS_DIR
from .labresource import BaseLabResource


class Task(BaseLabResource):
def get_dir(self):
"""Abstract method on BaseLabResource"""
task_id_safe = secure_filename(str(self.id))
return os.path.join(TASKS_DIR, task_id_safe)

def _default_json(self):
# Default metadata modeled after API tasks table fields
return {
"id": self.id,
"name": "",
"type": "",
"inputs": {},
"config": {},
"plugin": "",
"outputs": {},
"experiment_id": None,
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
}

def set_metadata(self, *, name: str | None = None, type: str | None = None,
inputs: dict | None = None, config: dict | None = None,
plugin: str | None = None, outputs: dict | None = None,
experiment_id: int | None = None):
"""Set task metadata"""
data = self.get_json_data()
if name is not None:
data["name"] = name
if type is not None:
data["type"] = type
if inputs is not None:
data["inputs"] = inputs
if config is not None:
data["config"] = config
if plugin is not None:
data["plugin"] = plugin
if outputs is not None:
data["outputs"] = outputs
if experiment_id is not None:
data["experiment_id"] = experiment_id

# Always update the updated_at timestamp
data["updated_at"] = datetime.utcnow().isoformat()

self._set_json_data(data)

def get_metadata(self):
"""Get task metadata"""
return self.get_json_data()

@staticmethod
def list_all():
"""List all tasks in the filesystem"""
results = []
if not os.path.isdir(TASKS_DIR):
return results
for entry in os.listdir(TASKS_DIR):
full = os.path.join(TASKS_DIR, entry)
if not os.path.isdir(full):
continue
# Attempt to read index.json (or latest snapshot)
try:
task = Task(entry)
results.append(task.get_metadata())
except Exception:
continue
# Sort by created_at descending to match database behavior
results.sort(key=lambda x: x.get("created_at", ""), reverse=True)
return results

@staticmethod
def list_by_type(task_type: str):
"""List all tasks of a specific type"""
all_tasks = Task.list_all()
return [task for task in all_tasks if task.get("type") == task_type]

@staticmethod
def list_by_experiment(experiment_id: int):
"""List all tasks for a specific experiment"""
all_tasks = Task.list_all()
return [task for task in all_tasks if task.get("experiment_id") == experiment_id]

@staticmethod
def list_by_type_in_experiment(task_type: str, experiment_id: int):
"""List all tasks of a specific type in a specific experiment"""
all_tasks = Task.list_all()
return [task for task in all_tasks
if task.get("type") == task_type and task.get("experiment_id") == experiment_id]

@staticmethod
def get_by_id(task_id: str):
"""Get a specific task by ID"""
try:
task = Task.get(task_id)
return task.get_metadata()
except FileNotFoundError:
return None

@staticmethod
def delete_all():
"""Delete all tasks"""
if not os.path.isdir(TASKS_DIR):
return
for entry in os.listdir(TASKS_DIR):
full = os.path.join(TASKS_DIR, entry)
if os.path.isdir(full):
import shutil
shutil.rmtree(full)
Loading