18 changes: 6 additions & 12 deletions api.py
@@ -68,7 +68,11 @@
from lab.dirs import get_workspace_dir
from lab import dirs as lab_dirs
from transformerlab.shared import dirs
from transformerlab.db.filesystem_migrations import migrate_datasets_table_to_filesystem, migrate_models_table_to_filesystem
from transformerlab.db.filesystem_migrations import (
migrate_datasets_table_to_filesystem,
migrate_models_table_to_filesystem,
migrate_tasks_table_to_filesystem,
)
from transformerlab.shared.request_context import set_current_org_id
from lab.dirs import set_organization_id as lab_set_org_id

@@ -101,9 +105,9 @@ async def lifespan(app: FastAPI):
if "--reload" in sys.argv:
await install_all_plugins()
# run the migrations
asyncio.create_task(migrate())
asyncio.create_task(migrate_models_table_to_filesystem())
asyncio.create_task(migrate_datasets_table_to_filesystem())
asyncio.create_task(migrate_tasks_table_to_filesystem())
asyncio.create_task(run_over_and_over())
print("FastAPI LIFESPAN: 🏁 🏁 🏁 Begin API Server 🏁 🏁 🏁", flush=True)
yield
@@ -114,16 +118,6 @@ async def lifespan(app: FastAPI):
print("FastAPI LIFESPAN: Complete")


# the migrate function only runs the conversion function if no tasks are already present
async def migrate():
if len(await tasks.tasks_get_all()) == 0:
for exp in await experiment.experiments_get_all():
await tasks.convert_all_to_tasks(exp["id"])





async def run_over_and_over():
"""Every three seconds, check for new jobs to run."""
while True:
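Note on the api.py change: the DB-gated `migrate()` helper is gone, and the lifespan hook now schedules three filesystem migrations with `asyncio.create_task`, which fires each coroutine without awaiting it, so the server finishes startup while the migrations run in the background. A hypothetical sketch of the shape such a migration presumably has — the real `migrate_tasks_table_to_filesystem` is defined in `transformerlab/db/filesystem_migrations.py` and is not shown in this diff:

```python
# Hypothetical sketch only: the real migrate_tasks_table_to_filesystem lives
# in transformerlab/db/filesystem_migrations.py and may differ.
import json
import os


async def migrate_rows_to_filesystem(rows: list[dict], target_dir: str) -> None:
    """Write each DB row to its own JSON file, skipping rows already migrated."""
    os.makedirs(target_dir, exist_ok=True)
    for row in rows:
        path = os.path.join(target_dir, f"{row['id']}.json")
        if os.path.exists(path):
            # Idempotent: safe to schedule on every startup.
            continue
        with open(path, "w") as f:
            json.dump(row, f, indent=2)
```

Because nothing awaits these tasks, each migration has to be idempotent and tolerate requests arriving while it is still running; an existence check like the one above is one way to get that.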
2 changes: 1 addition & 1 deletion requirements-no-gpu-uv.txt
@@ -548,7 +548,7 @@ tqdm==4.66.5
# peft
# sentence-transformers
# transformers
transformerlab==0.0.12
transformerlab==0.0.13
# via -r requirements.in
transformerlab-inference==0.2.49
# via -r requirements.in
2 changes: 1 addition & 1 deletion requirements-rocm-uv.txt
@@ -551,7 +551,7 @@ tqdm==4.66.5
# peft
# sentence-transformers
# transformers
transformerlab==0.0.12
transformerlab==0.0.13
# via -r requirements-rocm.in
transformerlab-inference==0.2.49
# via -r requirements-rocm.in
2 changes: 1 addition & 1 deletion requirements-rocm.in
@@ -32,7 +32,7 @@ hf_xet
macmon-python
mcp[cli]
transformerlab-inference==0.2.49
transformerlab==0.0.12
transformerlab==0.0.13
diffusers==0.33.1
pyrsmi
controlnet_aux==0.0.10
2 changes: 1 addition & 1 deletion requirements-uv.txt
@@ -586,7 +586,7 @@ tqdm==4.66.5
# peft
# sentence-transformers
# transformers
transformerlab==0.0.12
transformerlab==0.0.13
# via -r requirements.in
transformerlab-inference==0.2.49
# via -r requirements.in
2 changes: 1 addition & 1 deletion requirements.in
@@ -31,7 +31,7 @@ markitdown[all]
hf_xet
macmon-python
transformerlab-inference==0.2.49
transformerlab==0.0.12
transformerlab==0.0.13
diffusers==0.33.1
nvidia-ml-py
mcp[cli]
27 changes: 15 additions & 12 deletions test/api/test_experiment_export.py
@@ -2,11 +2,12 @@
import json
import transformerlab.db.db as db
import transformerlab.db.workflows as db_workflows
from transformerlab.services.tasks_service import tasks_service
from lab.dirs import get_workspace_dir

WORKSPACE_DIR = get_workspace_dir()
import pytest


@pytest.mark.skip("skipping until migrations are done")
async def test_export_experiment(client):
"""Test exporting an experiment to JSON format"""
# Create a test experiment
@@ -23,13 +24,13 @@ async def test_export_experiment(client):
"batch_size": "4",
"learning_rate": "0.0001",
}
await db.add_task(
tasks_service.add_task(
name="test_train_task",
Type="TRAIN",
inputs=json.dumps({"model_name": "test-model", "dataset_name": "test-dataset"}),
config=json.dumps(train_config),
task_type="TRAIN",
inputs={"model_name": "test-model", "dataset_name": "test-dataset"},
config=train_config,
plugin="test_trainer",
outputs="{}",
outputs={},
experiment_id=experiment_id,
)

@@ -42,13 +43,13 @@ async def test_export_experiment(client):
"script_parameters": {"tasks": ["mmlu"], "limit": 0.5},
"eval_dataset": "test-eval-dataset",
}
await db.add_task(
tasks_service.add_task(
name="test_eval_task",
Type="EVAL",
inputs=json.dumps({"model_name": "test-model-2", "dataset_name": "test-eval-dataset"}),
config=json.dumps(eval_config),
task_type="EVAL",
inputs={"model_name": "test-model-2", "dataset_name": "test-eval-dataset"},
config=eval_config,
plugin="test_evaluator",
outputs=json.dumps({"eval_results": "{}"}),
outputs={"eval_results": "{}"},
experiment_id=experiment_id,
)

@@ -68,6 +69,8 @@
# The response should be a JSON file
assert response.headers["content-type"] == "application/json"

WORKSPACE_DIR = get_workspace_dir()

# Read the exported file from workspace directory
export_file = os.path.join(WORKSPACE_DIR, f"{test_experiment_name}_export.json")
assert os.path.exists(export_file)
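As the updated test shows, the call shape changes in three ways: the `await` is dropped (`tasks_service.add_task` is synchronous), `inputs`/`config`/`outputs` take plain dicts instead of `json.dumps(...)` strings, and `task_type=` replaces the old capitalized `Type=` keyword. A minimal sketch of the new call, with illustrative values:

```python
# Call shape implied by the updated test above; the argument values here
# are illustrative, not taken from the real test suite.
from transformerlab.services.tasks_service import tasks_service

tasks_service.add_task(          # no await: the service call is synchronous
    name="example_train_task",
    task_type="TRAIN",           # replaces the old capitalized Type= keyword
    inputs={"model_name": "example-model", "dataset_name": "example-dataset"},
    config={"batch_size": "4"},  # plain dict, no json.dumps(...) wrapper
    plugin="example_trainer",
    outputs={},
    experiment_id=1,             # assumes an existing experiment with id 1
)
```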
39 changes: 0 additions & 39 deletions test/db/test_db.py
@@ -26,14 +26,6 @@
save_plugin,
config_get,
config_set,
add_task,
update_task,
tasks_get_all,
tasks_get_by_type,
tasks_get_by_type_in_experiment,
delete_task,
tasks_delete_all,
tasks_get_by_id,
get_training_template,
get_training_templates,
create_training_template,
@@ -139,12 +131,6 @@ async def test_job_cancel_in_progress_jobs_sets_cancelled():
assert job.get("status") == "CANCELLED"


@pytest.mark.asyncio
async def test_tasks_get_by_id_returns_none_for_missing():
task = await tasks_get_by_id(999999)
assert task is None


@pytest.mark.asyncio
async def test_get_training_template_and_by_name_returns_none_for_missing():
tmpl = await get_training_template(999999)
@@ -249,31 +235,6 @@ async def test_experiment_update_and_update_config_and_save_prompt_template(test
assert exp_config.get("prompt_template") == test_prompt


@pytest.mark.asyncio
async def test_task_crud(test_experiment):
await add_task("task1", "TYPE", "{}", "{}", "plugin", "{}", test_experiment)
tasks = await tasks_get_all()
assert any(t["name"] == "task1" for t in tasks)
task = tasks[0]
await update_task(task["id"], {"inputs": "[]", "config": "{}", "outputs": "[]", "name": "task1_updated"})
updated = await tasks_get_by_id(task["id"])
assert updated["name"] == "task1_updated"
await delete_task(task["id"])
deleted = await tasks_get_by_id(task["id"])
assert deleted is None


@pytest.mark.asyncio
async def test_tasks_get_by_type_and_in_experiment(test_experiment):
await add_task("task2", "TYPE2", "{}", "{}", "plugin", "{}", test_experiment)
by_type = await tasks_get_by_type("TYPE2")
assert any(t["name"] == "task2" for t in by_type)
by_type_exp = await tasks_get_by_type_in_experiment("TYPE2", test_experiment)
assert any(t["name"] == "task2" for t in by_type_exp)
await tasks_delete_all()
all_tasks = await tasks_get_all()
assert len(all_tasks) == 0


@pytest.mark.asyncio
async def test_training_template_crud():
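The deleted DB-level CRUD tests get no direct replacement in this diff. A hedged sketch of what an equivalent service-level test could look like, restricted to the three `tasks_service` methods this PR actually exercises (`add_task`, `tasks_get_by_experiment`, `delete_task`) and assuming the same `test_experiment` fixture the old tests used:

```python
# Sketch only: uses just the tasks_service calls visible elsewhere in this PR;
# the service may expose richer CRUD (update, get-by-id) not shown here.
from transformerlab.services.tasks_service import tasks_service


def test_task_crud_via_service(test_experiment):
    tasks_service.add_task(
        name="task1",
        task_type="TRAIN",  # illustrative; the old test used a placeholder type
        inputs={},
        config={},
        plugin="plugin",
        outputs={},
        experiment_id=test_experiment,
    )
    tasks = tasks_service.tasks_get_by_experiment(test_experiment)
    assert any(t["name"] == "task1" for t in tasks)
    for task in tasks:
        tasks_service.delete_task(int(task["id"]))
    assert tasks_service.tasks_get_by_experiment(test_experiment) == []
```

Note there is no `@pytest.mark.asyncio` here: the service calls in this PR are all synchronous.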
120 changes: 4 additions & 116 deletions transformerlab/db/db.py
@@ -13,6 +13,7 @@
from typing import AsyncGenerator

from transformerlab.db.jobs import job_create, job_delete, jobs_get_by_experiment
from transformerlab.services.tasks_service import tasks_service
from transformerlab.db.workflows import (
workflow_delete_by_id,
workflows_get_from_experiment,
@@ -31,119 +32,6 @@ async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
yield session


###############
# TASKS MODEL
###############


async def add_task(name, Type, inputs, config, plugin, outputs, experiment_id):
async with async_session() as session:
stmt = insert(models.Task).values(
name=name,
type=Type,
inputs=inputs,
config=config,
plugin=plugin,
outputs=outputs,
experiment_id=experiment_id,
)
await session.execute(stmt)
await session.commit()
return


async def update_task(task_id, new_task):
async with async_session() as session:
values = {}
if "inputs" in new_task:
values["inputs"] = new_task["inputs"]
if "config" in new_task:
values["config"] = new_task["config"]
if "outputs" in new_task:
values["outputs"] = new_task["outputs"]
if "name" in new_task and new_task["name"] != "":
values["name"] = new_task["name"]
if values:
await session.execute(update(models.Task).where(models.Task.id == task_id).values(**values))
await session.commit()
return


async def tasks_get_all():
async with async_session() as session:
result = await session.execute(select(models.Task).order_by(models.Task.created_at.desc()))
tasks = result.scalars().all()
data = []
for task in tasks:
row = sqlalchemy_to_dict(task)
data.append(row)
return data


async def tasks_get_by_type(Type):
async with async_session() as session:
result = await session.execute(
select(models.Task).where(models.Task.type == Type).order_by(models.Task.created_at.desc())
)
tasks = result.scalars().all()
data = []
for task in tasks:
row = sqlalchemy_to_dict(task)
data.append(row)
return data


async def tasks_get_by_type_in_experiment(Type, experiment_id):
async with async_session() as session:
result = await session.execute(
select(models.Task)
.where(models.Task.type == Type, models.Task.experiment_id == experiment_id)
.order_by(models.Task.created_at.desc())
)
tasks = result.scalars().all()
data = []
for task in tasks:
row = sqlalchemy_to_dict(task)
data.append(row)
return data


async def delete_task(task_id):
async with async_session() as session:
await session.execute(delete(models.Task).where(models.Task.id == task_id))
await session.commit()
return


async def tasks_delete_all():
async with async_session() as session:
await session.execute(delete(models.Task))
await session.commit()
return


async def tasks_get_by_id(task_id):
async with async_session() as session:
result = await session.execute(
select(models.Task).where(models.Task.id == task_id).order_by(models.Task.created_at.desc()).limit(1)
)
task = result.scalar_one_or_none()
if task is None:
return None
return sqlalchemy_to_dict(task)


async def tasks_get_by_experiment(experiment_id):
"""Get all tasks for a specific experiment"""
async with async_session() as session:
result = await session.execute(
select(models.Task)
.where(models.Task.experiment_id == experiment_id)
.order_by(models.Task.created_at.desc())
)
tasks = result.scalars().all()
return [sqlalchemy_to_dict(task) for task in tasks]


async def get_training_template(id):
async with async_session() as session:
@@ -341,10 +229,10 @@ async def experiment_delete(id):
result = await session.execute(select(models.Experiment).where(models.Experiment.id == id))
experiment = result.scalar_one_or_none()
if experiment:
# Delete all associated tasks using the existing delete method
tasks = await tasks_get_by_experiment(id)
# Delete all associated tasks using the filesystem service
tasks = tasks_service.tasks_get_by_experiment(id)
for task in tasks:
await delete_task(task["id"])
tasks_service.delete_task(int(task["id"]))

# Delete all associated jobs using the job delete method
jobs = await jobs_get_by_experiment(id)
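With the whole TASKS MODEL section removed from db.py, `experiment_delete` now fans task cleanup out through the service instead of issuing SQL `DELETE`s, and both service calls are synchronous (no `await`). For intuition, a hypothetical sketch of the two methods `experiment_delete` relies on, imagined as a filesystem-backed store — the real implementation lives in `transformerlab/services/tasks_service.py` and may differ:

```python
# Hypothetical sketch of transformerlab.services.tasks_service internals;
# only the two methods experiment_delete relies on are shown, and the
# one-JSON-file-per-task layout is an assumption.
import json
import os

from lab.dirs import get_workspace_dir  # same helper the tests above import


class TasksServiceSketch:
    def _tasks_dir(self) -> str:
        d = os.path.join(get_workspace_dir(), "tasks")  # assumed layout
        os.makedirs(d, exist_ok=True)
        return d

    def tasks_get_by_experiment(self, experiment_id):
        """Load every task file and keep those tagged with this experiment."""
        found = []
        for fname in sorted(os.listdir(self._tasks_dir())):
            if not fname.endswith(".json"):
                continue
            with open(os.path.join(self._tasks_dir(), fname)) as f:
                task = json.load(f)
            if task.get("experiment_id") == experiment_id:
                found.append(task)
        return found

    def delete_task(self, task_id: int) -> None:
        """Remove the per-task file; a missing file counts as already deleted."""
        path = os.path.join(self._tasks_dir(), f"{task_id}.json")
        if os.path.exists(path):
            os.remove(path)
```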