1818logger = logging .getLogger (__name__ )
1919
2020
21- @lru_cache (maxsize = 1 ) # singleton
22- def get_host_address_manager () -> 'HostAddressManager' :
23- return HostAddressManager ()
21+ @dataclass
22+ class HostIdentity :
23+ """Host's network identity, IP addrs and hostnames that are visible from other hosts"""
24+ input : str
25+ # The first, most important IP addr and host name (ordering is done by policy)
26+ primary_ip : str
27+ primary_name : Optional [str ] # Name can be missing (but policy can make it required)
28+ ips : list [str ] # All IP addrs
29+ names : list [str ] # All host names (only those that are visible to other hosts)
30+ unique_key : str # unique id of this host, will be used later for aliases
31+ observed_at : datetime = field (default_factory = lambda : datetime .now (timezone .utc ))
32+
33+ @staticmethod
34+ def from_policy (input : str , policy : 'ResolutionPolicy' , ips : Sequence [IPAddress ], names : Sequence [str ]) -> 'HostIdentity' :
35+ """Construct a HostIdentity from filtered addresses and names using policy ordering."""
36+ if not ips :
37+ raise ResolutionPolicyViolationError ('All resolved addresses were rejected by policy.' )
38+
39+ ordered_ips = policy .order_addresses (ips )
40+ # Calculate host unique key from its addresses (we prefer globals as more stable ones)
41+ globals_ordered = [ip for ip in ordered_ips if ip .is_global ]
42+ privates_ordered = [ip for ip in ordered_ips if ip .is_private ]
43+ to_hash = globals_ordered if globals_ordered else privates_ordered
44+ hasher = hashlib .sha256 ()
45+ for ip in to_hash :
46+ hasher .update (str (ip ).encode ('utf-8' ))
47+ unique_key = hasher .hexdigest ()
48+
49+ return HostIdentity (
50+ input = input ,
51+ ips = [str (ip ) for ip in ordered_ips ],
52+ names = list (names ),
53+ primary_ip = str (ordered_ips [0 ]),
54+ primary_name = names [0 ] if names else None ,
55+ unique_key = unique_key ,
56+ )
57+
58+ @property
59+ def effective_hostname (self ) -> str :
60+ """Hostname or primary IP if hostname is not set
61+
62+ If policy requires host to have a hostname and it doesn't, we won't even get here
63+ """
64+ return self .primary_name or self .primary_ip
65+
66+ def __repr__ (self ) -> str :
67+ parts : list [str ] = [
68+ f'input={ self .input !r} ' ,
69+ f'primary_ip={ self .primary_ip !r} ' ,
70+ ]
71+ if self .primary_name is not None :
72+ parts .append (f'primary_name={ self .primary_name !r} ' )
73+ # Don't show addrs and hostnames if there is only one addr or hostname
74+ if self .ips != [self .primary_ip ]:
75+ parts .append (f'ips={ self .ips !r} ' )
76+ if self .names != [self .primary_name ]:
77+ parts .append (f'names={ self .names !r} ' )
78+ parts .append (f'unique_key={ self .unique_key [:4 ]} ' )
79+ parts .append (f'observed_at={ self .observed_at .replace (microsecond = 0 ).isoformat ()!r} ' )
80+ return 'HostIdentity(' + ', ' .join (parts ) + ')'
81+
82+ def __str__ (self ) -> str :
83+ return repr (self )
2484
2585
2686@dataclass (frozen = True )
@@ -44,9 +104,9 @@ def filter_addresses(self, addrs: Iterable[str]) -> list[IPAddress]:
44104 list (addrs ), self .allow_private_ips , self .allow_ipv6
45105 )
46106 for addr in addrs :
47- ip = ip_or_none (addr )
107+ ip = _ip_or_none (addr )
48108 if ip is None :
49- logger .debug ('Skip %s: not a valid IP literal' , addr )
109+ logger .error ('Skip %s: not a valid IP literal' , addr )
50110 continue
51111
52112 # Fixed rejections first
@@ -91,71 +151,6 @@ def key(ip: IPAddress) -> tuple[int, int, int, int, int, int, int, int]:
91151 return sorted (ips , key = key )
92152
93153
94- @dataclass
95- class HostIdentity :
96- """Host's network identity, IP addrs and hostnames that are visible from other hosts"""
97- input : str
98- # The first, most important IP addr and host name (ordering is done by policy)
99- primary_ip : str
100- primary_name : Optional [str ] # Name can be missing (but policy can make it required)
101- ips : list [str ] # All IP addrs
102- names : list [str ] # All host names (only those that are visible to other hosts)
103- unique_key : str # unique id of this host, will be used later for aliases
104- observed_at : datetime = field (default_factory = lambda : datetime .now (timezone .utc ))
105-
106- @staticmethod
107- def from_policy (input : str , policy : ResolutionPolicy , ips : Sequence [IPAddress ], names : Sequence [str ]) -> 'HostIdentity' :
108- """Construct a HostIdentity from filtered addresses and names using policy ordering."""
109- if not ips :
110- raise ResolutionPolicyViolationError ('All resolved addresses were rejected by policy.' )
111-
112- ordered_ips = policy .order_addresses (ips )
113- # Calculate host unique key from its addresses (we prefer globals as more stable ones)
114- globals_ordered = [ip for ip in ordered_ips if ip .is_global ]
115- privates_ordered = [ip for ip in ordered_ips if ip .is_private ]
116- to_hash = globals_ordered if globals_ordered else privates_ordered
117- hasher = hashlib .sha256 ()
118- for ip in to_hash :
119- hasher .update (str (ip ).encode ('utf-8' ))
120- unique_key = hasher .hexdigest ()
121-
122- return HostIdentity (
123- input = input ,
124- ips = [str (ip ) for ip in ordered_ips ],
125- names = list (names ),
126- primary_ip = str (ordered_ips [0 ]),
127- primary_name = names [0 ] if names else None ,
128- unique_key = unique_key ,
129- )
130-
131- @property
132- def effective_hostname (self ) -> str :
133- """Hostname or primary IP if hostname is not set
134-
135- If policy requires host to have a hostname and it doesn't, we won't even get here
136- """
137- return self .primary_name or self .primary_ip
138-
139- def __repr__ (self ) -> str :
140- parts : list [str ] = [
141- f'input={ self .input !r} ' ,
142- f'primary_ip={ self .primary_ip !r} ' ,
143- ]
144- if self .primary_name is not None :
145- parts .append (f'primary_name={ self .primary_name !r} ' )
146- # Don't show addrs and hostnames if there is only one addr or hostname
147- if self .ips != [self .primary_ip ]:
148- parts .append (f'ips={ self .ips !r} ' )
149- if self .names != [self .primary_name ]:
150- parts .append (f'names={ self .names !r} ' )
151- parts .append (f'unique_key={ self .unique_key [:4 ]} ' )
152- parts .append (f'observed_at={ self .observed_at .replace (microsecond = 0 ).isoformat ()!r} ' )
153- return 'HostIdentity(' + ', ' .join (parts ) + ')'
154-
155- def __str__ (self ) -> str :
156- return repr (self )
157-
158-
159154class HostAddressManager :
160155 """Calculates HostIdentity from passed hostname or IP address."""
161156
@@ -170,7 +165,7 @@ def get_identity(self, target: str) -> HostIdentity:
170165
171166 target = target .strip ()
172167
173- ip = ip_or_none (target )
168+ ip = _ip_or_none (target )
174169 if ip is not None :
175170 identity = self ._get_identity_from_ip (ip , target )
176171 else :
@@ -195,7 +190,7 @@ def get_local_identity(self) -> HostIdentity:
195190 try :
196191 return self .get_identity (ip_text )
197192 except CMAPIBasicError :
198- logger .exception ('Local identity candidate %s failed resolution' , ip_text )
193+ logger .debug ('Local identity candidate %s failed resolution' , ip_text )
199194 continue
200195 raise ResolutionPolicyViolationError ('Could not determine any acceptable local IP addresses under current policy.' )
201196
@@ -211,7 +206,7 @@ def check_hostname_rev_lookup(self, hostname: str) -> tuple[HostIdentity, bool]:
211206 return identity , False
212207
213208 for ip_text in identity .ips :
214- ip = ip_or_none (ip_text )
209+ ip = _ip_or_none (ip_text )
215210 if ip is None :
216211 logger .error ('Invalid IP address: %s' , ip_text )
217212 continue
@@ -229,10 +224,6 @@ def check_hostname_rev_lookup(self, hostname: str) -> tuple[HostIdentity, bool]:
229224 logger .warning ('Roundtrip check failed for %s' , hostname )
230225 return identity , False
231226
232- @property
233- def policy (self ) -> ResolutionPolicy :
234- return self ._policy
235-
236227 def _get_identity_from_ip (self , ip : IPAddress , original_input : str ) -> HostIdentity :
237228 if not self ._policy .filter_addresses ([str (ip )]):
238229 raise ResolutionPolicyViolationError ('Input IP address was rejected by policy' )
@@ -246,9 +237,10 @@ def _get_identity_from_ip(self, ip: IPAddress, original_input: str) -> HostIdent
246237
247238 def _get_identity_from_hostname (self , hostname : str ) -> HostIdentity :
248239 normalized = hostname .strip ().lower ()
249- if not _is_fqdn (normalized ):
240+ if _is_fqdn (normalized ):
241+ return self ._get_identity_from_fqdn (normalized )
242+ else :
250243 return self ._get_identity_from_non_fqdn (hostname )
251- return self ._get_identity_from_fqdn (normalized )
252244
253245 def _get_identity_from_fqdn (self , fqdn : str ) -> HostIdentity :
254246 # Get IPs from hostname (via DNS), filter them, then get names from each IP
@@ -296,7 +288,7 @@ def _get_identity_from_non_fqdn(self, hostname: str) -> HostIdentity:
296288 # Collect names of IPs
297289 names : set [str ] = set ()
298290 for ip_text in list (candidate_ips ):
299- ip = ip_or_none (ip_text )
291+ ip = _ip_or_none (ip_text )
300292 if ip is None :
301293 logger .error ('Invalid IP address: %s' , ip_text )
302294 continue
@@ -337,7 +329,7 @@ def _resolve_dns(self, hostname: str) -> list[IPAddress]:
337329
338330 addrs : list [IPAddress ] = []
339331 for ip_text in ipv4_texts + ipv6_texts :
340- ip = ip_or_none (ip_text )
332+ ip = _ip_or_none (ip_text )
341333 if ip is None :
342334 logger .error ('DNS returned invalid IP address %s for host name %s, skipping' , ip_text , hostname )
343335 continue
@@ -383,31 +375,37 @@ def _dns_reverse(self, ip_text: str) -> list[str]:
383375 names : list [str ] = []
384376 for ptr_rdata in answer :
385377 try :
386- fqdn_name = str (ptr_rdata .target ).rstrip ('.' ).lower ()
387- if _is_fqdn ( fqdn_name ) :
388- names .append (fqdn_name )
378+ name = str (ptr_rdata .target ).rstrip ('.' ).lower ()
379+ if name :
380+ names .append (name )
389381 except Exception :
390382 continue
391383 return names
392384
393385 def _contains_private (self , addrs : list [str ]) -> bool :
394386 """Return True if any resolvable address string is a private IP."""
395387 for addr in addrs :
396- ip = ip_or_none (addr )
388+ ip = _ip_or_none (addr )
397389 if ip is None :
398390 continue
399391 if ip .is_private :
400392 return True
401393 return False
402394
403- def ip_or_none (val : str ) -> Optional [IPAddress ]:
395+
396+ @lru_cache (maxsize = 1 ) # singleton
397+ def get_host_address_manager () -> 'HostAddressManager' :
398+ return HostAddressManager ()
399+
400+
401+ def _ip_or_none (val : str ) -> Optional [IPAddress ]:
404402 try :
405403 return ipaddress .ip_address (val )
406404 except ValueError :
407405 return None
408406
409- def is_ip_address (val : str ) -> bool :
410- return ip_or_none (val ) is not None
407+ def _is_ip_address (val : str ) -> bool :
408+ return _ip_or_none (val ) is not None
411409
412410def _is_fqdn (name : str ) -> bool :
413411 """Return True if the string is a valid FQDN (lower-cased, no trailing dot).
0 commit comments