@@ -5,6 +5,7 @@
 #

 import json
+import time
 from collections import namedtuple
 from gzip import GzipFile
 from io import BytesIO
@@ -18,9 +19,9 @@
 from .arrow_context import ArrowConverterContext
 from .errorcode import ER_CHUNK_DOWNLOAD_FAILED
 from .errors import Error, OperationalError
-from .time_util import get_time_millis
+from .time_util import get_time_millis, DecorrelateJitterBackoff

-DEFAULT_REQUEST_TIMEOUT = 3600
+DEFAULT_REQUEST_TIMEOUT = 7

 DEFAULT_CLIENT_PREFETCH_THREADS = 4
 MAX_CLIENT_PREFETCH_THREADS = 10
@@ -113,41 +114,50 @@ def _download_chunk(self, idx):
         """
         logger.debug(u'downloading chunk %s/%s', idx + 1, self._chunk_size)
         headers = {}
-        try:
-            if self._chunk_headers is not None:
-                headers = self._chunk_headers
-                logger.debug(u'use chunk headers from result')
-            elif self._qrmk is not None:
-                headers[SSE_C_ALGORITHM] = SSE_C_AES
-                headers[SSE_C_KEY] = self._qrmk
-
-            logger.debug(u"started getting the result set %s: %s",
-                         idx + 1, self._chunks[idx].url)
-            result_data = self._fetch_chunk(self._chunks[idx].url, headers)
-            logger.debug(u"finished getting the result set %s: %s",
-                         idx + 1, self._chunks[idx].url)
-
-            if isinstance(result_data, ResultIterWithTimings):
-                metrics = result_data.get_timings()
-                with self._downloading_chunks_lock:
-                    self._total_millis_downloading_chunks += metrics[
-                        ResultIterWithTimings.DOWNLOAD]
-                    self._total_millis_parsing_chunks += metrics[
-                        ResultIterWithTimings.PARSE]
-
-            with self._chunk_cond:
-                self._chunks[idx] = self._chunks[idx]._replace(
-                    result_data=result_data,
-                    ready=True)
-                self._chunk_cond.notify_all()
-                logger.debug(
-                    u'added chunk %s/%s to a chunk list.', idx + 1,
-                    self._chunk_size)
-        except Exception as e:
-            logger.exception(
-                u'Failed to fetch the large result set chunk %s/%s',
-                idx + 1, self._chunk_size)
-            self._downloader_error = e
+        if self._chunk_headers is not None:
+            headers = self._chunk_headers
+            logger.debug(u'use chunk headers from result')
+        elif self._qrmk is not None:
+            headers[SSE_C_ALGORITHM] = SSE_C_AES
+            headers[SSE_C_KEY] = self._qrmk
+
+        last_error = None
+        backoff = DecorrelateJitterBackoff(1, 16)
+        sleep_timer = 1
+        for retry in range(10):
+            try:
+                logger.debug(u"started getting the result set %s: %s",
+                             idx + 1, self._chunks[idx].url)
+                result_data = self._fetch_chunk(self._chunks[idx].url, headers)
+                logger.debug(u"finished getting the result set %s: %s",
+                             idx + 1, self._chunks[idx].url)
+
+                if isinstance(result_data, ResultIterWithTimings):
+                    metrics = result_data.get_timings()
+                    with self._downloading_chunks_lock:
+                        self._total_millis_downloading_chunks += metrics[
+                            ResultIterWithTimings.DOWNLOAD]
+                        self._total_millis_parsing_chunks += metrics[
+                            ResultIterWithTimings.PARSE]
+
+                with self._chunk_cond:
+                    self._chunks[idx] = self._chunks[idx]._replace(
+                        result_data=result_data,
+                        ready=True)
+                    self._chunk_cond.notify_all()
+                    logger.debug(
+                        u'added chunk %s/%s to a chunk list.', idx + 1,
+                        self._chunk_size)
+                break
+            except Exception as e:
+                last_error = e
+                sleep_timer = backoff.next_sleep(1, sleep_timer)
+                logger.exception(
+                    u'Failed to fetch the large result set chunk %s/%s (attempt %s), backing off for %s seconds',
+                    idx + 1, self._chunk_size, retry + 1, sleep_timer)
+                time.sleep(sleep_timer)
+        else:
+            self._downloader_error = last_error
 
     def next_chunk(self):
         """
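The retry loop depends on DecorrelateJitterBackoff, which this diff imports from .time_util but does not show. Below is a minimal sketch of a decorrelated-jitter helper with the interface the call sites imply (a DecorrelateJitterBackoff(base, cap) constructor and next_sleep(cnt, previous_sleep) returning the next sleep in seconds), following the "decorrelated jitter" strategy from the AWS "Exponential Backoff And Jitter" blog post; the actual class in .time_util may differ in detail.

    import random


    class DecorrelateJitterBackoff(object):
        """Sketch of a decorrelated-jitter backoff helper (assumed interface).

        Each sleep is drawn at random from [base, 3 * previous_sleep] and
        capped, so concurrent downloader threads do not retry in lockstep.
        """

        def __init__(self, base, cap):
            self._base = base  # lower bound on any sleep, in seconds
            self._cap = cap    # upper bound on any sleep, in seconds

        def next_sleep(self, _cnt, sleep):
            # random.randint is inclusive on both ends; with base=1 and a
            # previous sleep >= 1 the range is always valid.
            return min(self._cap, random.randint(self._base, sleep * 3))

With DecorrelateJitterBackoff(1, 16) and sleep_timer starting at 1, each of the up to ten attempts backs off between 1 and 16 seconds, and the for/else clause records the last exception only once every attempt has failed. That reading also explains the DEFAULT_REQUEST_TIMEOUT change from 3600 to 7: rather than letting a single chunk request hang for up to an hour, the downloader now issues short requests and retries them with jittered backoff.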