Closed
Commits (showing changes from 5 of 24 commits)
5d20af4
Basic s3 abstractions and framework for storage
jordanrfrazier Oct 8, 2025
6bb3b12
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 8, 2025
fd02b38
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 8, 2025
c4b70ed
Remove duplicate code, simplifications, comments
jordanrfrazier Oct 8, 2025
5e30e9d
csv and json to data comp fixes
jordanrfrazier Oct 8, 2025
000fb4c
Add hash lock to postgres migrations
jordanrfrazier Oct 8, 2025
aa2880f
Add guardrails against using some components while in s3 mode
jordanrfrazier Oct 9, 2025
48c46cc
Clean up tests; use real env vars
jordanrfrazier Oct 9, 2025
865d8a1
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 9, 2025
4138179
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 9, 2025
6055057
Claude fixed up some FE array handling and BE exception types
jordanrfrazier Oct 10, 2025
f6db4c5
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 10, 2025
b7c1019
[autofix.ci] apply automated fixes (attempt 2/3)
autofix-ci[bot] Oct 10, 2025
fb1e674
[autofix.ci] apply automated fixes (attempt 3/3)
autofix-ci[bot] Oct 10, 2025
e74836d
one comment
jordanrfrazier Oct 10, 2025
6b89a33
use issubclass in the pool creation
zzzming Oct 11, 2025
0ed8661
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 11, 2025
cb38603
Use static lock key for alembic migrations - TESTING
jordanrfrazier Oct 12, 2025
64c5712
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 12, 2025
d818213
Try adding explicit connect args with prepare threshold to disable al…
jordanrfrazier Oct 14, 2025
8b9e7a9
Fix previous -- only add prepare threshold for postgres
jordanrfrazier Oct 14, 2025
586dce7
add LANGFLOW_MIGRATION_LOCK_NAMESPACE for the lock key
zzzming Oct 15, 2025
87ea318
[autofix.ci] apply automated fixes
autofix-ci[bot] Oct 23, 2025
2ba02d8
Scope sessions to read/write and normalize using single mgmt function…
jordanrfrazier Oct 23, 2025
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -85,7 +85,7 @@ dependencies = [
"langchain-pinecone>=0.2.8,<1.0.0",
"langchain-mistralai==0.2.3",
"langchain-chroma>=0.2.6,<1.0.0",
"langchain-aws==0.2.33",
"langchain-aws>=0.2.33,<1.0.0",
"langchain-unstructured==0.1.5",
"langchain-milvus==0.1.7",
"langchain-mongodb==0.7.0",
@@ -133,6 +133,7 @@ dependencies = [
"easyocr>=1.7.2; sys_platform != 'darwin' or platform_machine != 'x86_64'",
"opencv-python>=4.11; sys_platform != 'darwin' or platform_machine != 'x86_64'",
"pytest-codspeed==4.0.0", # Pinned to avoid python3.13 + intel mac compatibility issues
"aioboto3>=15.2.0,<16.0.0",
]

[dependency-groups]
33 changes: 21 additions & 12 deletions src/backend/base/langflow/api/v1/files.py
@@ -130,14 +130,20 @@
folder_name: str,
file_name: str,
):
"""Download a profile picture.
Profile pictures are system files bundled with the package, served from the installation directory.
"""

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (D205), src/backend/base/langflow/api/v1/files.py:133:5: 1 blank line required between summary line and description
try:
storage_service = get_storage_service()
# Profile pictures are in the package installation directory
package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures"
file_path = package_dir / folder_name / file_name

if not file_path.exists():
raise HTTPException(status_code=404, detail="Profile picture not found")

extension = file_name.split(".")[-1]
config_dir = storage_service.settings_service.settings.config_dir
config_path = Path(config_dir) # type: ignore[arg-type]
folder_path = config_path / "profile_pictures" / folder_name
content_type = build_content_type_from_extension(extension)
file_content = await storage_service.get_file(flow_id=folder_path, file_name=file_name) # type: ignore[arg-type]
file_content = file_path.read_bytes()
return StreamingResponse(BytesIO(file_content), media_type=content_type)

except Exception as e:
@@ -146,16 +152,19 @@

@router.get("/profile_pictures/list")
async def list_profile_pictures():
"""List available profile pictures.
Profile pictures are system files bundled with the package, served from the installation directory.
"""

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (D205), src/backend/base/langflow/api/v1/files.py:155:5: 1 blank line required between summary line and description
try:
storage_service = get_storage_service()
config_dir = storage_service.settings_service.settings.config_dir
config_path = Path(config_dir) # type: ignore[arg-type]
# Profile pictures are in the package installation directory
package_dir = Path(__file__).parent.parent.parent / "initial_setup" / "profile_pictures"

people_path = config_path / "profile_pictures/People"
space_path = config_path / "profile_pictures/Space"
people_path = package_dir / "People"
space_path = package_dir / "Space"

people = await storage_service.list_files(flow_id=people_path) # type: ignore[arg-type]
space = await storage_service.list_files(flow_id=space_path) # type: ignore[arg-type]
# List files from package directory - these are bundled with the container
people = [f.name for f in people_path.iterdir() if f.is_file()] if people_path.exists() else []
space = [f.name for f in space_path.iterdir() if f.is_file()] if space_path.exists() else []

except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
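For reference, a minimal self-contained sketch of the bundled-asset pattern this hunk switches to: profile pictures are read from the package installation directory rather than the configured storage service. It assumes FastAPI; `PACKAGE_DIR` and the simplified content-type guess are illustrative stand-ins, not the repository's exact code.

```python
from io import BytesIO
from pathlib import Path

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse

router = APIRouter()

# Hypothetical location of the bundled assets, resolved relative to this module.
PACKAGE_DIR = Path(__file__).parent / "initial_setup" / "profile_pictures"


@router.get("/profile_pictures/{folder_name}/{file_name}")
async def download_profile_picture(folder_name: str, file_name: str) -> StreamingResponse:
    """Download a profile picture.

    Profile pictures are bundled with the package and served from the installation directory.
    """
    file_path = PACKAGE_DIR / folder_name / file_name
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Profile picture not found")
    # Simplified content-type guess based on the file extension.
    content_type = f"image/{file_name.rsplit('.', 1)[-1]}"
    return StreamingResponse(BytesIO(file_path.read_bytes()), media_type=content_type)
```

Keeping the docstring summary on its own line followed by a blank line is also what resolves the D205 warnings flagged above.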
29 changes: 21 additions & 8 deletions src/backend/base/langflow/api/v2/files.py
@@ -161,10 +161,16 @@
raise HTTPException(status_code=500, detail=f"Error saving file: {e}") from e

# Compute the file size based on the path
file_size = await storage_service.get_file_size(
flow_id=str(current_user.id),
file_name=stored_file_name,
)
try:
file_size = await storage_service.get_file_size(
flow_id=str(current_user.id),
file_name=stored_file_name,
)
except Exception as e:
# If we can't get file size, the file might still be in storage
# Don't delete it - just report the error
# Clean up will happen if database insert fails below
raise HTTPException(status_code=500, detail=f"Storage error getting file size: {e}") from e

# Create a new file record
new_file = UserFile(
Expand All @@ -176,11 +182,18 @@
)
session.add(new_file)

await session.commit()
await session.refresh(new_file)
try:
await session.commit()
await session.refresh(new_file)
except Exception as db_err:
# Database insert failed - clean up the uploaded file to avoid orphaned files
try:
await storage_service.delete_file(flow_id=str(current_user.id), file_name=stored_file_name)
except Exception: # noqa: S110

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (BLE001), src/backend/base/langflow/api/v2/files.py:192:20: Do not catch blind exception: `Exception`
pass

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (SIM105), src/backend/base/langflow/api/v2/files.py:190:13: Use `contextlib.suppress(Exception)` instead of `try`-`except`-`pass`
raise HTTPException(status_code=500, detail=f"Database error: {db_err}") from db_err
except Exception as e:
# Optionally, you could also delete the file from disk if the DB insert fails.
raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
raise HTTPException(status_code=500, detail=f"Unexpected error: {e}") from e

return UploadFileResponse(id=new_file.id, name=new_file.name, path=Path(new_file.path), size=new_file.size)

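The BLE001 and SIM105 annotations above point at the cleanup branch. Below is a hedged sketch of that pattern using `contextlib.suppress`, so a failed cleanup never masks the original database error; `session`, `storage_service`, `new_file`, and `stored_file_name` are stand-ins rather than the repository's exact objects.

```python
import contextlib

from fastapi import HTTPException


async def commit_file_record(session, storage_service, new_file, user_id: str, stored_file_name: str):
    """Persist the file record; on DB failure, best-effort delete the uploaded object."""
    session.add(new_file)
    try:
        await session.commit()
        await session.refresh(new_file)
    except Exception as db_err:
        # The database insert failed: remove the already-uploaded object so no orphan
        # is left behind, without letting the cleanup itself mask the original error.
        with contextlib.suppress(Exception):
            await storage_service.delete_file(flow_id=user_id, file_name=stored_file_name)
        raise HTTPException(status_code=500, detail=f"Database error: {db_err}") from db_err
```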
50 changes: 36 additions & 14 deletions src/backend/base/langflow/base/data/base_file.py
@@ -1,20 +1,21 @@
import ast
import shutil
import tarfile
from abc import ABC, abstractmethod
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING
from zipfile import ZipFile, is_zipfile

from langflow.services.deps import get_settings_service
import orjson
import pandas as pd

from langflow.custom.custom_component.component import Component
from langflow.io import BoolInput, FileInput, HandleInput, Output, StrInput
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (I001), src/backend/base/langflow/base/data/base_file.py:1:1: Import block is un-sorted or un-formatted

if TYPE_CHECKING:
from collections.abc import Callable
@@ -26,6 +27,8 @@
This class provides common functionality for resolving, validating, and
processing file paths. Child classes must define valid file extensions
and implement the `process_files` method.

# TODO: May want to subclass for local and remote files
"""

class BaseFile:
@@ -519,16 +522,27 @@
resolved_files = []

def add_file(data: Data, path: str | Path, *, delete_after_processing: bool):
resolved_path = Path(self.resolve_path(str(path)))

if not resolved_path.exists():
msg = f"File or directory not found: {path}"
self.log(msg)
if not self.silent_errors:
raise ValueError(msg)
resolved_files.append(
BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing)
)
path_str = str(path)
settings = get_settings_service().settings

# When using object storage (S3), file paths are storage keys (e.g., "<flow_id>/<filename>")
# that don't exist on the local filesystem. We defer validation until file processing.
# For local storage, validate the file exists immediately to fail fast.
if settings.storage_type == "s3":
print(f"FRAZIER _ TEST - FILE PATH: {path_str}")

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (T201), src/backend/base/langflow/base/data/base_file.py:532:17: `print` found
resolved_files.append(
BaseFileComponent.BaseFile(data, Path(path_str), delete_after_processing=delete_after_processing)
)
else:
resolved_path = Path(self.resolve_path(path_str))
if not resolved_path.exists():
msg = f"File or directory not found: {path}"
self.log(msg)
if not self.silent_errors:
raise ValueError(msg)
resolved_files.append(
BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing)
)

file_path = self._file_path_as_list()

@@ -673,15 +687,23 @@
Raises:
ValueError: If unsupported files are encountered and `ignore_unsupported_extensions` is False.
"""
from langflow.services.deps import get_settings_service

settings = get_settings_service().settings
final_files = []
ignored_files = []

for file in files:
if not file.path.is_file():
self.log(f"Not a file: {file.path.name}")
continue
# For S3 storage, paths are virtual keys that don't exist locally
# Skip filesystem checks and only validate extensions
if settings.storage_type != "s3":
if not file.path.is_file():

Check failure: GitHub Actions / Ruff Style Check (3.13), Ruff (SIM102), src/backend/base/langflow/base/data/base_file.py:699:13: Use a single `if` statement instead of nested `if` statements
self.log(f"Not a file: {file.path.name}")
continue

if file.path.suffix[1:].lower() not in self.valid_extensions:
# Validate file extension
extension = file.path.suffix[1:].lower()
if extension not in self.valid_extensions:
if self.ignore_unsupported_extensions:
ignored_files.append(file.path.name)
continue
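A hedged, condensed sketch of the storage-aware branching that `add_file` introduces above: S3 paths are virtual keys of the form `<flow_id>/<filename>` and are validated lazily, while local paths are checked up front. The helper name is hypothetical and the T201-flagged debug `print` is omitted; this illustrates the idea, not the repository's exact code.

```python
from pathlib import Path


def resolve_input_path(path: str, storage_type: str, *, silent_errors: bool = False) -> Path:
    """Resolve a user-supplied file path according to the configured storage backend."""
    if storage_type == "s3":
        # Object-storage keys ("<flow_id>/<filename>") never exist on the local
        # filesystem, so existence checks are deferred until the file is fetched.
        return Path(path)
    resolved = Path(path).expanduser().resolve()
    if not resolved.exists() and not silent_errors:
        msg = f"File or directory not found: {path}"
        raise ValueError(msg)
    return resolved
```

For local storage this keeps the fail-fast behaviour; for S3 the same `Path` object is carried through and only dereferenced by the storage service during processing.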