Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5d20af4
Basic s3 abstractions and framework for storage
jordanrfrazier Oct 8, 2025
6bb3b12
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 8, 2025
fd02b38
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 8, 2025
c4b70ed
Remove duplicate code, simplifications, comments
jordanrfrazier Oct 8, 2025
5e30e9d
csv and json to data comp fixes
jordanrfrazier Oct 8, 2025
000fb4c
Add hash lock to postgres migrations
jordanrfrazier Oct 8, 2025
aa2880f
Add guarddrails against using some components while in s3 mode
jordanrfrazier Oct 9, 2025
48c46cc
Clean up tests; use real env vars
jordanrfrazier Oct 9, 2025
865d8a1
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 9, 2025
4138179
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 9, 2025
6055057
Claude fixed up some FE array handling and BE exception types
jordanrfrazier Oct 10, 2025
f6db4c5
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 10, 2025
b7c1019
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 10, 2025
fb1e674
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Oct 10, 2025
e74836d
one comment
jordanrfrazier Oct 10, 2025
6b89a33
use issubclass in the pool creation
zzzming Oct 11, 2025
0ed8661
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 11, 2025
cb38603
Use static lock key for alembic migrations - TESTING
jordanrfrazier Oct 12, 2025
64c5712
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 12, 2025
d818213
Try adding explicit connect args with prepare threshold to disable al…
jordanrfrazier Oct 14, 2025
8b9e7a9
Fix previous -- only add prepare threshold for postgres
jordanrfrazier Oct 14, 2025
586dce7
add LANGFLOW_MIGRATION_LOCK_NAMESPACE for the lock key
zzzming Oct 15, 2025
87ea318
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 23, 2025
2ba02d8
Scope sessions to read/write and normalize using single mgmt function…
jordanrfrazier Oct 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/Configuration/environment-variables.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ If it detects a supported environment variable, then it automatically adopts the
LANGFLOW_CONFIG_DIR=/path/to/config/
LANGFLOW_DATABASE_URL=postgresql://user:password@localhost:5432/langflow
LANGFLOW_DEV=False
LANGFLOW_MIGRATION_LOCK_NAMESPACE=my-namespace
LANGFLOW_FALLBACK_TO_ENV_VAR=False
LANGFLOW_HEALTH_CHECK_MAX_RETRIES=5
LANGFLOW_HOST=localhost
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/Develop/memory.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ The following settings allow you to fine-tune your database connection pool and
```
LANGFLOW_DB_CONNECTION_SETTINGS='{"pool_size": 20, "max_overflow": 30, "pool_timeout": 30, "pool_pre_ping": true, "pool_recycle": 1800, "echo": false}'
LANGFLOW_DB_CONNECT_TIMEOUT=20
LANGFLOW_MIGRATION_LOCK_NAMESPACE=my-namespace
```

- `pool_size`: Maximum number of database connections to keep in the pool (default: 20)
Expand All @@ -70,6 +71,7 @@ LANGFLOW_DB_CONNECT_TIMEOUT=20
- `pool_recycle`: Number of seconds after which a connection is automatically recycled (default: 1800, or 30 minutes)
- `echo`: If true, SQL queries are logged for debugging purposes (default: false)
- `LANGFLOW_DB_CONNECT_TIMEOUT`: Maximum number of seconds to wait when establishing a new database connection (default: 20)
- `LANGFLOW_MIGRATION_LOCK_NAMESPACE`: Optional namespace identifier for PostgreSQL advisory lock during migrations. If not provided, a hash of the database URL will be used. Useful when multiple Langflow instances share the same database and need coordinated migration locking

## Configure cache memory

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ dependencies = [
"langchain-pinecone>=0.2.8,<1.0.0",
"langchain-mistralai==0.2.3",
"langchain-chroma>=0.2.6,<1.0.0",
"langchain-aws==0.2.33",
"langchain-aws>=0.2.33,<1.0.0",
"langchain-unstructured==0.1.5",
"langchain-milvus==0.1.7",
"langchain-mongodb==0.7.0",
Expand Down Expand Up @@ -133,6 +133,7 @@ dependencies = [
"easyocr>=1.7.2; sys_platform != 'darwin' or platform_machine != 'x86_64'",
"opencv-python>=4.11; sys_platform != 'darwin' or platform_machine != 'x86_64'",
"pytest-codspeed==4.0.0", # Pinned to avoid python3.13 + intel mac compatibility issues
"aioboto3>=15.2.0,<16.0.0",
]

[dependency-groups]
Expand Down
1 change: 0 additions & 1 deletion src/backend/base/langflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,7 @@

api_key_create = ApiKeyCreate(name="CLI")
unmasked_api_key = await create_api_key(session, api_key_create, user_id=superuser.id)
await session.commit()
return unmasked_api_key

Check failure on line 880 in src/backend/base/langflow/__main__.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (RET504)

src/backend/base/langflow/__main__.py:880:20: RET504 Unnecessary assignment to `unmasked_api_key` before `return` statement

unmasked_api_key = asyncio.run(aapi_key())
# Create a banner to display the API key and tell the user it won't be shown again
Expand Down
29 changes: 26 additions & 3 deletions src/backend/base/langflow/alembic/env.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# noqa: INP001
import asyncio
import hashlib
import logging
import os
from logging.config import fileConfig

from alembic import context
Expand All @@ -9,6 +12,8 @@

from langflow.services.database.service import SQLModel

logger = logging.getLogger(__name__)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
Expand Down Expand Up @@ -85,16 +90,34 @@

with context.begin_transaction():
if connection.dialect.name == "postgresql":
connection.execute(text("SET LOCAL lock_timeout = '60s';"))
connection.execute(text("SELECT pg_advisory_xact_lock(112233);"))
# Use namespace from environment variable if provided, otherwise use default static key
namespace = os.getenv("LANGFLOW_MIGRATION_LOCK_NAMESPACE")
if namespace:
lock_key = int(hashlib.sha256(namespace.encode()).hexdigest()[:16], 16) % (2**63 - 1)
logger.info(f"Using migration lock namespace: {namespace}, lock_key: {lock_key}")

Check failure on line 97 in src/backend/base/langflow/alembic/env.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (G004)

src/backend/base/langflow/alembic/env.py:97:29: G004 Logging statement uses f-string
else:
lock_key = 11223344
logger.info(f"Using default migration lock_key: {lock_key}")

Check failure on line 100 in src/backend/base/langflow/alembic/env.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (G004)

src/backend/base/langflow/alembic/env.py:100:29: G004 Logging statement uses f-string
connection.execute(text("SET LOCAL lock_timeout = '180s';"))
connection.execute(text(f"SELECT pg_advisory_xact_lock({lock_key});"))
context.run_migrations()


async def _run_async_migrations() -> None:
# Disable prepared statements for PostgreSQL (required for PgBouncer compatibility)
# SQLite doesn't support this parameter, so only add it for PostgreSQL
config_section = config.get_section(config.config_ini_section, {})
db_url = config_section.get("sqlalchemy.url", "")

connect_args = {}
if "postgresql" in db_url:
connect_args["prepare_threshold"] = None

connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
config_section,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
connect_args=connect_args,
)

if connectable.dialect.name == "sqlite":
Expand Down
7 changes: 5 additions & 2 deletions src/backend/base/langflow/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.user.model import User
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
from langflow.services.deps import get_session, session_scope
from langflow.services.deps import get_session, get_session_with_commit
from langflow.services.store.utils import get_lf_version_from_pypi

if TYPE_CHECKING:
Expand All @@ -34,7 +34,10 @@

CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
CurrentActiveMCPUser = Annotated[User, Depends(get_current_active_user_mcp)]
DbSession = Annotated[AsyncSession, Depends(get_session)]
# DbSession with auto-commit for write operations
DbSession = Annotated[AsyncSession, Depends(get_session_with_commit)]
# DbSessionReadOnly for read-only operations (no auto-commit, reduces lock contention)
DbSessionReadOnly = Annotated[AsyncSession, Depends(get_session)]


class EventDeliveryType(str, Enum):
Expand Down Expand Up @@ -150,7 +153,7 @@


async def _get_flow_name(flow_id: uuid.UUID) -> str:
async with session_scope() as session:

Check failure on line 156 in src/backend/base/langflow/api/utils.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (F821)

src/backend/base/langflow/api/utils.py:156:16: F821 Undefined name `session_scope`
flow = await session.get(Flow, flow_id)
if flow is None:
msg = f"Flow {flow_id} not found"
Expand Down Expand Up @@ -351,7 +354,7 @@
raise HTTPException(status_code=400, detail="No client_id cookie found")

# Check if the flow is public
async with session_scope() as session:

Check failure on line 357 in src/backend/base/langflow/api/utils.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (F821)

src/backend/base/langflow/api/utils.py:357:16: F821 Undefined name `session_scope`
from sqlmodel import select

from langflow.services.database.models.flow.model import AccessTypeEnum, Flow
Expand Down
14 changes: 8 additions & 6 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
from langflow.services.deps import (
get_chat_service,
get_queue_service,
get_session,
get_telemetry_service,
session_scope,
)
Expand Down Expand Up @@ -286,11 +285,14 @@ async def build_vertex(
if isinstance(cache, CacheMiss):
# If there's no cache
await logger.awarning(f"No cache found for {flow_id_str}. Building graph starting at {vertex_id}")
graph = await build_graph_from_db(
flow_id=flow_id,
session=await anext(get_session()),
chat_service=chat_service,
)
from langflow.services.deps import session_scope

async with session_scope() as session:
graph = await build_graph_from_db(
flow_id=flow_id,
session=session,
chat_service=chat_service,
)
else:
graph = cache.get("result")
await graph.initialize_run()
Expand Down
33 changes: 21 additions & 12 deletions src/backend/base/langflow/api/v1/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,20 @@
folder_name: str,
file_name: str,
):
"""Download a profile picture.
Profile pictures are system files bundled with the package, served from the installation directory.
"""

Check failure on line 135 in src/backend/base/langflow/api/v1/files.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (D205)

src/backend/base/langflow/api/v1/files.py:133:5: D205 1 blank line required between summary line and description
try:
storage_service = get_storage_service()
# Profile pictures are in the package installation directory
package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures"
file_path = package_dir / folder_name / file_name

if not file_path.exists():
raise HTTPException(status_code=404, detail="Profile picture not found")

extension = file_name.split(".")[-1]
config_dir = storage_service.settings_service.settings.config_dir
config_path = Path(config_dir) # type: ignore[arg-type]
folder_path = config_path / "profile_pictures" / folder_name
content_type = build_content_type_from_extension(extension)
file_content = await storage_service.get_file(flow_id=folder_path, file_name=file_name) # type: ignore[arg-type]
file_content = file_path.read_bytes()
return StreamingResponse(BytesIO(file_content), media_type=content_type)

except Exception as e:
Expand All @@ -146,16 +152,19 @@

@router.get("/profile_pictures/list")
async def list_profile_pictures():
"""List available profile pictures.
Profile pictures are system files bundled with the package, served from the installation directory.
"""

Check failure on line 157 in src/backend/base/langflow/api/v1/files.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (D205)

src/backend/base/langflow/api/v1/files.py:155:5: D205 1 blank line required between summary line and description
try:
storage_service = get_storage_service()
config_dir = storage_service.settings_service.settings.config_dir
config_path = Path(config_dir) # type: ignore[arg-type]
# Profile pictures are in the package installation directory
package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures"

people_path = config_path / "profile_pictures/People"
space_path = config_path / "profile_pictures/Space"
people_path = package_dir / "People"
space_path = package_dir / "Space"

people = await storage_service.list_files(flow_id=people_path) # type: ignore[arg-type]
space = await storage_service.list_files(flow_id=space_path) # type: ignore[arg-type]
# List files from package directory - these are bundled with the container
people = [f.name for f in people_path.iterdir() if f.is_file()] if people_path.exists() else []
space = [f.name for f in space_path.iterdir() if f.is_file()] if space_path.exists() else []

except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
Expand Down
18 changes: 11 additions & 7 deletions src/backend/base/langflow/api/v1/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,14 @@
):
try:
db_flow = await _new_flow(session=session, flow=flow, user_id=current_user.id)
await session.commit()
await session.flush()
await session.refresh(db_flow)

await _save_flow_to_fs(db_flow)

# Convert to FlowRead while session is still active to avoid detached instance errors
flow_read = FlowRead.model_validate(db_flow, from_attributes=True)

except Exception as e:
if "UNIQUE constraint failed" in str(e):
# Get the name of the column that failed
Expand All @@ -180,7 +183,7 @@
if isinstance(e, HTTPException):
raise
raise HTTPException(status_code=500, detail=str(e)) from e
return db_flow
return flow_read


@router.get("/", response_model=list[FlowRead] | Page[FlowRead] | list[FlowHeader], status_code=200)
Expand Down Expand Up @@ -358,7 +361,7 @@
db_flow.folder_id = default_folder.id

session.add(db_flow)
await session.commit()
await session.flush()
await session.refresh(db_flow)

await _save_flow_to_fs(db_flow)
Expand Down Expand Up @@ -398,7 +401,6 @@
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
await cascade_delete_flow(session, flow.id)
await session.commit()
return {"message": "Flow deleted successfully"}


Expand All @@ -416,10 +418,12 @@
db_flow = Flow.model_validate(flow, from_attributes=True)
session.add(db_flow)
db_flows.append(db_flow)
await session.commit()
await session.flush()
for db_flow in db_flows:
await session.refresh(db_flow)
return db_flows
# Convert to FlowRead while session is still active to avoid detached instance errors
flow_reads = [FlowRead.model_validate(db_flow, from_attributes=True) for db_flow in db_flows]
return flow_reads

Check failure on line 426 in src/backend/base/langflow/api/v1/flows.py

View workflow job for this annotation

GitHub Actions / Ruff Style Check (3.13)

Ruff (RET504)

src/backend/base/langflow/api/v1/flows.py:426:12: RET504 Unnecessary assignment to `flow_reads` before `return` statement


@router.post("/upload/", response_model=list[FlowRead], status_code=201)
Expand All @@ -444,7 +448,7 @@
response_list.append(response)

try:
await session.commit()
await session.flush()
for db_flow in response_list:
await session.refresh(db_flow)
await _save_flow_to_fs(db_flow)
Expand Down
5 changes: 0 additions & 5 deletions src/backend/base/langflow/api/v1/mcp_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,6 @@ async def update_project_mcp_settings(
session.add(flow)
updated_flows.append(flow)

await session.commit()

response: dict[str, Any] = {
"message": f"Updated MCP settings for {len(updated_flows)} flows and project auth settings"
}
Expand Down Expand Up @@ -1176,9 +1174,6 @@ async def init_mcp_servers():
await logger.aexception(msg)
# Continue to next project even if this one fails

# Commit any auth settings updates
await session.commit()

except Exception as e: # noqa: BLE001
msg = f"Failed to initialize MCP servers: {e}"
await logger.aexception(msg)
Expand Down
7 changes: 2 additions & 5 deletions src/backend/base/langflow/api/v1/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ async def get_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSessio
async def delete_vertex_builds(flow_id: Annotated[UUID, Query()], session: DbSession) -> None:
try:
await delete_vertex_builds_by_flow_id(session, flow_id)
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

Expand Down Expand Up @@ -93,7 +92,6 @@ async def get_messages(
async def delete_messages(message_ids: list[UUID], session: DbSession) -> None:
try:
await session.exec(delete(MessageTable).where(MessageTable.id.in_(message_ids))) # type: ignore[attr-defined]
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

Expand All @@ -118,7 +116,7 @@ async def update_message(
message_dict["edit"] = True
db_message.sqlmodel_update(message_dict)
session.add(db_message)
await session.commit()
await session.flush()
await session.refresh(db_message)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
Expand Down Expand Up @@ -151,7 +149,7 @@ async def update_session_id(

session.add_all(messages)

await session.commit()
await session.flush()
message_responses = []
for message in messages:
await session.refresh(message)
Expand All @@ -173,7 +171,6 @@ async def delete_messages_session(
.where(col(MessageTable.session_id) == session_id)
.execution_options(synchronize_session="fetch")
)
await session.commit()
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

Expand Down
Loading
Loading