
Commit e8cafe8

fix: Downgrade uvloop to fix "no current event loop" error
* Downgrade uvloop to 0.21.0

  The current (0.22.1) version of uvloop is not supported by pproxy.
  A fix is available but has not yet been merged: qwj/python-proxy#202

* Add an end-to-end validation test to verify that pproxy works.

  The `ChildProcess` and `fetch_via_http_proxy` helpers were written using Copilot.

Signed-off-by: Erik Swanson <[email protected]>
1 parent: f0cc6cb · commit: e8cafe8
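Not part of this commit, but a minimal sketch of an additional guard the validation script could grow: asserting that the image actually contains the pinned uvloop release. It assumes the check runs inside the built image and that uvloop exposes `__version__` (recent releases do).

    # Hypothetical extra check (not in this commit): fail validation if the
    # installed uvloop does not match the version pinned in requirements.txt.
    import uvloop

    assert uvloop.__version__ == "0.21.0", (
        f"unexpected uvloop version: {uvloop.__version__}"
    )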

2 files changed: +196 −1 lines changed


ci/validate.py

Lines changed: 195 additions & 0 deletions
@@ -1,5 +1,10 @@
 import subprocess
 import sys
+import threading
+import time
+import urllib.error
+import urllib.request
+from typing import Union, Optional, Sequence
 
 # Check that python hasn't been unexpectedly upgraded
 #
@@ -16,3 +21,193 @@
 
 # Check that pproxy can be started
 subprocess.check_call(["/usr/local/bin/pproxy", "--version"])
+
+
+class ChildProcess:
+    """
+    Context manager to run a child process while other code executes.
+
+    Parameters:
+    - args: sequence or string of args used to launch subprocess (same as subprocess.Popen args)
+    - prefix: string to prepend to every line read from stdout and stderr
+    - cwd: optional working directory for the child
+    - env: optional environment mapping for the child
+
+    Example:
+        with ChildProcess(["python", "-u", "myscript.py"], prefix="[worker] "):
+            do_other_work()
+    """
+
+    def __init__(
+        self,
+        args: Union[str, Sequence[str]],
+        prefix: str = "",
+        cwd: Optional[str] = None,
+        env: Optional[dict] = None,
+    ):
+        self.args = args
+        self.prefix = prefix
+        self.cwd = cwd
+        self.env = env
+
+        self.proc: Optional[subprocess.Popen] = None
+        self._threads: list[threading.Thread] = []
+
+    def __enter__(self) -> "ChildProcess":
+        # Start the subprocess with pipes for stdout and stderr.
+        self.proc = subprocess.Popen(
+            self.args,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            cwd=self.cwd,
+            env=self.env,
+            text=True,  # uses universal newlines; same as universal_newlines=True
+            bufsize=1,  # line buffered
+        )
+
+        # Reader function for a given pipe -> writes to the appropriate std stream.
+        def _reader(pipe, target_stream):
+            try:
+                # iter(pipe.readline, '') reads until EOF
+                for line in iter(pipe.readline, ""):
+                    if not line:
+                        break
+                    # Ensure each line is prefixed; line already contains newline if any.
+                    try:
+                        target_stream.write(f"{self.prefix}{line}")
+                        target_stream.flush()
+                    except Exception:
+                        # Best-effort: if writing fails, stop reading
+                        break
+            finally:
+                try:
+                    pipe.close()
+                except Exception:
+                    pass
+
+        # Start threads to forward stdout and stderr
+        if self.proc.stdout is not None:
+            t_out = threading.Thread(
+                target=_reader, args=(self.proc.stdout, sys.stdout), name="child_stdout_reader"
+            )
+            t_out.daemon = False
+            t_out.start()
+            self._threads.append(t_out)
+
+        if self.proc.stderr is not None:
+            t_err = threading.Thread(
+                target=_reader, args=(self.proc.stderr, sys.stderr), name="child_stderr_reader"
+            )
+            t_err.daemon = False
+            t_err.start()
+            self._threads.append(t_err)
+
+        return self
+
+    def terminate(self, wait_secs: float = 5.0) -> None:
+        """
+        Terminate the child process: send terminate(), wait up to wait_secs,
+        and if still alive send kill().
+        """
+        if not self.proc:
+            return
+
+        if self.proc.poll() is None:
+            try:
+                self.proc.terminate()
+            except Exception:
+                pass
+
+        try:
+            self.proc.wait(timeout=wait_secs)
+        except subprocess.TimeoutExpired:
+            # If it didn't die, force kill
+            try:
+                self.proc.kill()
+            except Exception:
+                pass
+            try:
+                self.proc.wait(timeout=2)
+            except Exception:
+                pass
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        # Terminate the child if still running.
+        try:
+            self.terminate()
+        finally:
+            # Wait for reader threads to finish reading remaining output.
+            for t in self._threads:
+                try:
+                    t.join(timeout=2.0)
+                except Exception:
+                    pass
+
+            # Close any remaining file descriptors on the process object.
+            if self.proc:
+                try:
+                    if self.proc.stdout and not self.proc.stdout.closed:
+                        self.proc.stdout.close()
+                except Exception:
+                    pass
+                try:
+                    if self.proc.stderr and not self.proc.stderr.closed:
+                        self.proc.stderr.close()
+                except Exception:
+                    pass
+
+        # Do not suppress exceptions from the with-block.
+        return False
+
+
+def fetch_via_http_proxy(
+    url: str,
+    proxy: str,
+    timeout: int = 10,
+    max_retry_time: float = 30.0,
+    sleep_between_attempts: float = 0.1,
+) -> None:
+    """
+    Fetch a URL through an HTTP proxy (used for both http and https).
+    Retries on any error, waiting a fixed amount of time between attempts.
+
+    Parameters:
+    - url: target URL to fetch.
+    - proxy: HTTP proxy URL (e.g. "http://proxy.host:3128"). Used for both http and https.
+    - timeout: per-request timeout in seconds.
+    - max_retry_time: overall retry budget in seconds.
+    - sleep_between_attempts: fixed wait time between attempts in seconds (default 0.1).
+
+    The function returns None and does not print anything when the GET succeeds.
+    Raises RuntimeError when the retry budget is exhausted.
+    """
+    opener = urllib.request.build_opener(
+        urllib.request.ProxyHandler({"http": proxy, "https": proxy})
+    )
+    req = urllib.request.Request(url)
+
+    start = time.monotonic()
+    attempt = 0
+
+    while True:
+        attempt += 1
+        try:
+            with opener.open(req, timeout=timeout) as resp:
+                # successful fetch: do not print anything, just return
+                resp.read(1)
+                return
+        except Exception as err:
+            elapsed = time.monotonic() - start
+            if elapsed >= max_retry_time:
+                raise RuntimeError(f"Giving up after {elapsed:.1f}s while fetching {url}") from err
+
+            sleep_time = min(sleep_between_attempts, max_retry_time - elapsed)
+            print(f"Attempt {attempt} failed ({err}); retrying in {sleep_time:.2f}s (elapsed {elapsed:.2f}s)...")
+            time.sleep(sleep_time)
+
+
+# Check that pproxy can be used as an http proxy to access http and https servers
+with ChildProcess(["/usr/local/bin/pproxy", "-l", "http://:8080", "-v"], "pproxy:"):
+    with ChildProcess(["/usr/local/bin/python", "-m", "http.server", "8081"], "http.server:"):
+        fetch_via_http_proxy("http://localhost:8081/", "http://localhost:8080")
+        fetch_via_http_proxy("https://github.com/robots.txt", "http://localhost:8080")

requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -42,5 +42,5 @@ typing-extensions==4.15.0
     # via
     #   asyncssh
     #   pyopenssl
-uvloop==0.22.1
+uvloop==0.21.0
     # via pproxy
