-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetadata.py
More file actions
122 lines (99 loc) · 3.48 KB
/
metadata.py
File metadata and controls
122 lines (99 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import hashlib
import os
import stat
try:
import magic
_magic_instance = magic.Magic(mime=True)
except ImportError:
_magic_instance = None
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from loguru import logger
# Presumably the column headers a delta input must contain. The four names
# appear to map one-to-one onto the DeltaInfo fields (directory, dataset_repo,
# sf_table, filename) -- NOTE(review): confirm against the code that reads them.
DELTA_REQUIRED_COLUMNS = ["Directory", "Dataset Repo", "SF Table", "Filename"]
@dataclass
class DeltaInfo:
    """A single delta record made of four string fields.

    The field names appear to mirror the ``DELTA_REQUIRED_COLUMNS`` headers
    (NOTE(review): confirm with the code that builds these records).
    """

    directory: str
    dataset_repo: str
    sf_table: str
    filename: str

    def to_dict(self) -> dict:
        """Return a new plain ``dict`` mapping each field name to its value."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class FileMetadata:
    """Metadata collected for one file on disk.

    ``owner``, ``mime_type`` and ``md5`` are plain strings and may be empty
    until the corresponding (more expensive) lookups have been performed.
    """

    name: str
    extension: str
    full_path: str
    relative_path: str
    size_bytes: int
    ctime: datetime
    mtime: datetime
    permissions: str
    owner: str
    mime_type: str
    md5: str

    def compute_hash(self) -> None:
        """Fill in ``md5`` on demand by hashing the file at ``full_path``.

        The whole file is read, so this is only done when explicitly asked.
        """
        target = Path(self.full_path)
        self.md5 = _compute_md5(target)

    def to_dict(self) -> dict:
        """Return the fields as a ``dict`` with timestamps rendered as text.

        ``ctime`` and ``mtime`` are formatted as ``YYYY-MM-DD HH:MM:SS``;
        every other field is passed through unchanged.
        """
        stamp_format = "%Y-%m-%d %H:%M:%S"
        payload = asdict(self)
        # Overwrite the datetime objects in place so key order is preserved.
        payload.update(
            ctime=self.ctime.strftime(stamp_format),
            mtime=self.mtime.strftime(stamp_format),
        )
        return payload
def _compute_md5(file_path: Path) -> str:
    """Return the hex MD5 digest of a file, or ``""`` if it cannot be read.

    The file is streamed in 8 KB pieces so large files are never loaded into
    memory at once. Any ``OSError`` (which includes ``PermissionError``) is
    logged at debug level and reported to the caller as an empty string.
    """
    chunk_size = 8192
    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as stream:
            # read() yields b"" at end of file, which ends the iteration.
            for block in iter(lambda: stream.read(chunk_size), b""):
                digest.update(block)
    except OSError as e:
        logger.debug("_compute_md5 error | file={} error={}", file_path, e)
        return ""
    return digest.hexdigest()
def _detect_mime(file_path: Path) -> str:
    """Detect a file's MIME type, preferring content sniffing over the name.

    When the optional ``magic`` library was imported successfully, the file's
    header bytes are inspected first. If that is unavailable or fails, the
    type is guessed from the file extension instead.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The MIME type string, or ``"unknown"`` when neither method yields one.
    """
    if _magic_instance is not None:
        try:
            return _magic_instance.from_file(str(file_path))
        # libmagic failures surface as magic.MagicException, which is not an
        # OSError; catch it too so the promised extension fallback still runs.
        # `magic` is guaranteed to be imported whenever _magic_instance is set.
        except (OSError, magic.MagicException) as e:
            logger.debug("_detect_mime magic failed, falling back | file={} error={}", file_path, e)
    # Imported locally: only needed on the fallback path.
    import mimetypes
    return mimetypes.guess_type(str(file_path))[0] or "unknown"
def _get_owner(file_path: Path) -> str:
    """Return the name of the user owning ``file_path``, or ``"N/A"``.

    This is a best-effort lookup: every expected failure is logged at debug
    level and reported as ``"N/A"`` rather than raised.

    Args:
        file_path: Path of the file whose owner is wanted.

    Returns:
        The owner's user name, or ``"N/A"`` when it cannot be determined.
    """
    try:
        return file_path.owner()
    # NotImplementedError: platform without owner support (e.g. Windows).
    # KeyError: the file's uid has no entry in the system user database,
    #           which is common in containers and on network mounts.
    # OSError: the file could not be stat'ed.
    except (NotImplementedError, KeyError, OSError) as e:
        logger.debug("_get_owner error | file={} error={}", file_path, e)
        return "N/A"
def extract_metadata_stat(
    file_path: Path, base_dir: Path, file_stat: os.stat_result
) -> FileMetadata:
    """Create a FileMetadata record from an already-obtained stat result.

    Only cheap fields are populated here. ``owner``, ``mime_type`` and ``md5``
    are left as empty strings; use enrich_metadata() afterwards to fill in the
    owner and MIME type once cheaper filters have passed.

    ``relative_path`` is ``file_path`` expressed relative to ``base_dir``, so
    ``file_path`` must be located under ``base_dir`` (``Path.relative_to``
    raises ``ValueError`` otherwise). Timestamps are converted with
    ``datetime.fromtimestamp`` and are therefore naive local times.
    """
    absolute_path = file_path.resolve()
    path_from_base = file_path.relative_to(base_dir)
    ctime_dt = datetime.fromtimestamp(file_stat.st_ctime)
    mtime_dt = datetime.fromtimestamp(file_stat.st_mtime)
    mode_string = stat.filemode(file_stat.st_mode)
    return FileMetadata(
        name=file_path.name,
        extension=file_path.suffix,
        full_path=str(absolute_path),
        relative_path=str(path_from_base),
        size_bytes=file_stat.st_size,
        ctime=ctime_dt,
        mtime=mtime_dt,
        permissions=mode_string,
        owner="",
        mime_type="",
        md5="",
    )
def enrich_metadata(metadata: FileMetadata, file_path: Path) -> FileMetadata:
    """Populate the costly ``owner`` and ``mime_type`` fields of a record.

    The given ``metadata`` object is modified in place and also returned, so
    the call can be chained.
    """
    owner_name = _get_owner(file_path)
    metadata.owner = owner_name
    detected_type = _detect_mime(file_path)
    metadata.mime_type = detected_type
    return metadata
def extract_metadata(file_path: Path, base_dir: Path) -> FileMetadata:
    """Collect the full metadata record for one file in a single call.

    Convenience wrapper: stats the file, builds the basic record with
    extract_metadata_stat(), then adds owner and MIME type through
    enrich_metadata(). The ``md5`` field is not computed here.
    """
    basic_record = extract_metadata_stat(file_path, base_dir, file_path.stat())
    enriched_record = enrich_metadata(basic_record, file_path)
    return enriched_record