Commit 924f2ca

chore: Revert HashCache DB changes causing E2E test failures (#892)

* chore: Revert "fix: Add backoff retry when opening HashDB. Also ensure each test is using isolated cache location. (#884)"

  This reverts commit 08c1e89.

* chore: Revert "fix: concurrent job bundle submissions fail due to sqlite3 table already exist (#862)"

  This reverts commit b35aeb1.

* chore: Revert "fix: Adding thread local s3 cache db connections to fix performance bottleneck during bundle submission (#848)"

  This reverts commit 4914e61.

Signed-off-by: Jericho Tolentino <[email protected]>

1 parent: c2188ed

8 files changed: +59 −559 lines

src/deadline/job_attachments/caches/cache_db.py (13 additions, 87 deletions)

```diff
@@ -6,13 +6,11 @@
 
 import logging
 import os
-import threading
 from abc import ABC
 from threading import Lock
 from typing import Optional
 
 from ..exceptions import JobAttachmentsError
-from .._utils import _retry
 
 CONFIG_ROOT = ".deadline"
 COMPONENT_NAME = "job_attachments"
@@ -28,9 +26,6 @@ class CacheDB(ABC):
     close the connection to the cache database.
     """
 
-    # Number of retry attempts for SQLite operational errors (e.g., database locks)
-    RETRY_ATTEMPTS = 3
-
     def __init__(
         self, cache_name: str, table_name: str, create_query: str, cache_dir: Optional[str] = None
     ) -> None:
@@ -39,8 +34,6 @@ def __init__(
         self.cache_name: str = cache_name
         self.table_name: str = table_name
         self.create_query: str = create_query
-        self.local = threading.local()
-        self.local_connections: set = set()
 
         try:
             # SQLite is included in Python installers, but might not exist if building python from source.
@@ -67,85 +60,29 @@ def __enter__(self):
         if self.enabled:
             import sqlite3
 
-            @_retry(
-                ExceptionToCheck=sqlite3.OperationalError,
-                tries=self.RETRY_ATTEMPTS,
-                delay=(0.5, 1.5),  # Jitter between 0.5 and 1.5 seconds
-                backoff=1.0,
-                logger=logger.warning,
-            )
-            def _connect_to_db():
-                try:
-                    connection = sqlite3.connect(self.cache_dir, check_same_thread=False)
-                    try:
-                        # Test the connection by trying to query the table
-                        connection.execute(f"SELECT * FROM {self.table_name}")
-                    except Exception:
-                        # DB file doesn't have our table, so we need to create it
-                        logger.info(
-                            f"No cache entries for the current library version were found. Creating a new cache for {self.cache_name}"
-                        )
-                        connection.execute(self.create_query)
-                    return connection
-                except sqlite3.OperationalError as oe:
-                    logger.info("Error connecting to database, retrying.")
-                    raise oe
-
             try:
-                self.db_connection = _connect_to_db()
+                self.db_connection: sqlite3.Connection = sqlite3.connect(
+                    self.cache_dir, check_same_thread=False
+                )
             except sqlite3.OperationalError as oe:
                 raise JobAttachmentsError(
-                    f"Could not access cache file after {self.RETRY_ATTEMPTS} retry attempts: {self.cache_dir}"
+                    f"Could not access cache file in {self.cache_dir}"
                 ) from oe
+
+            try:
+                self.db_connection.execute(f"SELECT * FROM {self.table_name}")
+            except Exception:
+                # DB file doesn't have our table, so we need to create it
+                logger.info(
+                    f"No cache entries for the current library version were found. Creating a new cache for {self.cache_name}"
+                )
+                self.db_connection.execute(self.create_query)
         return self
 
     def __exit__(self, exc_type, exc_value, exc_traceback):
         """Called when exiting the context manager."""
-
         if self.enabled:
-            import sqlite3
-
             self.db_connection.close()
-            for conn in self.local_connections:
-                try:
-                    conn.close()
-                except sqlite3.Error as e:
-                    logger.warning(f"SQLite connection failed to close with error {e}")
-
-            self.local_connections.clear()
-
-    def get_local_connection(self):
-        """Create and/or returns a thread local connection to the SQLite database."""
-        if not self.enabled:
-            return None
-        import sqlite3
-
-        if not hasattr(self.local, "connection"):
-
-            @_retry(
-                ExceptionToCheck=sqlite3.OperationalError,
-                tries=self.RETRY_ATTEMPTS,
-                delay=(0.5, 1.5),  # Jitter between 0.5 and 1.5 seconds
-                backoff=1.0,
-                logger=logger.warning,
-            )
-            def _create_local_connection():
-                try:
-                    connection = sqlite3.connect(self.cache_dir, check_same_thread=False)
-                    return connection
-                except sqlite3.OperationalError as oe:
-                    logger.info("Error connecting to database, retrying.")
-                    raise oe
-
-            try:
-                self.local.connection = _create_local_connection()
-                self.local_connections.add(self.local.connection)
-            except sqlite3.OperationalError as oe:
-                raise JobAttachmentsError(
-                    f"Could not create connection to cache after {self.RETRY_ATTEMPTS} retry attempts: {self.cache_dir}"
-                ) from oe
-
-        return self.local.connection
 
     @classmethod
     def get_default_cache_db_file_dir(cls) -> Optional[str]:
@@ -162,23 +99,12 @@ def remove_cache(self) -> None:
         """
        Removes the underlying cache contents from the file system.
         """
-
         if self.enabled:
-            import sqlite3
-
             self.db_connection.close()
-            conn_list = list(self.local_connections)
-            for conn in conn_list:
-                try:
-                    conn.close()
-                    self.local_connections.remove(conn)
-                except sqlite3.Error as e:
-                    logger.warning(f"SQLite connection failed to close with error {e}")
 
             logger.debug(f"The cache {self.cache_dir} will be removed")
             try:
                 os.remove(self.cache_dir)
             except Exception as e:
                 logger.error(f"Error occurred while removing the cache file {self.cache_dir}: {e}")
-
                 raise e
```
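For context on what this revert undoes: the removed code combined per-thread SQLite connections with a jittered retry on `sqlite3.OperationalError`. A minimal standalone sketch of that combination follows (not project code; the class and method names here are illustrative):

```python
import random
import sqlite3
import threading
import time


class ThreadLocalCacheDB:
    """Sketch: one SQLite connection per thread, opened with jittered retries."""

    RETRY_ATTEMPTS = 3

    def __init__(self, db_path: str) -> None:
        self.db_path = db_path
        self._local = threading.local()
        self._connections: set = set()  # tracked so they can all be closed later

    def get_local_connection(self) -> sqlite3.Connection:
        # A per-thread connection sidesteps contention on a single shared
        # connection, at the cost of having to close every connection on exit.
        if not hasattr(self._local, "conn"):
            for attempt in range(self.RETRY_ATTEMPTS):
                try:
                    self._local.conn = sqlite3.connect(self.db_path)
                    self._connections.add(self._local.conn)
                    break
                except sqlite3.OperationalError:
                    if attempt == self.RETRY_ATTEMPTS - 1:
                        raise
                    time.sleep(random.uniform(0.5, 1.5))  # jittered backoff

        return self._local.conn

    def close_all(self) -> None:
        for conn in self._connections:
            conn.close()
        self._connections.clear()
```

After the revert, `CacheDB` goes back to a single connection created in `__enter__` (with `check_same_thread=False`) and serialized through `db_lock`.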

src/deadline/job_attachments/caches/hash_cache.py (17 additions, 28 deletions)

```diff
@@ -51,40 +51,14 @@ class HashCache(CacheDB):
 
     def __init__(self, cache_dir: Optional[str] = None) -> None:
         table_name: str = f"hashesV{self.CACHE_DB_VERSION}"
-        create_query: str = f"CREATE TABLE IF NOT EXISTS hashesV{self.CACHE_DB_VERSION}(file_path blob primary key, hash_algorithm text secondary key, file_hash text, last_modified_time timestamp)"
+        create_query: str = f"CREATE TABLE hashesV{self.CACHE_DB_VERSION}(file_path blob primary key, hash_algorithm text secondary key, file_hash text, last_modified_time timestamp)"
         super().__init__(
             cache_name=self.CACHE_NAME,
             table_name=table_name,
             create_query=create_query,
             cache_dir=cache_dir,
         )
 
-    def get_connection_entry(
-        self, file_path_key: str, hash_algorithm: HashAlgorithm, connection
-    ) -> Optional[HashCacheEntry]:
-        """
-        Returns an entry from the hash cache, if it exists.
-        """
-        if not self.enabled:
-            return None
-
-        entry_vals = connection.execute(
-            f"SELECT * FROM {self.table_name} WHERE file_path=? AND hash_algorithm=?",
-            [
-                file_path_key.encode(encoding="utf-8", errors="surrogatepass"),
-                hash_algorithm.value,
-            ],
-        ).fetchone()
-        if entry_vals:
-            return HashCacheEntry(
-                file_path=str(entry_vals[0], encoding="utf-8", errors="surrogatepass"),
-                hash_algorithm=HashAlgorithm(entry_vals[1]),
-                file_hash=entry_vals[2],
-                last_modified_time=str(entry_vals[3]),
-            )
-        else:
-            return None
-
     def get_entry(
         self, file_path_key: str, hash_algorithm: HashAlgorithm
     ) -> Optional[HashCacheEntry]:
@@ -95,7 +69,22 @@ def get_entry(
             return None
 
         with self.db_lock, self.db_connection:
-            return self.get_connection_entry(file_path_key, hash_algorithm, self.db_connection)
+            entry_vals = self.db_connection.execute(
+                f"SELECT * FROM {self.table_name} WHERE file_path=? AND hash_algorithm=?",
+                [
+                    file_path_key.encode(encoding="utf-8", errors="surrogatepass"),
+                    hash_algorithm.value,
+                ],
+            ).fetchone()
+            if entry_vals:
+                return HashCacheEntry(
+                    file_path=str(entry_vals[0], encoding="utf-8", errors="surrogatepass"),
+                    hash_algorithm=HashAlgorithm(entry_vals[1]),
+                    file_hash=entry_vals[2],
+                    last_modified_time=str(entry_vals[3]),
+                )
+            else:
+                return None
 
     def put_entry(self, entry: HashCacheEntry) -> None:
         """Inserts or replaces an entry into the hash cache database after acquiring the lock."""
```

src/deadline/job_attachments/caches/s3_check_cache.py (18 additions, 25 deletions)

```diff
@@ -48,37 +48,14 @@ class S3CheckCache(CacheDB):
 
     def __init__(self, cache_dir: Optional[str] = None) -> None:
         table_name: str = f"s3checkV{self.CACHE_DB_VERSION}"
-        create_query: str = f"CREATE TABLE IF NOT EXISTS s3checkV{self.CACHE_DB_VERSION}(s3_key text primary key, last_seen_time timestamp)"
+        create_query: str = f"CREATE TABLE s3checkV{self.CACHE_DB_VERSION}(s3_key text primary key, last_seen_time timestamp)"
         super().__init__(
             cache_name=self.CACHE_NAME,
             table_name=table_name,
             create_query=create_query,
             cache_dir=cache_dir,
         )
 
-    def get_connection_entry(self, s3_key: str, connection) -> Optional[S3CheckCacheEntry]:
-        """
-        Checks if an entry exists in the cache, and returns it if it hasn't expired.
-        """
-
-        entry_vals = connection.execute(
-            f"SELECT * FROM {self.table_name} WHERE s3_key=?",
-            [s3_key],
-        ).fetchone()
-        if entry_vals:
-            entry = S3CheckCacheEntry(
-                s3_key=entry_vals[0],
-                last_seen_time=str(entry_vals[1]),
-            )
-            try:
-                last_seen = datetime.fromtimestamp(float(entry.last_seen_time))
-                if (datetime.now() - last_seen).days < self.ENTRY_EXPIRY_DAYS:
-                    return entry
-            except ValueError:
-                logger.warning(f"Timestamp for S3 key {s3_key} is not valid. Ignoring.")
-
-        return None
-
     def get_entry(self, s3_key: str) -> Optional[S3CheckCacheEntry]:
         """
         Checks if an entry exists in the cache, and returns it if it hasn't expired.
@@ -87,7 +64,23 @@ def get_entry(self, s3_key: str) -> Optional[S3CheckCacheEntry]:
             return None
 
         with self.db_lock, self.db_connection:
-            return self.get_connection_entry(s3_key, self.db_connection)
+            entry_vals = self.db_connection.execute(
+                f"SELECT * FROM {self.table_name} WHERE s3_key=?",
+                [s3_key],
+            ).fetchone()
+            if entry_vals:
+                entry = S3CheckCacheEntry(
+                    s3_key=entry_vals[0],
+                    last_seen_time=str(entry_vals[1]),
+                )
+                try:
+                    last_seen = datetime.fromtimestamp(float(entry.last_seen_time))
+                    if (datetime.now() - last_seen).days < self.ENTRY_EXPIRY_DAYS:
+                        return entry
+                except ValueError:
+                    logger.warning(f"Timestamp for S3 key {s3_key} is not valid. Ignoring.")
+
+            return None
 
     def put_entry(self, entry: S3CheckCacheEntry) -> None:
         """Inserts or replaces an entry into the cache database."""
```

src/deadline/job_attachments/upload.py (3 additions, 10 deletions)

```diff
@@ -593,11 +593,8 @@ def verify_hash_cache_integrity(
         random.shuffle(s3_upload_keys)
         sampled_cache_entries: List[S3CheckCacheEntry] = []
         with S3CheckCache(s3_check_cache_dir) as s3_cache:
-            local_connection = s3_cache.get_local_connection()
             for upload_key in s3_upload_keys:
-                this_entry = s3_cache.get_connection_entry(
-                    s3_key=f"{s3_bucket}/{upload_key}", connection=local_connection
-                )
+                this_entry = s3_cache.get_entry(s3_key=f"{s3_bucket}/{upload_key}")
                 if this_entry is not None:
                     sampled_cache_entries.append(this_entry)
                     if len(sampled_cache_entries) >= 30:
@@ -654,9 +651,7 @@ def upload_object_to_cas(
         s3_upload_key = self._generate_s3_upload_key(file, hash_algorithm, s3_cas_prefix)
         is_uploaded = False
 
-        if s3_check_cache.get_connection_entry(
-            s3_key=f"{s3_bucket}/{s3_upload_key}", connection=s3_check_cache.get_local_connection()
-        ):
+        if s3_check_cache.get_entry(s3_key=f"{s3_bucket}/{s3_upload_key}"):
            logger.debug(
                 f"skipping {local_path} because {s3_bucket}/{s3_upload_key} exists in the cache"
             )
@@ -1073,9 +1068,7 @@ def _process_input_path(
         file_status: FileStatus = FileStatus.UNCHANGED
         actual_modified_time = str(datetime.fromtimestamp(path.stat().st_mtime))
 
-        entry: Optional[HashCacheEntry] = hash_cache.get_connection_entry(
-            full_path, hash_alg, connection=hash_cache.get_local_connection()
-        )
+        entry: Optional[HashCacheEntry] = hash_cache.get_entry(full_path, hash_alg)
         if entry is not None:
             # If the file was modified, we need to rehash it
             if actual_modified_time != entry.last_modified_time:
```
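With the revert, call sites shrink back to the lock-guarded `get_entry` API instead of passing explicit connections around. A hypothetical call-site shape after this change (the import path, bucket, and key are placeholders, not taken from this commit):

```python
# Hypothetical usage sketch; names are illustrative.
from deadline.job_attachments.caches import S3CheckCache  # import path assumed

with S3CheckCache() as s3_cache:
    if s3_cache.get_entry(s3_key="my-bucket/Data/abc123.xxh128"):
        print("object seen recently; skipping upload")
```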

test/unit/conftest.py (2 additions, 23 deletions)

```diff
@@ -13,13 +13,10 @@
 
 
 @pytest.fixture(scope="function")
-def fresh_deadline_config(monkeypatch):
+def fresh_deadline_config():
     """
-    Fixture to start with a blank AWS Deadline Cloud config file and isolated cache directories.
+    Fixture to start with a blank AWS Deadline Cloud config file.
 
-    This fixture also overrides the HOME environment variable to ensure cache isolation
-    between tests. Both HashCache and S3CheckCache will use temporary directories under
-    the isolated HOME directory.
     """
 
     # Clear the session cache. Importing the cache invalidator at runtime is necessary
@@ -36,23 +33,6 @@ def fresh_deadline_config(monkeypatch):
     with open(temp_file_path, "w+t", encoding="utf8") as temp_file:
         temp_file.write("")
 
-    # Create a temporary HOME directory for cache isolation
-    temp_home_dir = tempfile.TemporaryDirectory()
-    temp_home_path = Path(temp_home_dir.name)
-
-    # Override HOME environment variable to isolate cache directories
-    # This affects both:
-    # - HashCache default: $HOME/.deadline/job_attachments/
-    # - S3CheckCache via config_file.get_cache_directory(): $HOME/.deadline/cache/
-    monkeypatch.setenv("HOME", str(temp_home_path))
-
-    # On Windows, os.path.expanduser("~") uses USERPROFILE instead of HOME
-    # So we need to override USERPROFILE as well for Windows compatibility
-    import sys
-
-    if sys.platform == "win32":
-        monkeypatch.setenv("USERPROFILE", str(temp_home_path))
-
     # Yield the temp file name with it patched in as the
     # AWS Deadline Cloud config file
     with patch.object(config_file, "CONFIG_FILE_PATH", str(temp_file_path)):
@@ -64,7 +44,6 @@
             yield str(temp_file_path)
         finally:
             temp_dir.cleanup()
-            temp_home_dir.cleanup()
 
 
 @pytest.fixture(scope="function", autouse=True)
```
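For reference, the per-test HOME isolation removed here can be reproduced as a standalone pytest fixture. A minimal sketch (the fixture name is illustrative, and it uses pytest's built-in `monkeypatch` and `tmp_path` fixtures rather than the manual `TemporaryDirectory` from the reverted code):

```python
import sys

import pytest


@pytest.fixture
def isolated_home(monkeypatch, tmp_path):
    # Point HOME at a per-test temporary directory so caches under
    # ~/.deadline cannot leak between tests. On Windows,
    # os.path.expanduser("~") consults USERPROFILE, so patch that too.
    monkeypatch.setenv("HOME", str(tmp_path))
    if sys.platform == "win32":
        monkeypatch.setenv("USERPROFILE", str(tmp_path))
    return tmp_path
```

monkeypatch's changes are undone automatically at test teardown, which is why the reverted fixture only needed the extra `temp_home_dir.cleanup()` for the directory itself.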
