Skip to content

Commit c43ba42

Browse files
Cosmetics
1 parent 8451f56 commit c43ba42

File tree

1 file changed

+82
-84
lines changed

1 file changed

+82
-84
lines changed

cmapi/cmapi_server/managers/host_identity.py

Lines changed: 82 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,69 @@
1818
logger = logging.getLogger(__name__)
1919

2020

21-
@lru_cache(maxsize=1) # singleton
22-
def get_host_address_manager() -> 'HostAddressManager':
23-
return HostAddressManager()
21+
@dataclass
22+
class HostIdentity:
23+
"""Host's network identity, IP addrs and hostnames that are visible from other hosts"""
24+
input: str
25+
# The first, most important IP addr and host name (ordering is done by policy)
26+
primary_ip: str
27+
primary_name: Optional[str] # Name can be missing (but policy can make it required)
28+
ips: list[str] # All IP addrs
29+
names: list[str] # All host names (only those that are visible to other hosts)
30+
unique_key: str # unique id of this host, will be used later for aliases
31+
observed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
32+
33+
@staticmethod
34+
def from_policy(input: str, policy: 'ResolutionPolicy', ips: Sequence[IPAddress], names: Sequence[str]) -> 'HostIdentity':
35+
"""Construct a HostIdentity from filtered addresses and names using policy ordering."""
36+
if not ips:
37+
raise ResolutionPolicyViolationError('All resolved addresses were rejected by policy.')
38+
39+
ordered_ips = policy.order_addresses(ips)
40+
# Calculate host unique key from its addresses (we prefer globals as more stable ones)
41+
globals_ordered = [ip for ip in ordered_ips if ip.is_global]
42+
privates_ordered = [ip for ip in ordered_ips if ip.is_private]
43+
to_hash = globals_ordered if globals_ordered else privates_ordered
44+
hasher = hashlib.sha256()
45+
for ip in to_hash:
46+
hasher.update(str(ip).encode('utf-8'))
47+
unique_key = hasher.hexdigest()
48+
49+
return HostIdentity(
50+
input=input,
51+
ips=[str(ip) for ip in ordered_ips],
52+
names=list(names),
53+
primary_ip=str(ordered_ips[0]),
54+
primary_name=names[0] if names else None,
55+
unique_key=unique_key,
56+
)
57+
58+
@property
59+
def effective_hostname(self) -> str:
60+
"""Hostname or primary IP if hostname is not set
61+
62+
If policy requires host to have a hostname and it doesn't, we won't even get here
63+
"""
64+
return self.primary_name or self.primary_ip
65+
66+
def __repr__(self) -> str:
67+
parts: list[str] = [
68+
f'input={self.input!r}',
69+
f'primary_ip={self.primary_ip!r}',
70+
]
71+
if self.primary_name is not None:
72+
parts.append(f'primary_name={self.primary_name!r}')
73+
# Don't show addrs and hostnames if there is only one addr or hostname
74+
if self.ips != [self.primary_ip]:
75+
parts.append(f'ips={self.ips!r}')
76+
if self.names != [self.primary_name]:
77+
parts.append(f'names={self.names!r}')
78+
parts.append(f'unique_key={self.unique_key[:4]}')
79+
parts.append(f'observed_at={self.observed_at.replace(microsecond=0).isoformat()!r}')
80+
return 'HostIdentity(' + ', '.join(parts) + ')'
81+
82+
def __str__(self) -> str:
83+
return repr(self)
2484

2585

2686
@dataclass(frozen=True)
@@ -44,9 +104,9 @@ def filter_addresses(self, addrs: Iterable[str]) -> list[IPAddress]:
44104
list(addrs), self.allow_private_ips, self.allow_ipv6
45105
)
46106
for addr in addrs:
47-
ip = ip_or_none(addr)
107+
ip = _ip_or_none(addr)
48108
if ip is None:
49-
logger.debug('Skip %s: not a valid IP literal', addr)
109+
logger.error('Skip %s: not a valid IP literal', addr)
50110
continue
51111

52112
# Fixed rejections first
@@ -91,71 +151,6 @@ def key(ip: IPAddress) -> tuple[int, int, int, int, int, int, int, int]:
91151
return sorted(ips, key=key)
92152

93153

94-
@dataclass
95-
class HostIdentity:
96-
"""Host's network identity, IP addrs and hostnames that are visible from other hosts"""
97-
input: str
98-
# The first, most important IP addr and host name (ordering is done by policy)
99-
primary_ip: str
100-
primary_name: Optional[str] # Name can be missing (but policy can make it required)
101-
ips: list[str] # All IP addrs
102-
names: list[str] # All host names (only those that are visible to other hosts)
103-
unique_key: str # unique id of this host, will be used later for aliases
104-
observed_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
105-
106-
@staticmethod
107-
def from_policy(input: str, policy: ResolutionPolicy, ips: Sequence[IPAddress], names: Sequence[str]) -> 'HostIdentity':
108-
"""Construct a HostIdentity from filtered addresses and names using policy ordering."""
109-
if not ips:
110-
raise ResolutionPolicyViolationError('All resolved addresses were rejected by policy.')
111-
112-
ordered_ips = policy.order_addresses(ips)
113-
# Calculate host unique key from its addresses (we prefer globals as more stable ones)
114-
globals_ordered = [ip for ip in ordered_ips if ip.is_global]
115-
privates_ordered = [ip for ip in ordered_ips if ip.is_private]
116-
to_hash = globals_ordered if globals_ordered else privates_ordered
117-
hasher = hashlib.sha256()
118-
for ip in to_hash:
119-
hasher.update(str(ip).encode('utf-8'))
120-
unique_key = hasher.hexdigest()
121-
122-
return HostIdentity(
123-
input=input,
124-
ips=[str(ip) for ip in ordered_ips],
125-
names=list(names),
126-
primary_ip=str(ordered_ips[0]),
127-
primary_name=names[0] if names else None,
128-
unique_key=unique_key,
129-
)
130-
131-
@property
132-
def effective_hostname(self) -> str:
133-
"""Hostname or primary IP if hostname is not set
134-
135-
If policy requires host to have a hostname and it doesn't, we won't even get here
136-
"""
137-
return self.primary_name or self.primary_ip
138-
139-
def __repr__(self) -> str:
140-
parts: list[str] = [
141-
f'input={self.input!r}',
142-
f'primary_ip={self.primary_ip!r}',
143-
]
144-
if self.primary_name is not None:
145-
parts.append(f'primary_name={self.primary_name!r}')
146-
# Don't show addrs and hostnames if there is only one addr or hostname
147-
if self.ips != [self.primary_ip]:
148-
parts.append(f'ips={self.ips!r}')
149-
if self.names != [self.primary_name]:
150-
parts.append(f'names={self.names!r}')
151-
parts.append(f'unique_key={self.unique_key[:4]}')
152-
parts.append(f'observed_at={self.observed_at.replace(microsecond=0).isoformat()!r}')
153-
return 'HostIdentity(' + ', '.join(parts) + ')'
154-
155-
def __str__(self) -> str:
156-
return repr(self)
157-
158-
159154
class HostAddressManager:
160155
"""Calculates HostIdentity from passed hostname or IP address."""
161156

@@ -170,7 +165,7 @@ def get_identity(self, target: str) -> HostIdentity:
170165

171166
target = target.strip()
172167

173-
ip = ip_or_none(target)
168+
ip = _ip_or_none(target)
174169
if ip is not None:
175170
identity = self._get_identity_from_ip(ip, target)
176171
else:
@@ -211,7 +206,7 @@ def check_hostname_rev_lookup(self, hostname: str) -> tuple[HostIdentity, bool]:
211206
return identity, False
212207

213208
for ip_text in identity.ips:
214-
ip = ip_or_none(ip_text)
209+
ip = _ip_or_none(ip_text)
215210
if ip is None:
216211
logger.error('Invalid IP address: %s', ip_text)
217212
continue
@@ -229,10 +224,6 @@ def check_hostname_rev_lookup(self, hostname: str) -> tuple[HostIdentity, bool]:
229224
logger.warning('Roundtrip check failed for %s', hostname)
230225
return identity, False
231226

232-
@property
233-
def policy(self) -> ResolutionPolicy:
234-
return self._policy
235-
236227
def _get_identity_from_ip(self, ip: IPAddress, original_input: str) -> HostIdentity:
237228
if not self._policy.filter_addresses([str(ip)]):
238229
raise ResolutionPolicyViolationError('Input IP address was rejected by policy')
@@ -246,9 +237,10 @@ def _get_identity_from_ip(self, ip: IPAddress, original_input: str) -> HostIdent
246237

247238
def _get_identity_from_hostname(self, hostname: str) -> HostIdentity:
248239
normalized = hostname.strip().lower()
249-
if not _is_fqdn(normalized):
240+
if _is_fqdn(normalized):
241+
return self._get_identity_from_fqdn(normalized)
242+
else:
250243
return self._get_identity_from_non_fqdn(hostname)
251-
return self._get_identity_from_fqdn(normalized)
252244

253245
def _get_identity_from_fqdn(self, fqdn: str) -> HostIdentity:
254246
# Get IPs from hostname (via DNS), filter them, then get names from each IP
@@ -296,7 +288,7 @@ def _get_identity_from_non_fqdn(self, hostname: str) -> HostIdentity:
296288
# Collect names of IPs
297289
names: set[str] = set()
298290
for ip_text in list(candidate_ips):
299-
ip = ip_or_none(ip_text)
291+
ip = _ip_or_none(ip_text)
300292
if ip is None:
301293
logger.error('Invalid IP address: %s', ip_text)
302294
continue
@@ -337,7 +329,7 @@ def _resolve_dns(self, hostname: str) -> list[IPAddress]:
337329

338330
addrs: list[IPAddress] = []
339331
for ip_text in ipv4_texts + ipv6_texts:
340-
ip = ip_or_none(ip_text)
332+
ip = _ip_or_none(ip_text)
341333
if ip is None:
342334
logger.error('DNS returned invalid IP address %s for host name %s, skipping', ip_text, hostname)
343335
continue
@@ -393,21 +385,27 @@ def _dns_reverse(self, ip_text: str) -> list[str]:
393385
def _contains_private(self, addrs: list[str]) -> bool:
394386
"""Return True if any resolvable address string is a private IP."""
395387
for addr in addrs:
396-
ip = ip_or_none(addr)
388+
ip = _ip_or_none(addr)
397389
if ip is None:
398390
continue
399391
if ip.is_private:
400392
return True
401393
return False
402394

403-
def ip_or_none(val: str) -> Optional[IPAddress]:
395+
396+
@lru_cache(maxsize=1) # singleton
397+
def get_host_address_manager() -> 'HostAddressManager':
398+
return HostAddressManager()
399+
400+
401+
def _ip_or_none(val: str) -> Optional[IPAddress]:
404402
try:
405403
return ipaddress.ip_address(val)
406404
except ValueError:
407405
return None
408406

409-
def is_ip_address(val: str) -> bool:
410-
return ip_or_none(val) is not None
407+
def _is_ip_address(val: str) -> bool:
408+
return _ip_or_none(val) is not None
411409

412410
def _is_fqdn(name: str) -> bool:
413411
"""Return True if the string is a valid FQDN (lower-cased, no trailing dot).

0 commit comments

Comments
 (0)