1515from kombu .utils .json import dumps , loads
1616
1717from django .conf import settings
18- from django .db import transaction , close_old_connections
18+ from django .db import (
19+ DEFAULT_DB_ALIAS ,
20+ close_old_connections ,
21+ router ,
22+ transaction
23+ )
1924from django .db .utils import DatabaseError , InterfaceError
2025from django .core .exceptions import ObjectDoesNotExist
2126
@@ -258,7 +263,7 @@ def schedule_changed(self):
258263 # other transactions until the current transaction is
259264 # committed (Issue #41).
260265 try :
261- transaction .commit ()
266+ transaction .commit (using = self . target_db )
262267 except transaction .TransactionManagementError :
263268 pass # not in transaction management.
264269
@@ -287,7 +292,18 @@ def reserve(self, entry):
287292 self ._dirty .add (new_entry .name )
288293 return new_entry
289294
290- def sync (self ):
295+ @property
296+ def target_db (self ):
297+ """Determine if there is a django route"""
298+ if not settings .DATABASE_ROUTERS :
299+ return DEFAULT_DB_ALIAS
300+ # If the project does not actually implement this method,
301+ # DEFAULT_DB_ALIAS will be automatically returned.
302+ # The exception will be located to the django routing section
303+ db = router .db_for_write (self .Model )
304+ return db
305+
306+ def _sync (self ):
291307 if logger .isEnabledFor (logging .DEBUG ):
292308 debug ('Writing entries...' )
293309 _tried = set ()
@@ -313,6 +329,10 @@ def sync(self):
313329 # retry later, only for the failed ones
314330 self ._dirty |= _failed
315331
332+ def sync (self ):
333+ with transaction .atomic (using = self .target_db ):
334+ self ._sync ()
335+
316336 def update_from_dict (self , mapping ):
317337 s = {}
318338 for name , entry_fields in mapping .items ():
0 commit comments