diff --git a/docs/releasenotes.md b/docs/releasenotes.md index cf54708..43fe6b9 120000 --- a/docs/releasenotes.md +++ b/docs/releasenotes.md @@ -1 +1 @@ -../CHANGES.md \ No newline at end of file +../CHANGES.md diff --git a/src/cooler/cli/cload.py b/src/cooler/cli/cload.py index ded4b19..d1f52af 100644 --- a/src/cooler/cli/cload.py +++ b/src/cooler/cli/cload.py @@ -1,10 +1,14 @@ import sys import click +from .. import create_cooler import h5py +import io +import gzip import numpy as np import pandas as pd import simplejson as json +from typing import Union from cytoolz import compose from multiprocess import Pool @@ -24,10 +28,10 @@ from pandas.io.common import get_handle -# Copied from pairtools._headerops def get_header(instream, comment_char="#"): - """Returns a header from the stream and an the reaminder of the stream + """Returns a header from the stream and the remainder of the stream with the actual data. + Parameters ---------- instream : a file object @@ -41,39 +45,94 @@ def get_header(instream, comment_char="#"): The header lines, stripped of terminal spaces and newline characters. remainder_stream : stream/file-like object Stream with the remaining lines. - """ - header = [] if not comment_char: raise ValueError("Please, provide a comment char!") - comment_byte = comment_char.encode() - # get peekable buffer for the instream - read_f, peek_f = None, None - if hasattr(instream, "buffer"): - peek_f = instream.buffer.peek - readline_f = instream.buffer.readline - elif hasattr(instream, "peek"): - peek_f = instream.peek - readline_f = instream.readline - else: - raise ValueError("Cannot find the peek() function of the provided stream!") - current_peek = peek_f(1) - while current_peek.startswith(comment_byte): - # consuming a line from buffer guarantees - # that the remainder of the buffer starts - # with the beginning of the line. + header = [] + buffered_lines = [] + readline_f = instream.readline + is_bytes = False + + while True: line = readline_f() + if not line: # End of stream + break if isinstance(line, bytes): + is_bytes = True line = line.decode() - # append line to header, since it does start with header + if not line.startswith(comment_char): + buffered_lines.append(line) + break header.append(line.strip()) - # peek into the remainder of the instream - current_peek = peek_f(1) - # apparently, next line does not start with the comment - # return header and the instream, advanced to the beginning of the data - return header, instream + # Create a new stream with the remaining data + from io import StringIO, BytesIO + if buffered_lines: + if is_bytes: + # For binary streams, read remaining data as bytes + remainder = instream.read() + if isinstance(remainder, str): + remainder = remainder.encode() + remainder_stream = BytesIO(b"".join([line.encode() for line in buffered_lines]) + remainder) + else: + # For text streams, read remaining data as text + remainder = instream.read() + if isinstance(remainder, bytes): + remainder = remainder.decode() + remainder_stream = StringIO("".join(buffered_lines) + remainder) + else: + remainder_stream = instream + + return header, remainder_stream + +def open_maybe_gzip(path, mode="rt"): + if isinstance(path, str) and path.endswith(".gz"): + return gzip.open(path, mode) + return open(path, mode) + +def validate_pairs_columns(file_path: str, field_numbers: dict[str, int], is_stdin: bool = False) -> Union[io.StringIO, io.TextIOBase]: + """ + Validate that column indices for chrom1, pos1, chrom2, pos2, and any additional fields + do not exceed the number of columns in the pairs file. Returns the stream positioned after headers. + + Args: + file_path: Path to the pairs file or '-' for stdin. + field_numbers: Dictionary mapping field names (e.g., 'chrom1') to zero-based column indices. + is_stdin: True if input is from stdin. + + Returns: + File-like object positioned at the start of data. + + Raises: + ValueError: If any column index exceeds the number of columns in the file. + """ + if is_stdin: + input_data = sys.stdin.read() if hasattr(sys.stdin, 'read') else sys.stdin.getvalue() + f = io.StringIO(input_data) + else: + f = open_maybe_gzip(file_path, 'r') + + _, f = get_header(f, comment_char="#") + pos = f.tell() + + # Read first data line and check column count + first_data_line = f.readline() + if isinstance(first_data_line, bytes): + first_data_line = first_data_line.decode() + first_data_line = first_data_line.strip() + + f.seek(pos) # Rewind to preserve the stream + if not first_data_line: + raise ValueError(f"Pairs file '{file_path}' is empty or contains only header lines") + + num_cols = len(first_data_line.split("\t")) + for name, idx in field_numbers.items(): + if idx >= num_cols: + raise ValueError( + f"Column index {idx + 1} ({name}) exceeds number of columns ({num_cols}) in '{file_path}'" + ) + return f @cli.group() def cload(): @@ -551,9 +610,6 @@ def pairs( if len(field): for arg in field: name, colnum, dtype, agg = parse_field_param(arg) - - # Special cases: these do not have input fields. - # Omit field number and agg to change standard dtypes. if colnum is None: if ( agg is None @@ -584,19 +640,12 @@ def pairs( else: aggregations[name] = "sum" - # # Pairs counts are always produced, unless supressed explicitly - # do_count = not no_count - # if do_count: - # if 'count' not in output_field_names: - # output_field_names.append('count') # default dtype and agg - # else: - # if not len(output_field_names): - # click.BadParameter( - # "To pass `--no-count`, specify at least one input " - # "value-column using `--field`.") if "count" not in output_field_names: output_field_names.append("count") + # Validate and get the stream + f_in = validate_pairs_columns(pairs_path, input_field_numbers, is_stdin=(pairs_path == "-")) + # Customize the HDF5 filters if storage_options is not None: h5opts = parse_kv_list_param(storage_options) @@ -606,19 +655,8 @@ def pairs( else: h5opts = None - # Initialize the input stream - # TODO: we could save the header into metadata + # Explicitly set kwargs to empty to avoid passing Click options to read_csv kwargs = {} - if pairs_path == "-": - f_in = sys.stdin - elif int(_pandas_version[0]) == 1 and int(_pandas_version[1]) < 2: - # get_handle returns a pair of objects in pandas 1.0 and 1.1 - f_in = get_handle(pairs_path, mode="r", compression="infer")[0] - else: - # get_handle returns a single wrapper object in pandas 1.2+ and 2.* - f_in = get_handle(pairs_path, mode="r", compression="infer").handle - - _, f_in = get_header(f_in) reader = pd.read_csv( f_in, diff --git a/src/cooler/util.py b/src/cooler/util.py index d25b65e..13e68eb 100644 --- a/src/cooler/util.py +++ b/src/cooler/util.py @@ -1,11 +1,12 @@ from __future__ import annotations +import io import os import re from collections import OrderedDict, defaultdict from collections.abc import Generator, Iterable, Iterator from contextlib import contextmanager -from typing import IO, Any +from typing import Any import h5py import numpy as np @@ -204,16 +205,15 @@ def argnatsort(array: Iterable[str]) -> np.ndarray: cols = tuple(zip(*(natsort_key(x) for x in array))) return np.lexsort(cols[::-1]) - def read_chromsizes( - filepath_or: str | IO[str], + filepath_or: str | io.StringIO, name_patterns: tuple[str, ...] = (r"^chr[0-9]+$", r"^chr[XY]$", r"^chrM$"), all_names: bool = False, **kwargs, ) -> pd.Series: """ - Parse a ``.chrom.sizes`` or ``.chromInfo.txt`` file from the UCSC - database, where ``db`` is a genome assembly name. + Parse a `.chrom.sizes` or `.chromInfo.txt` file from the UCSC + database, where `db` is a genome assembly name. Parameters ---------- @@ -221,32 +221,68 @@ def read_chromsizes( Path or url to text file, or buffer. name_patterns : sequence, optional Sequence of regular expressions to capture desired sequence names. - Each corresponding set of records will be sorted in natural order. all_names : bool, optional - Whether to return all contigs listed in the file. Default is - ``False``. + Whether to return all contigs listed in the file. + verbose : bool, optional + Whether to enable verbose logging for diagnostics. + """ + # Handle URL case separately + if isinstance(filepath_or, str) and filepath_or.startswith(('http://', 'https://')): + try: + # Use pandas' built-in URL handling + chromtable = pd.read_csv( + filepath_or, + sep="\t", + usecols=[0, 1], + names=["name", "length"], + dtype={"name": str}, + on_bad_lines="error", + **kwargs, + ) + except Exception as e: + raise ValueError(f"Failed to fetch URL {filepath_or}: {e!s}") from e + else: + # Original validation for local files/StringIO + if isinstance(filepath_or, (str, io.StringIO)): + first_line = None + if isinstance(filepath_or, io.StringIO): + first_line = filepath_or.getvalue().splitlines()[0] + elif isinstance(filepath_or, str): + with open(filepath_or) as file: + first_line = file.readline() + + if first_line and ' ' in first_line: + raise ValueError( + f"Chromsizes file '{filepath_or}' uses spaces instead of tabs " + "as delimiters. Please use tabs.") + + # Read the chromosome size file into a DataFrame + chromtable = pd.read_csv( + filepath_or, + sep="\t", # Ensuring tab is the delimiter + usecols=[0, 1], + names=["name", "length"], + dtype={"name": str}, + on_bad_lines="error", + **kwargs, + ) - Returns - ------- - :py:class:`pandas.Series` - Series of integer bp lengths indexed by sequence name. + # Convert the "length" column to numeric values, coercing errors to NaN + chromtable["length"] = pd.to_numeric(chromtable["length"], errors="coerce") + + # Check for NaN values after conversion and raise an error if any are found. + if chromtable["length"].isnull().any(): + raise ValueError( + f"Chromsizes file '{filepath_or}' contains missing or invalid length " + "values. " + "Please ensure that the file is properly formatted as tab-delimited with " + "two columns: " + "sequence name and integer length. " + "Check for extraneous spaces or hidden characters. " + f"Invalid rows: \n{chromtable[chromtable['length'].isnull()]}" + ) - References - ---------- - * `UCSC assembly terminology `_ - * `GRC assembly terminology `_ - - """ - if isinstance(filepath_or, str) and filepath_or.endswith(".gz"): - kwargs.setdefault("compression", "gzip") - chromtable = pd.read_csv( - filepath_or, - sep="\t", - usecols=[0, 1], - names=["name", "length"], - dtype={"name": str}, - **kwargs, - ) + # Filter by patterns if needed if not all_names: parts = [] for pattern in name_patterns: @@ -254,6 +290,7 @@ def read_chromsizes( part = part.iloc[argnatsort(part["name"])] parts.append(part) chromtable = pd.concat(parts, axis=0) + chromtable.index = chromtable["name"].values return chromtable["length"] diff --git a/tests/test_cload.py b/tests/test_cload.py new file mode 100644 index 0000000..d0f8cfd --- /dev/null +++ b/tests/test_cload.py @@ -0,0 +1,210 @@ +from click.testing import CliRunner + +from cooler.cli.cload import pairs + +runner = CliRunner() + + +def test_valid_indices(tmpdir): + """Test that valid column indices process successfully.""" + bins_path = tmpdir.join("bins.bed") + pairs_path = tmpdir.join("pairs.txt") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\nchr1\t1000000\t2000000\nchr2\t0\t1000000\n") + pairs_path.write("chr1\t1000\tchr1\t2000\nchr1\t1500\tchr2\t3000\n") + + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code == 0, f"Command failed: {result.output}" + + +def test_invalid_index(tmpdir): + """Test that an invalid column index raises an error.""" + bins_path = tmpdir.join("bins.bed") + pairs_path = tmpdir.join("pairs.txt") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\n") + pairs_path.write("chr1\t1000\tchr1\t2000\n") + + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "5", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code != 0, "Command should have failed" + assert isinstance(result.exception, ValueError), ( + f"Expected ValueError, got {result.exception}" + ) + assert ( + "Column index 5 (pos2) exceeds number of columns (4)" in str(result.exception) + ), ( + f"Unexpected error: {result.exception}" + ) + + +def test_stdin_valid(tmpdir): + """Test valid indices with stdin input.""" + bins_path = tmpdir.join("bins.bed") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\n") + pairs_data = "chr1\t1000\tchr1\t2000\n" + + result = runner.invoke( + pairs, + [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + str(bins_path), + "-", + str(out_path), + ], + input=pairs_data, + ) + + assert result.exit_code == 0, f"Command failed: {result.output} {result.exception}" + + +def test_stdin_invalid(tmpdir): + """Test invalid index with stdin raises an error.""" + bins_path = tmpdir.join("bins.bed") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\n") + pairs_data = "chr1\t1000\tchr1\t2000\n" + + result = runner.invoke( + pairs, + [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "5", + str(bins_path), + "-", + str(out_path), + ], + input=pairs_data, + ) + + assert result.exit_code != 0, "Command should have failed" + assert isinstance(result.exception, ValueError), ( + f"Expected ValueError, got {result.exception}" + ) + assert ( + "Column index 5 (pos2) exceeds number of columns (4)" in str(result.exception) + ), ( + f"Unexpected error: {result.exception}" + ) + + +def test_header_skipping(tmpdir): + """Test that headers are skipped and validation works.""" + bins_path = tmpdir.join("bins.bed") + pairs_path = tmpdir.join("pairs.txt") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\n") + pairs_path.write("#comment1\n#comment2\nchr1\t1000\tchr1\t2000\n") + + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code == 0, f"Command failed: {result.output}" + + +def test_empty_file(tmpdir): + """Test that an empty file or header-only file raises an error.""" + bins_path = tmpdir.join("bins.bed") + pairs_path = tmpdir.join("pairs.txt") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\n") + pairs_path.write("#comment1\n#comment2\n") + + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code != 0, "Command should have failed" + assert isinstance(result.exception, ValueError), ( + f"Expected ValueError, got {result.exception}" + ) + assert "is empty or contains only header lines" in str(result.exception), ( + f"Unexpected error: {result.exception}" + ) + + +def test_extra_field(tmpdir): + """Test validation with an additional --field column.""" + bins_path = tmpdir.join("bins.bed") + pairs_path = tmpdir.join("pairs.txt") + out_path = tmpdir.join("out.cool") + + bins_path.write("chr1\t0\t1000000\nchr2\t0\t1000000\n") + pairs_path.write("chr1\t1000\tchr1\t2000\t1.5\nchr1\t1500\tchr2\t3000\t2.0\n") + + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + "--field", "value=5", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code == 0, f"Command failed: {result.output}" + + # Test invalid extra field + result = runner.invoke(pairs, [ + "-c1", "1", + "-p1", "2", + "-c2", "3", + "-p2", "4", + "--field", "value=6", + str(bins_path), + str(pairs_path), + str(out_path), + ]) + + assert result.exit_code != 0, "Command should have failed" + assert isinstance(result.exception, ValueError), ( + f"Expected ValueError, got {result.exception}" + ) + assert ( + "Column index 6 (value) exceeds number of columns (5)" in str(result.exception) + ), ( + f"Unexpected error: {result.exception}" + ) diff --git a/tests/test_util.py b/tests/test_util.py index b3f94be..dcf1826 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,5 +1,5 @@ import os.path as op -from io import BytesIO +from io import BytesIO, StringIO import h5py import numpy as np @@ -166,6 +166,26 @@ def test_read_chromsizes(): util.read_chromsizes(op.join(datadir, "toy.chrom.sizes")) +def test_read_chromsizes_bad_input(): + broken_data = "chr1\t1000\nchr2\tbad_value\nchr3\t2000\n" + broken_file = StringIO(broken_data) + with pytest.raises(ValueError): + util.read_chromsizes(broken_file) + + +def test_read_chromsizes_bad_delimiter(): + broken_data = "chr1 1000\nchr2 bad_value\nchr3 2000\n" + broken_file = StringIO(broken_data) + with pytest.raises(ValueError): + util.read_chromsizes(broken_file) + + +# Main function to run the tests +if __name__ == "__main__": + pytest.main([__file__]) + + + # def test_fetch_chromsizes(): # util.fetch_chromsizes("hg19")