From abd5f168032167ef552cf5692e878e221faa9c3e Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sun, 30 Nov 2025 13:59:05 +0100 Subject: [PATCH] More fixes for functional group handling - fixed combining of errors in shared and per/frame groups - improved handling of conditions that refer to tags in other func group sequences --- CHANGES.md | 1 + .../test_iod_validator_func_groups.py | 96 ++++++++++++++++--- dicom_validator/validator/iod_validator.py | 70 +++++++++++--- 3 files changed, 143 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 381b8f2..8e02a18 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,6 +24,7 @@ In line with pydicom, support for Python 3.9 will be removed in that version. as errors twice (see [#196](../../issues/196)) * fixed handling of enum values for a specific tag * fixed handling of multiple definitions for referenced enum values +* fixed handling of conditions in functional groups (see [#226](../../issues/226)) ### Infrastructure * added Python 3.14 to CI (currently needs development version of `pydicom`) diff --git a/dicom_validator/tests/validator/test_iod_validator_func_groups.py b/dicom_validator/tests/validator/test_iod_validator_func_groups.py index 49cf3b6..0973cb0 100644 --- a/dicom_validator/tests/validator/test_iod_validator_func_groups.py +++ b/dicom_validator/tests/validator/test_iod_validator_func_groups.py @@ -101,18 +101,9 @@ def validator(dicom_info, request): ] } -FRAME_VOI_LUT = { - "FrameVOILUTSequence": [{"WindowCenter": "7200", "WindowWidth": "12800"}] -} FRAME_CONTENT = {"FrameContentSequence": [{"FrameReferenceDateTime": "200001011200"}]} -PIXEL_MEASURES = {"PixelMeasuresSequence": [{"PixelSpacing": "0.1\\0.1"}]} - -TIMING_RELATED_PARAMS = { - "MRTimingAndRelatedParametersSequence": [{"RFEchoTrainLength": None}] -} - class TestIODValidatorFuncGroups: """Tests IODValidator for functional groups.""" @@ -160,7 +151,9 @@ def test_empty_func_groups(self, validator, caplog): ) @pytest.mark.shared_macros([FRAME_ANATOMY]) - @pytest.mark.per_frame_macros([FRAME_VOI_LUT]) + @pytest.mark.per_frame_macros( + [{"FrameVOILUTSequence": [{"WindowCenter": "7200", "WindowWidth": "12800"}]}] + ) def test_missing_sequences(self, validator, caplog): caplog.set_level(logging.WARNING) result = validator.validate() @@ -258,7 +251,9 @@ def test_macro_not_allowed_in_shared_group(self, validator, caplog): ) @pytest.mark.shared_macros([]) - @pytest.mark.per_frame_macros([PIXEL_MEASURES]) + @pytest.mark.per_frame_macros( + [{"PixelMeasuresSequence": [{"PixelSpacing": "0.1\\0.1"}]}] + ) def test_macro_not_allowed_in_per_frame_group(self, validator, caplog): caplog.set_level(logging.WARNING) validator._dataset.SOPClassUID = uid.VLWholeSlideMicroscopyImageStorage @@ -276,7 +271,9 @@ def test_macro_not_allowed_in_per_frame_group(self, validator, caplog): in messages ) - @pytest.mark.shared_macros([TIMING_RELATED_PARAMS]) + @pytest.mark.shared_macros( + [{"MRTimingAndRelatedParametersSequence": [{"RFEchoTrainLength": None}]}] + ) @pytest.mark.per_frame_macros([FRAME_CONTENT]) def test_empty_tag_in_shared_group(self, validator): # regression test for KeyError in this case @@ -287,3 +284,78 @@ def test_empty_tag_in_shared_group(self, validator): assert has_tag_error( result, "MR Timing and Related Parameters", 0x0018_9240, ErrorCode.TagEmpty ) + + @pytest.mark.shared_macros( + [ + { + "MRModifierSequence": [ + {"Spoiling": "NONE", "PartialFourierDirection": "PHASE"} + ] + } + ] + ) + @pytest.mark.per_frame_macros( + [ + { + "EchoPulseSequence": "GRADIENT", + "MRImageFrameTypeSequence": [ + {"FrameType": "ORIGINAL\\PRIMARY\\M\\NONE"} + ], + } + ] + ) + def test_shared_group_tag_condition_depending_on_per_frame_group_tag( + self, validator + ): + # regression test for #226 + validator._dataset.SOPClassUID = uid.EnhancedMRImageStorage + result = validator.validate() + # Spoiling in Shared Group, condition depends on FrameType in Per-Frame Group + assert not has_tag_error( + result, "MR Modifier", 0x0018_9016, ErrorCode.TagNotAllowed + ) + # Partial Fourier Direction in Shared Group, condition depends on FrameType + # in Per-Frame Group, which has the wrong value + assert has_tag_error( + result, "MR Modifier", 0x0018_9036, ErrorCode.TagNotAllowed + ) + # Partial Fourier missing with FrameType ORIGINAL + assert has_tag_error(result, "MR Modifier", 0x0018_9081, ErrorCode.TagMissing) + + @pytest.mark.shared_macros( + [ + { + "EchoPulseSequence": "GRADIENT", + "MRImageFrameTypeSequence": [ + {"FrameType": "ORIGINAL\\PRIMARY\\M\\NONE"} + ], + } + ] + ) + @pytest.mark.per_frame_macros( + [ + { + "MRModifierSequence": [ + {"Spoiling": "NONE", "PartialFourierDirection": "PHASE"} + ] + } + ] + ) + def test_per_frame_group_tag_condition_depending_on_shared_group_tag( + self, validator + ): + validator._dataset.SOPClassUID = uid.EnhancedMRImageStorage + validator._dataset.ImageType = "ORIGINAL\\PRIMARY" + result = validator.validate() + # Spoiling in Per-Frame Group, condition depends on FrameType + # and Echo Pulse Sequence in Shared Group, which have the correct value + assert not has_tag_error( + result, "MR Modifier", 0x0018_9016, ErrorCode.TagNotAllowed + ) + # Partial Fourier Direction in Per-Frame Group, condition depends on FrameType + # in Shared Group, which has the wrong value + assert has_tag_error( + result, "MR Modifier", 0x0018_9036, ErrorCode.TagNotAllowed + ) + # Partial Fourier missing with FrameType ORIGINAL + assert has_tag_error(result, "MR Modifier", 0x0018_9081, ErrorCode.TagMissing) diff --git a/dicom_validator/validator/iod_validator.py b/dicom_validator/validator/iod_validator.py index 8205c12..a638085 100644 --- a/dicom_validator/validator/iod_validator.py +++ b/dicom_validator/validator/iod_validator.py @@ -7,7 +7,7 @@ from pydicom import config, Sequence, Dataset, DataElement from pydicom.multival import MultiValue from pydicom.tag import BaseTag, Tag -from pydicom.valuerep import validate_value +from pydicom.valuerep import validate_value, VR from dicom_validator.spec_reader.condition import ( ConditionType, @@ -39,10 +39,14 @@ def __init__( dataset: Dataset, tag: BaseTag | None = None, stack: list[BaseTag] | None = None, + in_func_group: bool = False, ) -> None: self.dataset = dataset self.tag = tag self.stack = stack + self.in_func_group = in_func_group + if not in_func_group: + self.in_func_group = tag in (0x5200_9229, 0x5200_9230) if tag is not None: if stack is None: self.stack = [tag] @@ -104,11 +108,13 @@ def combined( elif error.code == ErrorCode.TagMissing: # for missing tags, we also have to check if the error does not appear # in the per-frame group because it is part of a missing sequence + handled_tags = [] for per_frame_tag in per_frame: - if per_frame_tag.tag in [t.tag for t in shared]: + if per_frame_tag.parents and tag.tag in per_frame_tag.parents[1:]: result[per_frame_tag] = per_frame[per_frame_tag] - del per_frame[per_frame_tag] - break + handled_tags.append(per_frame_tag) + for handled_tag in handled_tags: + del per_frame[handled_tag] else: # other errors (unexpected tag, missing value) shall always remain result[tag] = shared[tag] @@ -116,11 +122,13 @@ def combined( for tag, error in per_frame.items(): if error.code == ErrorCode.TagMissing: # same check as above + handled_tags = [] for shared_tag in shared: - if shared_tag.tag in [t.tag for t in per_frame]: + if shared_tag.parents and tag.tag in shared_tag.parents[1:]: result[shared_tag] = shared[shared_tag] - del shared[shared_tag] - break + handled_tags.append(shared_tag) + for handled_tag in handled_tags: + del shared[handled_tag] else: result[tag] = per_frame[tag] return result @@ -374,6 +382,7 @@ def _validate_attributes( sq_item_dataset, tag_id.tag, self._dataset_stack[-1].stack, + self._dataset_stack[-1].in_func_group, ) ) # the item attributes are only created at this point, @@ -566,8 +575,7 @@ def _matches_condition(self, condition: dict[str, Any]) -> bool: return self._tag_exists(tag_id) elif operator == ConditionOperator.Absent: return not self._tag_exists(tag_id) - elif self._tag_exists(tag_id): - data_element = self._lookup_tag(tag_id) + elif data_element := self._lookup_tag(tag_id): assert data_element is not None index = condition["index"] if index > 0: @@ -635,10 +643,48 @@ def _get_existing_tags_of_module( existing_tag_ids.add(tag_id) return existing_tag_ids - def _lookup_tag(self, tag_id: DicomTag) -> DataElement | None: + def _lookup_tag_in_func_group( + self, tag: DicomTag, seq_tag: int + ) -> DataElement | None: + """Lookup a tag in the functional group if it wasn't directly found + at the current level (e.g. the current sequence). + """ + dataset = self._dataset_stack[0].dataset + if seq_tag not in dataset: + return None + if not len(dataset[seq_tag].value): + return None + seq_item: Dataset = dataset[seq_tag].value[0] + # the tag may be a top-level sequence + if tag.tag in seq_item: + return seq_item[tag.tag] + # otherwise, only check top-level tags in all sequences + # as only these are referenced in conditions + for seq in seq_item.values(): + if seq.VR != VR.SQ or not len(seq.value): + continue + item: Dataset = seq.value[0] + if tag.tag in item: + return item[tag.tag] + return None + + def _lookup_tag_in_func_groups(self, tag: DicomTag) -> DataElement | None: + """Lookup a tag in the functional groups if it wasn't directly found + at the current level (e.g. thr current sequence). + """ + if tag.parents is None: + return None + if element := self._lookup_tag_in_func_group(tag, tag.parents[0]): + return element + other_group_tag = 0x5200_9230 if tag.parents[0] == 0x5200_9229 else 0x5200_9229 + return self._lookup_tag_in_func_group(tag, other_group_tag) + + def _lookup_tag(self, tag: DicomTag) -> DataElement | None: for stack_item in reversed(self._dataset_stack): - if tag_id.tag in stack_item.dataset: - return stack_item.dataset[tag_id.tag] + if tag.tag in stack_item.dataset: + return stack_item.dataset[tag.tag] + if self._dataset_stack[-1].in_func_group: + return self._lookup_tag_in_func_groups(tag) return None def _tag_exists(self, tag_id: DicomTag) -> bool: