Skip to content

Commit f952bf0

Browse files
committed
feat: add page-range arg to pdf parse
1 parent c99b9ac commit f952bf0

File tree

9 files changed

+662
-55
lines changed

9 files changed

+662
-55
lines changed

src/fenic/_backends/local/semantic_operators/parse_pdf.py

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from textwrap import dedent
3-
from typing import List, Optional, Tuple
3+
from typing import List, Optional, Tuple, Union
44

55
import fitz
66
import jinja2
@@ -10,7 +10,11 @@
1010
BaseSingleColumnFilePathOperator,
1111
CompletionOnlyRequestSender,
1212
)
13-
from fenic._backends.local.utils.doc_loader import DocFolderLoader
13+
from fenic._backends.local.utils.doc_loader import (
14+
DocFolderLoader,
15+
resolve_and_coalesce_pages,
16+
validate_pages_argument,
17+
)
1418
from fenic._inference.language_model import InferenceConfiguration, LanguageModel
1519
from fenic._inference.types import LMRequestFile, LMRequestMessages
1620
from fenic.core._logical_plan.resolved_types import ResolvedModelAlias
@@ -48,11 +52,13 @@ def __init__(
4852
page_separator: Optional[str] = None,
4953
describe_images: bool = False,
5054
model_alias: Optional[ResolvedModelAlias] = None,
55+
pages: Optional[Union[pl.Series, int, List[Union[int, List[int]]]]] = None,
5156
):
5257
self.page_separator = page_separator
5358
self.describe_images = describe_images
5459
self.model = model
5560
self.model_alias = model_alias
61+
self.pages = pages
5662

5763
DocFolderLoader.check_file_extensions(input.to_list(), "pdf")
5864

@@ -105,12 +111,19 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages]
105111
List of the each chunk size (page count) per PDF (page_counts_per_chunk_per_row)"""
106112
messages_batch = []
107113
page_counts_per_chunk_per_row = []
108-
for path in self.input:
114+
for idx, path in enumerate(self.input):
109115
if not path:
110116
messages_batch.append(None)
111117
page_counts_per_chunk_per_row.append([1])
112118
else:
113-
file_chunks = self._get_file_chunks(path)
119+
# pages can be a literal int, list of ranges, or a logical expression that resolves to an int or list of ranges
120+
row_pages = self.pages.to_list()[idx] if isinstance(self.pages, pl.Series) else self.pages
121+
122+
# Validate pages if it's not None (validation happens here for column values)
123+
if row_pages is not None:
124+
validate_pages_argument(row_pages)
125+
126+
file_chunks = self._get_file_chunks(path, row_pages)
114127
page_counts_per_chunk = []
115128
for file in file_chunks:
116129
messages_batch.append(
@@ -120,57 +133,73 @@ def build_request_messages_batch(self) -> Tuple[List[Optional[LMRequestMessages]
120133
page_counts_per_chunk_per_row.append(page_counts_per_chunk)
121134
return messages_batch, page_counts_per_chunk_per_row
122135

123-
124-
def _get_file_chunks(self, file_path: str) -> List[LMRequestFile]:
136+
def _get_file_chunks(self, file_path: str, pages: Optional[Union[int, List[Union[int, List[int]]]]] = None) -> List[LMRequestFile]:
    """Split the PDF at `file_path` into chunks sized for the model.

    Chunk boundaries are driven by the model's output-token limit and the
    internal max-pages-per-chunk cap. When `pages` is given, only the
    requested (1-indexed) pages/ranges are processed; otherwise the whole
    document is chunked.

    Args:
        file_path: Path to the PDF file.
        pages: Optional pages specification (1-indexed). If None, process all pages.

    Returns:
        List of LMRequestFile objects, one per chunk. `page_range` on each
        entry is a 0-indexed, inclusive (start, end) tuple.
    """
    chunks: List[LMRequestFile] = []

    with fitz.open(file_path) as doc:
        total_pages = doc.page_count

        # Resolve the requested pages into sorted, non-overlapping 0-indexed ranges.
        if pages is not None:
            resolved_ranges = resolve_and_coalesce_pages(pages, total_pages)
            # Defensive re-cap: drop ranges starting past the document end and
            # clamp range ends to the last page.
            resolved_ranges = [(start, min(end, total_pages - 1)) for start, end in resolved_ranges if start < total_pages]
        else:
            # No selection: process the entire document as one range.
            resolved_ranges = [(0, total_pages - 1)]

        for range_start, range_end in resolved_ranges:
            # Track the chunk currently being accumulated within this range.
            chunk_start_page = range_start
            chunk_tokens = 0
            chunk_page_count = 0

            for page_num in range(range_start, range_end + 1):
                text = doc[page_num].get_text("text")
                page_tokens = self.model.count_tokens(text)

                # Start a new chunk when adding this page would blow past the
                # model's output-token budget or the per-chunk page cap.
                would_exceed_tokens = chunk_tokens > 0 and (chunk_tokens + page_tokens) * PDF_MARKDOWN_OUTPUT_TOKEN_MULTIPLIER > self.model.model_parameters.max_output_tokens
                would_exceed_page_limit = chunk_page_count >= PDF_MAX_PAGES_CHUNK

                if would_exceed_tokens or would_exceed_page_limit:
                    # Flush the current chunk (pages chunk_start_page..page_num-1).
                    last_page = page_num - 1
                    page_range = (chunk_start_page, last_page)
                    with fitz.open() as doc_chunk:
                        doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=last_page)
                        chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=page_range))

                    # Begin a new chunk at the current page.
                    chunk_start_page = page_num
                    chunk_tokens = page_tokens
                    chunk_page_count = 1
                else:
                    chunk_tokens += page_tokens
                    chunk_page_count += 1

            # Flush the trailing chunk for this range, if any pages remain.
            if chunk_start_page <= range_end:
                if chunk_start_page == 0 and range_end == total_pages - 1 and len(resolved_ranges) == 1:
                    # The whole PDF fits in a single chunk: reference the file by
                    # path instead of duplicating its bytes in memory.
                    chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=None, page_range=(0, total_pages - 1)))
                else:
                    # Multi-chunk or partial-document case: materialize the slice.
                    with fitz.open() as doc_chunk:
                        doc_chunk.insert_pdf(doc, from_page=chunk_start_page, to_page=range_end)
                        chunks.append(LMRequestFile(path=file_path, pdf_chunk_bytes=doc_chunk.tobytes(), page_range=(chunk_start_page, range_end)))

    return chunks
176205

src/fenic/_backends/local/transpiler/expr_converter.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -706,17 +706,30 @@ def sem_summarize_fn(batch: pl.Series) -> pl.Series:
706706
@_convert_expr.register(SemanticParsePDFExpr)
def _convert_parse_pdf_expr(self, logical: SemanticParsePDFExpr) -> pl.Expr:
    def parse_pdf_fn(batch: pl.Series) -> pl.Series:
        # When `pages` is a per-row logical expression the batch arrives as a
        # struct of (document paths, per-row pages); otherwise the batch is the
        # paths series itself and the static pages value comes from the plan.
        if batch.dtype == pl.Struct:
            fields = batch.struct.fields
            docs_series = batch.struct.field(fields[0])
            pages_series_or_static = batch.struct.field(fields[1])
        else:
            docs_series = batch
            pages_series_or_static = logical.pages

        return SemanticParsePDF(
            input=docs_series,
            model=self.session_state.get_language_model(logical.model_alias),
            page_separator=logical.page_separator,
            describe_images=logical.describe_images,
            model_alias=logical.model_alias,
            pages=pages_series_or_static,
        ).execute()

    # Bundle the pages expression with the document paths only when pages is a
    # per-row expression; the map_batches call is identical either way, so
    # build the source expression once instead of duplicating the call.
    if isinstance(logical.pages, LogicalExpr):
        source = pl.struct(self._convert_expr(logical.expr), self._convert_expr(logical.pages))
    else:
        source = self._convert_expr(logical.expr)
    return source.map_batches(parse_pdf_fn, return_dtype=pl.Utf8)
720733

721734
@_convert_expr.register(ArrayJoinExpr)
722735
def _convert_array_join_expr(self, logical: ArrayJoinExpr) -> pl.Expr:

src/fenic/_backends/local/utils/doc_loader.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import re
77
from concurrent.futures import ThreadPoolExecutor, as_completed
88
from pathlib import Path
9-
from typing import List, Literal, Optional, Tuple
9+
from typing import List, Literal, Optional, Tuple, Union
1010

1111
import fitz # PyMuPDF
1212
import polars as pl
@@ -26,6 +26,95 @@
2626

2727
logger = logging.getLogger(__name__)
2828

29+
30+
def validate_pages_argument(pages: Optional[Union[int, List[Union[int, List[int]]]]]) -> None:
    """Validate a `pages` argument for PDF parsing.

    Args:
        pages: Either a single 1-indexed page number (int), a list whose
            elements are ints and/or two-element [start, end] ranges, or None
            (meaning "all pages").

    Raises:
        ValidationError: If the pages argument is structurally invalid —
            non-positive page numbers, malformed or inverted ranges, or an
            unexpected type.
    """
    if pages is None:
        # None means "parse all pages"; nothing to validate.
        return
    if isinstance(pages, int):
        if pages <= 0:
            raise ValidationError("Page numbers must be positive integers")
    elif isinstance(pages, list):
        for item in pages:
            if isinstance(item, int):
                if item <= 0:
                    raise ValidationError("Page numbers must be positive integers")
            elif isinstance(item, list):
                if len(item) != 2:
                    raise ValidationError("Page ranges must be pairs of two numbers")
                if not all(isinstance(x, int) for x in item):
                    # NOTE: a stray breakpoint() debugger call was removed here;
                    # library code must never drop into pdb.
                    raise ValidationError("Page range values must be integers")
                if item[0] <= 0 or item[1] <= 0:
                    raise ValidationError("Page numbers must be positive integers")
                if item[1] < item[0]:
                    raise ValidationError(f"Invalid page range [{item[0]}, {item[1]}]: end page must be >= start page")
            else:
                raise ValidationError(f"Invalid pages element type: {type(item).__name__}. Expected int or list of two ints")
    else:
        raise ValidationError(f"Invalid pages type: {type(pages).__name__}. Expected int, list, or Column")
63+
64+
65+
def resolve_and_coalesce_pages(pages: Union[int, List[Union[int, List[int]]]], total_pages: int) -> List[Tuple[int, int]]:
66+
"""Resolve and coalesce page specifications into sorted, non-overlapping ranges.
67+
68+
Converts page numbers and ranges into a sorted list of non-overlapping page ranges.
69+
All page numbers are 1-indexed as input but converted to 0-indexed ranges for internal use.
70+
71+
Args:
72+
pages: Either a single page number (int) or a list of page numbers and/or ranges.
73+
Page numbers are 1-indexed. Ranges are represented as [start, end] (inclusive).
74+
75+
Returns:
76+
List of (start, end) tuples representing 0-indexed page ranges (inclusive).
77+
Ranges are sorted and non-overlapping.
78+
79+
Examples:
80+
>>> resolve_and_coalesce_pages(5)
81+
[(4, 4)]
82+
>>> resolve_and_coalesce_pages([1, 3, 5])
83+
[(0, 0), (2, 2), (4, 4)]
84+
>>> resolve_and_coalesce_pages([1, [2, 4], 3, 5])
85+
[(0, 0), (1, 3), (4, 4)]
86+
>>> resolve_and_coalesce_pages([[1, 3], [2, 5], 7])
87+
[(0, 4), (6, 6)]
88+
"""
89+
# Convert to list of (start, end) tuples (0-indexed, inclusive)
90+
ranges = []
91+
if isinstance(pages, int):
92+
# Single page: convert 1-indexed to 0-indexed
93+
ranges.append((pages - 1, min(pages - 1, total_pages - 1)))
94+
else:
95+
for item in pages:
96+
# Single page: convert 1-indexed to 0-indexed
97+
# every range is capped by the total number of pages in the document
98+
ranges.append((item - 1, min(item - 1, total_pages - 1)) if isinstance(item, int) else (item[0] - 1, min(item[1] - 1, total_pages - 1)))
99+
100+
# Sort by start page
101+
ranges.sort()
102+
103+
# Coalesce overlapping ranges
104+
if not ranges:
105+
return []
106+
coalesced = [ranges[0]]
107+
for start, end in ranges[1:]:
108+
last_start, last_end = coalesced[-1]
109+
# Check if ranges overlap or are adjacent
110+
if start <= last_end + 1:
111+
# Merge ranges
112+
coalesced[-1] = (last_start, max(last_end, end))
113+
else:
114+
# No overlap, add new range
115+
coalesced.append((start, end))
116+
return coalesced
117+
29118
class DocFolderLoader:
30119
"""A class that encapsulates folder traversal and multi-threaded file processing.
31120

src/fenic/api/functions/semantic.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from pydantic import BaseModel, ConfigDict, validate_call
66

7+
from fenic._backends.local.utils.doc_loader import validate_pages_argument
78
from fenic.api.column import Column, ColumnOrName
89
from fenic.core._logical_plan.expressions import (
910
AliasExpr,
@@ -596,6 +597,7 @@ def parse_pdf(
596597
model_alias: Optional[Union[str, ModelAlias]] = None,
597598
page_separator: Optional[str] = None,
598599
describe_images: bool = False, # for images that aren't tables
600+
pages: Optional[Union[Column, int, List[Union[int, List[int]]]]] = None,
599601
) -> Column:
600602
r"""Parses a column of PDF paths into markdown.
601603
@@ -607,12 +609,19 @@ def parse_pdf(
607609
model_alias: Optional alias for the language model to use for the parsing. If None, will use the language model configured as the default.
608610
page_separator: Optional page separator to use for the parsing. If the separator includes the {page} placeholder, the model will replace it with the current page number.
609611
describe_images: Flag to describe images in the PDF. If True, the prompt will ask the model to include a description of the image in the markdown output. If False, the prompt asks the model to ignore images that aren't tables or charts.
612+
pages: Optional pages or page ranges to parse. Can be:
613+
- An int (single page number, 1-indexed)
614+
- A list of ints and/or pairs of ints (e.g., [1, [3, 5], 7] to parse pages 1, 3-5, and 7)
615+
- A Column expression that resolves to an int or list of ints or ranges
616+
If None, all pages will be parsed.
610617
611618
Note:
612619
For Gemini models, this function uses the google file API, uploading PDF files to Google's file store and deleting them after each request.
620+
A Column expression for pages is limited by its dtype, so it must either be list of ranges (list of lists size 2) or a list of page numbers, not both. Rows can contain None/empty list to parse all pages.
613621
614622
Raises:
615623
ExecutionError: If paths in the column are not valid PDF files.
624+
ValidationError: If the pages argument is invalid.
616625
617626
Example: Parse PDF paths in a column into markdown
618627
```python
@@ -631,14 +640,33 @@ def parse_pdf(
631640
pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), page_separator="--- PAGE BREAK ---"))
632641
pdf_markdown.select(col("markdown_content")).show()
633642
```
643+
644+
Example: Parsing PDFs with a page range - take only the pages 1-2 and 5-7
645+
```python
646+
pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
647+
pdf_markdown = pdf_metadata.select(semantic.parse_pdf(col("file_path"), pages=[[1,2], [5,7]]).alias("markdown_content"))
648+
pdf_markdown.select(col("markdown_content")).show()
649+
```
650+
651+
Example: Parsing PDFs with a page range column - take only the first and last page
652+
```python
653+
pdf_metadata = local_session.read.pdf_metadata("data/docs/**/*.pdf")
654+
pdf_markdown = semantic.parse_pdf(col("file_path"), pages=array(lit(1), col("page_count"))
655+
pdf_markdown.select(col("markdown_content")).show()
656+
```
634657
"""
635658
resolved_model_alias = _resolve_model_alias(model_alias)
636659

660+
# Validate pages if it's not a Column
661+
if not isinstance(pages, Column):
662+
validate_pages_argument(pages)
663+
637664
return Column._from_logical_expr(
638665
SemanticParsePDFExpr(
639666
Column._from_col_or_name(column)._logical_expr,
640667
model_alias=resolved_model_alias,
641668
page_separator=page_separator,
642669
describe_images=describe_images,
670+
pages=pages if not isinstance(pages, Column) else pages._logical_expr,
643671
)
644672
)

src/fenic/api/session/session.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,15 @@ def create_dataframe(
205205
pl_df = pl.from_arrow(data)
206206

207207
else:
208+
breakpoint()
208209
raise ValidationError(
209210
f"Unsupported data type: {type(data)}. Supported types are: Polars DataFrame, Pandas DataFrame, dict, or list."
210211
)
211212

212213
except ValidationError:
213214
raise
214215
except Exception as e:
216+
breakpoint()
215217
raise PlanError(f"Failed to create DataFrame from {data}") from e
216218

217219
return DataFrame._from_logical_plan(

0 commit comments

Comments
 (0)