2 changes: 1 addition & 1 deletion Makefile.targets
@@ -35,4 +35,4 @@ deploy-docs: ## Publish and deploy the documentation to the live Github page
echo "Skipping publication to Github Pages."; \
echo " "; \
;; \
esac; \
esac; \
Empty file.
189 changes: 189 additions & 0 deletions geospatial_tools/radar/nimrod.py
@@ -0,0 +1,189 @@
import gzip
import shutil
from pathlib import Path
from typing import Any, Generator

import numpy as np
import pandas as pd
import xarray as xr
from iris.analysis import MEAN
from iris.cube import Cube, CubeList
from iris.fileformats import netcdf
from iris.fileformats.nimrod import load_cubes

from geospatial_tools.utils import parse_gzip_header

FIVE_MIN = np.timedelta64(5, "m")


def extract_nimrod_from_archive(archive_file_path: str | Path, output_directory: str | Path | None = None) -> Path:
    """
    Extract Nimrod data from a gzip archive file. If no output directory is provided, the extracted data is saved
    to a folder named after the archive file, inside the archive file's directory.

    Args:
        archive_file_path: Path to the archive file.
        output_directory: Optional output directory.

    Returns:
        Path to the extracted Nimrod data file.
    """
if isinstance(archive_file_path, str):
archive_file_path = Path(archive_file_path)
full_path = archive_file_path.resolve()
parent_folder = archive_file_path.parent
filename = archive_file_path.stem

target_folder = None
if output_directory:
if isinstance(output_directory, str):
output_directory = Path(output_directory)
target_folder = output_directory

if not target_folder:
target_folder = parent_folder / filename

target_folder.mkdir(parents=True, exist_ok=True)
gzip_file_headers = parse_gzip_header(archive_file_path)
contained_filename = gzip_file_headers["original_name"]

with gzip.open(full_path, "rb") as f_in:
if not contained_filename:
contained_filename = filename
print(f"Filename {contained_filename}")
out_path = target_folder / contained_filename

with open(out_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)

return out_path
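

# Usage sketch (not part of the module above; the paths are hypothetical placeholders):
#
#     extracted_file = extract_nimrod_from_archive(
#         "data/example-nimrod-composite.dat.gz",
#         output_directory="data/extracted",
#     )
#     # `extracted_file` is the decompressed Nimrod file, named after the FNAME field
#     # stored in the gzip header when one is present.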


def load_nimrod_cubes(filenames: list[str | Path]) -> Generator[Cube | Any, Any, None]:
    """
    Load Iris cubes from one or more Nimrod files.

    Args:
        filenames: Paths of the Nimrod files to load.

    Returns:
        Generator yielding the loaded Iris cubes.
    """
    cubes = load_cubes(filenames)
    return cubes


def load_nimrod_from_archive(filename: str | Path) -> Generator[Cube | Any, Any, None]:
    """
    Extract a Nimrod gzip archive and load its contents as Iris cubes.

    Args:
        filename: Path to the gzip-compressed Nimrod archive file.

    Returns:
        Generator yielding the cubes loaded from the extracted file.
    """
    # extract_nimrod_from_archive returns the extracted file itself, so pass it directly.
    extracted_file = extract_nimrod_from_archive(filename)
    cubes = load_nimrod_cubes([extracted_file])
    return cubes


def merge_nimrod_cubes(cubes) -> Cube:
    """
    Merge a collection of Nimrod cubes into a single cube and collapse it over the time
    dimension using a mean.

    Args:
        cubes: Iterable of Iris cubes to merge.

    Returns:
        Cube collapsed over the time dimension with a mean.
    """
    cubes = CubeList(cubes)
    merged_cubes = cubes.merge_cube()
    mean_cube = merged_cubes.collapsed("time", MEAN)
    return mean_cube


def write_cube_to_file(cube: Cube, output_name: str) -> None:
    """
    Save a Nimrod cube to a NetCDF file.

    Args:
        cube: Cube to save.
        output_name: Path of the output NetCDF file.
    """
    netcdf.save(cube, output_name)
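

# Usage sketch (an assumption, not part of the diff): the helpers above can be chained
# to go from a gzip archive to a time-averaged NetCDF file. The paths are hypothetical.
#
#     cubes = load_nimrod_from_archive("data/example-nimrod-composite.dat.gz")
#     mean_cube = merge_nimrod_cubes(cubes)
#     write_cube_to_file(mean_cube, "nimrod_time_mean.nc")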


def assert_dataset_time_dim_is_valid(dataset: xr.Dataset, time_dimension_name: str = "time") -> None:
"""
Ths function checks that the time dimension of a given dataset :
- Is composed of 5-minute time bins - Which is the native Nimrod format
- Contains a continuous time series, without any holes - which would lead to false statistics when resampling

Args:
dataset: Merged nimrod cube
time_dimension_name: Name of the time dimension

Returns:
Bool value indicating if the time bins are 5 minutes long and if there are no
gaps in the time series
"""
dataset_time_dimension = dataset[time_dimension_name]
if not dataset_time_dimension.to_index().is_monotonic_increasing:
raise AssertionError("Time is not sorted ascending")
if not dataset_time_dimension.to_index().is_unique:
duplicates = dataset_time_dimension.to_index()[dataset_time_dimension.to_index().duplicated(keep=False)]
raise AssertionError(f"Duplicate timestamps present: {duplicates[:10]} ...")

difference_between_timesteps = dataset_time_dimension.diff(time_dimension_name)
if (difference_between_timesteps != FIVE_MIN).any():
larger_time_gaps = np.nonzero((difference_between_timesteps != FIVE_MIN).compute())[0][:5]
raise AssertionError(
f"Non-5min gaps at positions {larger_time_gaps} "
f"(examples: {difference_between_timesteps.isel({time_dimension_name: larger_time_gaps}).values})"
)

start = pd.Timestamp(dataset_time_dimension.values[0])
end = pd.Timestamp(dataset_time_dimension.values[-1])
expected_index = pd.date_range(start=start, end=end, freq="5min", inclusive="both")
dataset_index = dataset_time_dimension.to_index()
missing_indexes = expected_index.difference(dataset_index)
if len(missing_indexes) > 0:
raise AssertionError(f"missing {len(missing_indexes)} stamps; first few: {missing_indexes[:10]}")


def resample_nimrod_timebox_30min_bins(filenames, output_name) -> None:
    """
    Resample Nimrod data from its native 5-minute bins to 30-minute bins, using a mean
    resampling. Time bins are built as follows:

    ex. [[09h00, < 09h05], [09h05, < 09h10], ...] -> [[09h00, < 09h30], [09h30, < 10h00], ...]

    Args:
        filenames: Paths of the NetCDF files to resample.
        output_name: Path of the resampled output NetCDF file.
    """
ds = xr.open_mfdataset(filenames, combine="nested", concat_dim="time")
ds_30min = ds.resample(time="30min").mean()
ds_30min.to_netcdf(output_name)
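

# Usage sketch (an assumption; the paths are hypothetical placeholders):
#
#     resample_nimrod_timebox_30min_bins(
#         filenames=sorted(Path("data/extracted").glob("*.nc")),
#         output_name="nimrod_30min.nc",
#     )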


if __name__ == "__main__":
    # Development/debug invocation; adjust the archive path to your local data.
    extract_nimrod_from_archive(
        "/home/francispelletier/projects/geospatial_tools/data/metoffice-c-band-rain-radar_uk_20180101_1km-composite.dat.gz/metoffice-c-band-rain-radar_uk_201801012355_1km-composite.dat.gz"
    )
14 changes: 9 additions & 5 deletions geospatial_tools/raster.py
@@ -139,9 +139,12 @@ def clip_raster_with_polygon(
Args:
raster_image: Path to raster image to be clipped.
        polygon_layer: Polygon layer whose polygons will be used to clip the raster image.
        base_output_filename: Base filename for outputs. If `None`, will be taken from
            the input polygon layer.
        output_dir: Directory path where output will be written.
        num_of_workers: The number of processes to use for parallel execution. If running
            on a compute cluster, please set a specific amount (ex. 1 per CPU core requested).
            Defaults to `cpu_count()`.
logger: Logger instance

Returns:
Expand Down Expand Up @@ -260,12 +263,13 @@ def merge_raster_bands(
merged_filename: Name of output raster file.
        merged_metadata: Dictionary of metadata to use if you prefer to create it independently.
        merged_band_names: Names of final output raster bands. For example: given 3 images, each representing
            a single band; raster_file_list = ["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"].
            With merged_band_names, an individual band id can be assigned to each band of the final output raster;
            ["B0", "B1", "B2"].
logger: Logger instance

Returns:
Path to merged raster
"""
if not merged_metadata:
merged_metadata = create_merged_raster_bands_metadata(raster_file_list)
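A minimal usage sketch for merge_raster_bands, based only on the parameters listed in the docstring above (the full signature is not visible in this diff, so the exact call is an assumption):

merged_path = merge_raster_bands(
    raster_file_list=["image01_B0.tif", "image01_B1.tif", "image01_B2.tif"],  # one single-band image per file
    merged_filename="image01_merged.tif",
    merged_band_names=["B0", "B1", "B2"],  # band ids assigned to the merged output
)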
98 changes: 98 additions & 0 deletions geospatial_tools/utils.py
@@ -1,13 +1,17 @@
"""This module contains general utility functions."""

from __future__ import annotations

import calendar
import datetime
import json
import logging
import os
import struct
import sys
import zipfile
from pathlib import Path
from typing import Any, Optional

import requests
import yaml
@@ -17,6 +21,13 @@

GEOPACKAGE_DRIVER = "GPKG"

# GZIP header flag bits (FLG byte)
FTEXT = 0x01
FHCRC = 0x02
FEXTRA = 0x04
FNAME = 0x08
FCOMMENT = 0x10


def create_logger(logger_name: str) -> logging.Logger:
"""
@@ -254,3 +265,90 @@ def create_date_range_for_specific_period(
end_date = datetime.datetime(year + year_bump, end_month_range, last_day, 23, 59, 59)
date_ranges.append(f"{start_date.isoformat()}Z/{end_date.isoformat()}Z")
return date_ranges


def _read_cstring(f) -> bytes:
"""Read a null-terminated byte string from the current position."""
out = bytearray()
while True:
b = f.read(1)
if not b: # EOF
break
if b == b"\x00": # terminator
break
out += b
return bytes(out)


def parse_gzip_header(path: str | Path) -> dict[str, Any]:
"""
Parse the gzip header at the beginning of `path` (first member only).

Raises ValueError if file doesn't look like gzip.

Args:
path: Path to gzip file

Returns:
dict: Returns a dict with keys
- compression_method (int)
- flags (int)
- mtime (int, Unix epoch or 0)
- xflags (int)
- os (int)
- original_name (Optional[str]) # FNAME
- comment (Optional[str]) # FCOMMENT
- header_end_offset (int) # file offset where compressed data starts
"""
p = Path(path)
with p.open("rb") as f:
# Magic
if f.read(2) != b"\x1f\x8b":
raise ValueError(f"{p} is not a gzip file (bad magic)")

method_b = f.read(1)
flags_b = f.read(1)
if not method_b or not flags_b:
raise ValueError("Truncated header")

compression_method = method_b[0]
flags = flags_b[0]

# MTIME(4), XFL(1), OS(1)
mtime_bytes = f.read(4)
if len(mtime_bytes) != 4:
raise ValueError("Truncated header (mtime)")
mtime = struct.unpack("<I", mtime_bytes)[0]
xflags = f.read(1)[0]
os_code = f.read(1)[0]

# Optional fields in order
if flags & FEXTRA:
xlen_bytes = f.read(2)
if len(xlen_bytes) != 2:
raise ValueError("Truncated FEXTRA length")
xlen = struct.unpack("<H", xlen_bytes)[0]
_ = f.read(xlen) # skip payload

original_name: Optional[str] = None
if flags & FNAME:
# Historically ISO-8859-1; utf-8 with replace is pragmatic
original_name = _read_cstring(f).decode("utf-8", errors="replace")

comment: Optional[str] = None
if flags & FCOMMENT:
comment = _read_cstring(f).decode("utf-8", errors="replace")

if flags & FHCRC:
_ = f.read(2) # skip header CRC16

return {
"compression_method": compression_method,
"flags": flags,
"mtime": mtime,
"xflags": xflags,
"os": os_code,
"original_name": original_name,
"comment": comment,
"header_end_offset": f.tell(),
}
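
A minimal sketch of how parse_gzip_header pairs with the flag constants defined above (the archive path is a hypothetical placeholder):

headers = parse_gzip_header("data/example-nimrod-composite.dat.gz")
if headers["flags"] & FNAME:
    print("stored filename:", headers["original_name"])
print("compressed data starts at byte", headers["header_end_offset"])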
4 changes: 3 additions & 1 deletion geospatial_tools/vector.py
@@ -281,7 +281,9 @@ def select_polygons_by_location(
        select_features_from: GeoDataFrame containing the polygons from which features will be selected.
        intersected_with: GeoDataFrame containing the polygons that will be used to select features via an intersect
            operation.
        num_of_workers: Number of parallel processes to use for execution. If running
            on a compute cluster, please set a specific amount (ex. 1 per CPU core requested).
            Defaults to the minimum of the number of CPU cores (cpu_count()) and the number
            of features to select from.
join_type:
predicate: The predicate to use for selecting features from. Available predicates are:
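A minimal usage sketch for select_polygons_by_location, based only on the parameters visible in this docstring fragment (grid_gdf and aoi_gdf are hypothetical GeoDataFrames, and the predicate value assumes the usual geopandas spatial predicates):

selected = select_polygons_by_location(
    select_features_from=grid_gdf,
    intersected_with=aoi_gdf,
    num_of_workers=4,
    predicate="intersects",
)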