-
Notifications
You must be signed in to change notification settings - Fork 90
Add EncodingFormat for FHIR files #883
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 20 commits
fe626df
4501421
5d82a63
1dd393f
92a9c75
cc18426
265e93f
1ee3986
30fde7c
350e9a5
ed78906
a4dce21
3fb1277
bf76353
3504bf6
5e5b9b2
062ab96
5c790b0
d0f36f6
c331ae3
238bedd
469e870
d88f892
a351116
9b94d70
7f73bc6
741bdfa
18895c0
15c49b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| {"serviceRequest/authoredOn": "2019-06-04", "serviceRequest/identifier_value": "1", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "0", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Abortion", "serviceRequest/requester_display": "Ngarenairobi RC Health centre", "serviceRequest/requester_reference": "2", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "0QH6R84NZEVZ6FD87G94UDQ1NT1HWK", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-04", "serviceRequest/identifier_value": "2", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "2", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Other", "serviceRequest/requester_display": "Ngarenairobi RC Health centre", "serviceRequest/requester_reference": "2", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "DA8DV5VNC520V4AW0DD4PY0TVFJLXG", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-12", "serviceRequest/identifier_value": "3", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "0", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Abortion", "serviceRequest/requester_display": "Ngarenairobi RC Health centre", "serviceRequest/requester_reference": "2", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "9PAXV9DHENCMAL0MD9WCLGF6DUALRZ", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-14", "serviceRequest/identifier_value": "4", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "2", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Abortion", "serviceRequest/requester_display": "Magugu Health Centre", "serviceRequest/requester_reference": "6", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "RV98DJ47NE093WQZYUNYR5MKD8RAL2", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-20", "serviceRequest/identifier_value": "5", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Abortion", "serviceRequest/requester_display": "Dareda Hospital", "serviceRequest/requester_reference": "3", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "YYJM3EF040EDD1Z3Q4DE3RPEXCM9G9", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-27", "serviceRequest/identifier_value": "6", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Other", "serviceRequest/requester_display": "Galapo Health centre", "serviceRequest/requester_reference": "7", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "5K8JQWC7DM4X2RJM3XYQWRU9EJ8V8P", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-06-27", "serviceRequest/identifier_value": "7", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Other", "serviceRequest/requester_display": "Galapo Health centre", "serviceRequest/requester_reference": "7", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "NURM0TRUZV8MC8WFQZPUDUC7PLR2ER", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-07-02", "serviceRequest/identifier_value": "8", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Abortion", "serviceRequest/requester_display": "Bashnet Hospital", "serviceRequest/requester_reference": "4", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "8H5N62X4V95G8Q2THPD6X08Y745MAY", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-07-02", "serviceRequest/identifier_value": "9", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Other", "serviceRequest/requester_display": "Galapo Health centre", "serviceRequest/requester_reference": "7", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "HYTGMF1Z301UJZE3J3ULPN4PYG5VLL", "serviceRequest/subject_type": "Patient"} | ||
| {"serviceRequest/authoredOn": "2019-07-02", "serviceRequest/identifier_value": "10", "serviceRequest/intent": "plan", "serviceRequest/performer_display": "null", "serviceRequest/performer_reference": "1", "serviceRequest/performer_type": "Organization", "serviceRequest/reasonCode_text": "Other", "serviceRequest/requester_display": "Galapo Health centre", "serviceRequest/requester_reference": "7", "serviceRequest/requester_type": "Organization", "serviceRequest/resourceType": "ServiceRequest", "serviceRequest/status": "unknown", "serviceRequest/subject_reference": "P27Q2AK2U1FDC1455WWHC32PLY763V", "serviceRequest/subject_type": "Patient"} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,223 @@ | ||
| """Parse JSON operation.""" | ||
|
|
||
| from typing import Any, TextIO | ||
|
|
||
| import jsonpath_rw | ||
| import orjson | ||
| import pandas as pd | ||
|
|
||
| from mlcroissant._src.core.types import Json | ||
| from mlcroissant._src.structure_graph.nodes.field import Field | ||
| from mlcroissant._src.structure_graph.nodes.source import FileProperty | ||
|
|
||
|
|
||
| def _unwrap_single_item(value: Any) -> Any: | ||
| """Unwraps a single-item list to its value, or returns the value as is.""" | ||
| if isinstance(value, list) and len(value) == 1: | ||
| if value[0] is None: | ||
| return None | ||
| return value[0] | ||
| return value | ||
|
|
||
|
|
||
def parse_json_content(
    json_obj: Json, fields: tuple[Field, ...]
) -> pd.DataFrame:
    """Parse the JSON paths declared on `fields` and output a pd.DataFrame.

    Args:
        json_obj: The already-decoded JSON document to query.
        fields: Fields whose `source.extract.json_path` is evaluated with
            jsonpath_rw. Fields with an empty or missing path are skipped.

    Returns:
        A DataFrame with one column per JSON path. Each column holds the
        values matched by that path, with one-item list values unwrapped.
    """
    series = {}
    for field in fields:
        json_path = field.source.extract.json_path
        if not json_path:
            continue
        expr = jsonpath_rw.parse(json_path)
        # Reuse the shared helper so unwrapping stays identical to the readers.
        series[json_path] = [
            _unwrap_single_item(match.value) for match in expr.find(json_obj)
        ]
    return pd.DataFrame(series)
|
|
||
|
|
||
ccl-core marked this conversation as resolved.
Show resolved
Hide resolved
|
||
class JsonReader:
    """Parser for JSON files, supporting both JSONPath and JMESPath expressions."""

    def __init__(self, fields: tuple[Field, ...]):
        """Compiles one extraction expression per field that has a JSON path.

        Args:
            fields (tuple[Field, ...]): A tuple of Field objects, each containing
                a source with a JSON path to extract.

        For every field with a non-empty JSON path, a tuple is appended to
        `self.exprs`:
            - The original JSON path string.
            - The engine used for evaluation ("jsonpath" for paths containing
              "..", "jmespath" for everything else).
            - The compiled expression object.
        Fields without a JSON path are skipped.
        """
        import jmespath

        # List of (original_jsonpath, engine, compiled_expr).
        self.exprs: list[tuple[str, str, Any]] = []
        for field in fields:
            json_path = field.source.extract.json_path
            if not json_path:
                continue

            if ".." in json_path:
                # Recursive descent has no JMESPath equivalent -> jsonpath_rw.
                expr = jsonpath_rw.parse(json_path)
                engine = "jsonpath"
            else:
                # Drop only the root marker: a leading "$" and one optional ".".
                # str.lstrip("$.") strips a *character set*, so it would also
                # eat the start of keys such as "$id" in "$.$id".
                stripped = json_path.removeprefix("$").removeprefix(".")
                expr = jmespath.compile(stripped)
                engine = "jmespath"

            self.exprs.append((json_path, engine, expr))
        self.fields = fields

    def parse(self, fh: TextIO) -> pd.DataFrame:
        """Parses a JSON file-like object and extracts data into a pandas DataFrame.

        Args:
            fh (TextIO): A file-like object containing JSON data.

        Returns:
            pd.DataFrame: DataFrame with extracted data, where each column
                corresponds to one expression in `self.exprs` and each row to
                one top-level record.
        """
        # Load the entire JSON file (could be a list or a single dict).
        data = orjson.loads(fh.read())

        # Always treat the payload as a list of records.
        records = data if isinstance(data, list) else [data]

        series: dict[str, list] = {}
        for json_path, engine, expr in self.exprs:
            values: list = []
            for record in records:
                if engine == "jmespath":
                    out = expr.search(record)
                else:  # Engine jsonpath_rw.
                    out = [match.value for match in expr.find(record)]
                # Both engines share the same single-item-list unwrapping.
                values.append(_unwrap_single_item(out))
            series[json_path] = values

        return pd.DataFrame(series)

    def raw(self, fh: TextIO) -> pd.DataFrame:
        """Reads a JSON file-like object and returns a single-cell pandas DataFrame.

        The entire JSON content is loaded and placed in a DataFrame with one row
        and one column, where the column name is `FileProperty.content`.

        Args:
            fh (TextIO): A file-like object opened for reading JSON data.

        Returns:
            pd.DataFrame: A DataFrame containing the JSON content in a single cell.
        """
        content = orjson.loads(fh.read())
        return pd.DataFrame({FileProperty.content: [content]})
|
|
||
|
|
||
class JsonlReader:
    """Parser for JSON Lines files, supporting both JSONPath and JMESPath."""

    # NOTE(review): a reviewer asked why both JSONPath and JMESPath are needed
    # rather than JSONPath everywhere. The rationale is not visible in this
    # file - TODO confirm and document it here.

    def __init__(self, fields):
        """Compiles one extraction expression per field that has a JSON path.

        Args:
            fields: An iterable of field objects, each expected to have a
                `source.extract.json_path` attribute.

        Each field's JSON path is processed as follows:
            - If the path is a simple JSONPath (starts with "$." and does not
              contain ".."), the "$." prefix is removed and the remainder is
              compiled as a JMESPath expression.
            - Otherwise, the path is parsed with jsonpath_rw.
        Compiled expressions, along with their original paths and the engine
        used, are stored in `self.exprs`. The original fields are stored in
        `self.fields`. Fields without a JSON path are skipped.
        """
        import jmespath

        # List of (orig_path, engine, compiled_expr).
        self.exprs: list[tuple[str, str, Any]] = []
        for field in fields:
            json_path = field.source.extract.json_path
            if not json_path:
                continue

            if json_path.startswith("$.") and ".." not in json_path:
                # Simple JSONPath -> JMESPath. Remove exactly the "$." prefix:
                # str.lstrip("$.") strips a *character set* and would also eat
                # the start of keys such as "$id" in "$.$id".
                expr = jmespath.compile(json_path.removeprefix("$."))
                engine = "jmespath"
            else:
                # Anything else (e.g. recursive descent) goes to jsonpath_rw.
                expr = jsonpath_rw.parse(json_path)
                engine = "jsonpath"

            self.exprs.append((json_path, engine, expr))
        self.fields = fields

    def parse(self, fh: TextIO) -> pd.DataFrame:
        """Parses a file-like object containing JSON objects (one per line).

        Args:
            fh: A file-like object to read from, where each non-blank line is
                a JSON object.

        Returns:
            pd.DataFrame: A DataFrame where each row corresponds to a parsed
                JSON object and each column to one expression in `self.exprs`.

        Notes:
            - Blank lines are skipped.
            - For JMESPath, the search result is used directly.
            - For JSONPath, values are collected from the Match objects.
            - In both cases single-item lists are unwrapped to their value.
        """
        rows = []
        for line in fh:
            line = line.strip()
            if not line:
                continue
            record = orjson.loads(line)
            row: dict[str, object] = {}
            for json_path, engine, expr in self.exprs:
                if engine == "jmespath":
                    out = expr.search(record)
                else:
                    out = [match.value for match in expr.find(record)]
                row[json_path] = _unwrap_single_item(out)
            rows.append(row)
        return pd.DataFrame(rows)

    def raw(self, fh: TextIO) -> pd.DataFrame:
        """Reads a JSON Lines file-like object and returns a DataFrame.

        The handle is rewound to the start before reading, so the whole file
        is loaded regardless of its current position.
        """
        fh.seek(0)
        return pd.read_json(fh, lines=True)
Uh oh!
There was an error while loading. Please reload this page.