Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ In line with pydicom, support for Python 3.9 will be removed in that version.
as errors twice (see [#196](../../issues/196))
* fixed handling of enum values for a specific tag
* fixed handling of multiple definitions for referenced enum values
* fixed handling of conditions in functional groups (see [#226](../../issues/226))

### Infrastructure
* added Python 3.14 to CI (currently needs development version of `pydicom`)
Expand Down
96 changes: 84 additions & 12 deletions dicom_validator/tests/validator/test_iod_validator_func_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,9 @@ def validator(dicom_info, request):
]
}

FRAME_VOI_LUT = {
"FrameVOILUTSequence": [{"WindowCenter": "7200", "WindowWidth": "12800"}]
}

FRAME_CONTENT = {"FrameContentSequence": [{"FrameReferenceDateTime": "200001011200"}]}

PIXEL_MEASURES = {"PixelMeasuresSequence": [{"PixelSpacing": "0.1\\0.1"}]}

TIMING_RELATED_PARAMS = {
"MRTimingAndRelatedParametersSequence": [{"RFEchoTrainLength": None}]
}


class TestIODValidatorFuncGroups:
"""Tests IODValidator for functional groups."""
Expand Down Expand Up @@ -160,7 +151,9 @@ def test_empty_func_groups(self, validator, caplog):
)

@pytest.mark.shared_macros([FRAME_ANATOMY])
@pytest.mark.per_frame_macros([FRAME_VOI_LUT])
@pytest.mark.per_frame_macros(
[{"FrameVOILUTSequence": [{"WindowCenter": "7200", "WindowWidth": "12800"}]}]
)
def test_missing_sequences(self, validator, caplog):
caplog.set_level(logging.WARNING)
result = validator.validate()
Expand Down Expand Up @@ -258,7 +251,9 @@ def test_macro_not_allowed_in_shared_group(self, validator, caplog):
)

@pytest.mark.shared_macros([])
@pytest.mark.per_frame_macros([PIXEL_MEASURES])
@pytest.mark.per_frame_macros(
[{"PixelMeasuresSequence": [{"PixelSpacing": "0.1\\0.1"}]}]
)
def test_macro_not_allowed_in_per_frame_group(self, validator, caplog):
caplog.set_level(logging.WARNING)
validator._dataset.SOPClassUID = uid.VLWholeSlideMicroscopyImageStorage
Expand All @@ -276,7 +271,9 @@ def test_macro_not_allowed_in_per_frame_group(self, validator, caplog):
in messages
)

@pytest.mark.shared_macros([TIMING_RELATED_PARAMS])
@pytest.mark.shared_macros(
[{"MRTimingAndRelatedParametersSequence": [{"RFEchoTrainLength": None}]}]
)
@pytest.mark.per_frame_macros([FRAME_CONTENT])
def test_empty_tag_in_shared_group(self, validator):
# regression test for KeyError in this case
Expand All @@ -287,3 +284,78 @@ def test_empty_tag_in_shared_group(self, validator):
assert has_tag_error(
result, "MR Timing and Related Parameters", 0x0018_9240, ErrorCode.TagEmpty
)

@pytest.mark.shared_macros(
[
{
"MRModifierSequence": [
{"Spoiling": "NONE", "PartialFourierDirection": "PHASE"}
]
}
]
)
@pytest.mark.per_frame_macros(
[
{
"EchoPulseSequence": "GRADIENT",
"MRImageFrameTypeSequence": [
{"FrameType": "ORIGINAL\\PRIMARY\\M\\NONE"}
],
}
]
)
def test_shared_group_tag_condition_depending_on_per_frame_group_tag(
self, validator
):
# regression test for #226
validator._dataset.SOPClassUID = uid.EnhancedMRImageStorage
result = validator.validate()
# Spoiling in Shared Group, condition depends on FrameType in Per-Frame Group
assert not has_tag_error(
result, "MR Modifier", 0x0018_9016, ErrorCode.TagNotAllowed
)
# Partial Fourier Direction in Shared Group, condition depends on FrameType
# in Per-Frame Group, which has the wrong value
assert has_tag_error(
result, "MR Modifier", 0x0018_9036, ErrorCode.TagNotAllowed
)
# Partial Fourier missing with FrameType ORIGINAL
assert has_tag_error(result, "MR Modifier", 0x0018_9081, ErrorCode.TagMissing)

@pytest.mark.shared_macros(
[
{
"EchoPulseSequence": "GRADIENT",
"MRImageFrameTypeSequence": [
{"FrameType": "ORIGINAL\\PRIMARY\\M\\NONE"}
],
}
]
)
@pytest.mark.per_frame_macros(
[
{
"MRModifierSequence": [
{"Spoiling": "NONE", "PartialFourierDirection": "PHASE"}
]
}
]
)
def test_per_frame_group_tag_condition_depending_on_shared_group_tag(
self, validator
):
validator._dataset.SOPClassUID = uid.EnhancedMRImageStorage
validator._dataset.ImageType = "ORIGINAL\\PRIMARY"
result = validator.validate()
# Spoiling in Per-Frame Group, condition depends on FrameType
# and Echo Pulse Sequence in Shared Group, which have the correct value
assert not has_tag_error(
result, "MR Modifier", 0x0018_9016, ErrorCode.TagNotAllowed
)
# Partial Fourier Direction in Per-Frame Group, condition depends on FrameType
# in Shared Group, which has the wrong value
assert has_tag_error(
result, "MR Modifier", 0x0018_9036, ErrorCode.TagNotAllowed
)
# Partial Fourier missing with FrameType ORIGINAL
assert has_tag_error(result, "MR Modifier", 0x0018_9081, ErrorCode.TagMissing)
70 changes: 58 additions & 12 deletions dicom_validator/validator/iod_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pydicom import config, Sequence, Dataset, DataElement
from pydicom.multival import MultiValue
from pydicom.tag import BaseTag, Tag
from pydicom.valuerep import validate_value
from pydicom.valuerep import validate_value, VR

from dicom_validator.spec_reader.condition import (
ConditionType,
Expand Down Expand Up @@ -39,10 +39,14 @@ def __init__(
dataset: Dataset,
tag: BaseTag | None = None,
stack: list[BaseTag] | None = None,
in_func_group: bool = False,
) -> None:
self.dataset = dataset
self.tag = tag
self.stack = stack
self.in_func_group = in_func_group
if not in_func_group:
self.in_func_group = tag in (0x5200_9229, 0x5200_9230)
if tag is not None:
if stack is None:
self.stack = [tag]
Expand Down Expand Up @@ -104,23 +108,27 @@ def combined(
elif error.code == ErrorCode.TagMissing:
# for missing tags, we also have to check if the error does not appear
# in the per-frame group because it is part of a missing sequence
handled_tags = []
for per_frame_tag in per_frame:
if per_frame_tag.tag in [t.tag for t in shared]:
if per_frame_tag.parents and tag.tag in per_frame_tag.parents[1:]:
result[per_frame_tag] = per_frame[per_frame_tag]
del per_frame[per_frame_tag]
break
handled_tags.append(per_frame_tag)
for handled_tag in handled_tags:
del per_frame[handled_tag]
else:
# other errors (unexpected tag, missing value) shall always remain
result[tag] = shared[tag]

for tag, error in per_frame.items():
if error.code == ErrorCode.TagMissing:
# same check as above
handled_tags = []
for shared_tag in shared:
if shared_tag.tag in [t.tag for t in per_frame]:
if shared_tag.parents and tag.tag in shared_tag.parents[1:]:
result[shared_tag] = shared[shared_tag]
del shared[shared_tag]
break
handled_tags.append(shared_tag)
for handled_tag in handled_tags:
del shared[handled_tag]
else:
result[tag] = per_frame[tag]
return result
Expand Down Expand Up @@ -374,6 +382,7 @@ def _validate_attributes(
sq_item_dataset,
tag_id.tag,
self._dataset_stack[-1].stack,
self._dataset_stack[-1].in_func_group,
)
)
# the item attributes are only created at this point,
Expand Down Expand Up @@ -566,8 +575,7 @@ def _matches_condition(self, condition: dict[str, Any]) -> bool:
return self._tag_exists(tag_id)
elif operator == ConditionOperator.Absent:
return not self._tag_exists(tag_id)
elif self._tag_exists(tag_id):
data_element = self._lookup_tag(tag_id)
elif data_element := self._lookup_tag(tag_id):
assert data_element is not None
index = condition["index"]
if index > 0:
Expand Down Expand Up @@ -635,10 +643,48 @@ def _get_existing_tags_of_module(
existing_tag_ids.add(tag_id)
return existing_tag_ids

def _lookup_tag(self, tag_id: DicomTag) -> DataElement | None:
def _lookup_tag_in_func_group(
self, tag: DicomTag, seq_tag: int
) -> DataElement | None:
"""Lookup a tag in the functional group if it wasn't directly found
at the current level (e.g. the current sequence).
"""
dataset = self._dataset_stack[0].dataset
if seq_tag not in dataset:
return None
if not len(dataset[seq_tag].value):
return None
seq_item: Dataset = dataset[seq_tag].value[0]
# the tag may be a top-level sequence
if tag.tag in seq_item:
return seq_item[tag.tag]
# otherwise, only check top-level tags in all sequences
# as only these are referenced in conditions
for seq in seq_item.values():
if seq.VR != VR.SQ or not len(seq.value):
continue
item: Dataset = seq.value[0]
if tag.tag in item:
return item[tag.tag]
return None

def _lookup_tag_in_func_groups(self, tag: DicomTag) -> DataElement | None:
"""Lookup a tag in the functional groups if it wasn't directly found
at the current level (e.g. thr current sequence).
"""
if tag.parents is None:
return None
if element := self._lookup_tag_in_func_group(tag, tag.parents[0]):
return element
other_group_tag = 0x5200_9230 if tag.parents[0] == 0x5200_9229 else 0x5200_9229
return self._lookup_tag_in_func_group(tag, other_group_tag)

def _lookup_tag(self, tag: DicomTag) -> DataElement | None:
for stack_item in reversed(self._dataset_stack):
if tag_id.tag in stack_item.dataset:
return stack_item.dataset[tag_id.tag]
if tag.tag in stack_item.dataset:
return stack_item.dataset[tag.tag]
if self._dataset_stack[-1].in_func_group:
return self._lookup_tag_in_func_groups(tag)
return None

def _tag_exists(self, tag_id: DicomTag) -> bool:
Expand Down