#!/usr/bin/env python3
# encoding: utf-8

import json
import os
import re
import requests
from urllib.parse import urlparse
from cortexutils.analyzer import Analyzer

# v2 onion addresses are 16 base32 characters, v3 addresses are 56
ONION_RE = re.compile(r'^[a-z2-7]{16,56}\.onion$', re.IGNORECASE)

class AILOnionLookup(Analyzer):
    def __init__(self):
        Analyzer.__init__(self)
        self.base_url = self.get_param("config.base_url", "https://onion.ail-project.org")
        self.timeout = int(self.get_param("config.timeout", 30))
        self.verify_tls = bool(self.get_param("config.verify_tls", True))
        self.csam_tag = 'dark-web:topic="child-sexual-abuse-material"'
        self.tag_descriptions = self._load_tag_descriptions()

    def _extract_onion_host(self, value: str) -> str:
        v = (value or "").strip()
        # If it's a URL, parse out the hostname
        if re.match(r'^[a-z]+://', v, re.IGNORECASE):
            host = urlparse(v).hostname or ""
        else:
            host = v
        host = host.lower().strip()
        # strip optional port
        if ":" in host:
            host = host.split(":", 1)[0]
        # basic sanity checks
        if not host.endswith(".onion"):
            raise ValueError("Not a .onion host")
        if not ONION_RE.match(host):
            raise ValueError("Not a valid .onion address")
        return host

    def _load_tag_descriptions(self):
        """Load tag descriptions from machinetag.json"""
        machinetag_path = os.path.join(os.path.dirname(__file__), 'machinetag.json')
        if not os.path.exists(machinetag_path):
            return {}

        with open(machinetag_path, 'r', encoding='utf-8') as f:
            self.machinetag_data = json.load(f)

        descriptions = {}
        for value_info in self.machinetag_data.get('values', []):
            predicate = value_info['predicate']
            for entry in value_info['entry']:
                key = f"dark-web:{predicate}={entry['value']}"
                descriptions[key] = {
                    'description': entry.get('description'),
                    'expanded': entry.get('expanded'),
                    'value': entry['value']
                }

        return descriptions

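    # Illustrative machinetag.json fragment (hypothetical values), limited to the keys the
    # loader above reads: 'values' -> 'predicate', and 'entry' -> 'value' / 'expanded' /
    # 'description'. The bundled file is presumably the MISP "dark-web" taxonomy, whose
    # tags have the form dark-web:<predicate>="<value>":
    #
    # {
    #   "values": [
    #     {
    #       "predicate": "topic",
    #       "entry": [
    #         {"value": "drugs-narcotics", "expanded": "Drugs/Narcotics", "description": "..."}
    #       ]
    #     }
    #   ]
    # }
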
    def _count_detections(self, tags):
        """Count total detections and malicious detections"""
        if not tags or not isinstance(tags, list):
            return {'total': 0, 'malicious': 0}

        sanitized_tags = self._create_sanitized_tags(tags)

        total_detections = len(tags)
        malicious_detections = 0

        for tag in sanitized_tags:
            if tag in self.tag_descriptions:
                # any tag matching an entry in machinetag.json is considered malicious/notable
                malicious_detections += 1

        return {'total': total_detections, 'malicious': malicious_detections}

    def _create_sanitized_tags(self, tags):
        """Create sanitized tags by stripping all quotes, escapes, and whitespace"""
        return [re.sub(r'["\\\s]', '', tag.strip()) for tag in tags]

    def _enrich_tags(self, tags):
        """Add human-readable descriptions to tags for security analysts"""
        enriched = []
        sanitized = self._create_sanitized_tags(tags)

        for original, clean in zip(tags, sanitized):
            tag_info = {
                'original': original,
                'sanitized': clean,
                'description': None,
                'expanded': None
            }

            if clean in self.tag_descriptions:
                tag_info.update(self.tag_descriptions[clean])

            enriched.append(tag_info)

        return enriched

    def run(self):
        try:
            onion = self._extract_onion_host(self.get_data())
            url = f"{self.base_url.rstrip('/')}/api/lookup/{onion}"
            r = requests.get(url, timeout=self.timeout, verify=self.verify_tls)

            if r.status_code == 200:
                resp = r.json()
                # API returns [{"error": "Invalid Domain"}, 404] for non-existent onions
                if isinstance(resp, list) and len(resp) == 2 and isinstance(resp[0], dict) and "error" in resp[0]:
                    self.error("Onion service not found")
                else:
                    # For csam tag testing:
                    # if isinstance(resp, dict) and "tags" in resp and isinstance(resp["tags"], list):
                    #     resp["tags"].append(self.csam_tag)
                    # Add enriched tags with analyst-friendly descriptions
                    if isinstance(resp, dict) and "tags" in resp and isinstance(resp["tags"], list):
                        resp["tags_enriched"] = self._enrich_tags(resp["tags"])
                        resp["tags_sanitized"] = self._create_sanitized_tags(resp["tags"])
                    self.report(resp)
            else:
                self.error("API request failed")

        except Exception:
            self.error("Failed to process onion lookup")

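    # Illustrative shape of a successful lookup response. The field names are inferred
    # from the handling in operations() and summary() below; the actual API payload may
    # contain additional fields:
    #
    # {
    #   "id": "<onion address>",
    #   "first_seen": "...",
    #   "last_seen": "...",
    #   "titles": ["..."],
    #   "languages": ["..."],
    #   "tags": ["dark-web:topic=\"...\""]
    # }
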
    def operations(self, raw):
        ops = []
        try:
            # Skip operations if raw is an error array
            if isinstance(raw, list):
                return []

            tags = set()
            if isinstance(raw, dict) and "tags" in raw and isinstance(raw["tags"], list):
                tags.update(str(t) for t in raw["tags"])
            tags.add("source:ail-onion-lookup")

            for t in sorted(tags):
                ops.append(self.build_operation("AddTagToArtifact", tag=t))

            # if self.csam_tag in tags:
            #     ops.append(self.build_operation("AddTagToArtifact", tag="risk:csam-linked"))
            #     ops.append(self.build_operation("AddTagToCase", tag="risk:csam-linked"))
            #     task_title = "Review CSAM-linked onion"
            #     task_desc = (
            #         "- Validate evidence handling (no download / safe preview)\n"
            #         "- Update blocklists / mail / web proxies as applicable\n"
            #         "- Check prior sightings / related artifacts\n"
            #         "- Consider legal/notification procedures per policy\n"
            #         f"- Source: {self.base_url}\n"
            #     )
            #     ops.append(self.build_operation("CreateTask", title=task_title, description=task_desc))

        except Exception:
            return []
        return ops

    def artifacts(self, raw):
        artifacts = []
        return artifacts

    def summary(self, raw):
        taxonomies = []
        namespace = "OnionLookup"

        try:
            # Skip summary if raw is an error array
            if isinstance(raw, list):
                return {"taxonomies": []}

            tags = []
            if isinstance(raw, dict) and "tags" in raw and isinstance(raw["tags"], list):
                tags = raw["tags"]

            found = False
            if isinstance(raw, dict):
                found = any(raw.get(k) for k in ("id", "first_seen", "last_seen", "titles", "languages", "tags"))

            # Status taxonomy
            taxonomies.append(
                self.build_taxonomy("info", namespace, "Status", "found" if found else "not-found")
            )

            # Detection count taxonomies for short reports
            if found and tags:
                detection_counts = self._count_detections(tags)

                # Total detections
                if detection_counts['total'] > 0:
                    taxonomies.append(
                        self.build_taxonomy("info", namespace, "Detections", str(detection_counts['total']))
                    )

                # Notable detections (tags that match a machinetag.json entry)
                if detection_counts['malicious'] > 0:
                    taxonomies.append(
                        self.build_taxonomy("suspicious", namespace, "Notables", str(detection_counts['malicious']))
                    )

            # Special case for CSAM
            if self.csam_tag in [str(t) for t in tags]:
                taxonomies.append(self.build_taxonomy("malicious", namespace, "CSAM", "linked"))

        except Exception:
            pass

        return {"taxonomies": taxonomies}


if __name__ == "__main__":
    AILOnionLookup().run()
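
# Local smoke test (sketch): when no job directory argument is given, cortexutils
# analyzers read the Cortex job JSON from stdin, so something along these lines should
# work. The onion address, dataType, and script name below are placeholders:
#
#   echo '{"dataType": "domain",
#          "data": "exampleonionaddress2.onion",
#          "config": {"base_url": "https://onion.ail-project.org", "timeout": 30}}' \
#       | python3 <this_analyzer>.py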