File tree Expand file tree Collapse file tree 2 files changed +10
-1
lines changed Expand file tree Collapse file tree 2 files changed +10
-1
lines changed Original file line number Diff line number Diff line change 1
1
import asyncio
2
+ import os
2
3
import queue
3
4
import sys
4
5
import threading
15
16
16
17
T = TypeVar ("T" )
17
18
19
+ WATCHDOG_TIMEOUT_SEC = "TORCHFT_WATCHDOG_TIMEOUT_SEC"
20
+
18
21
19
22
class _TimerHandle :
20
23
def __init__ (self ) -> None :
@@ -61,7 +64,9 @@ def __init__(self) -> None:
61
64
62
65
# Give this much time the the `_event_loop_thread` to confirm that
63
66
# it is not stuck
64
- self ._watchdog_interval = timedelta (seconds = 30 )
67
+ self ._watchdog_interval = timedelta (
68
+ seconds = int (os .environ .get (WATCHDOG_TIMEOUT_SEC , "30" ))
69
+ )
65
70
66
71
# This queue is used to delete events on the main thread as cudaEventDestroy
67
72
# can block if the CUDA queue is full.
Original file line number Diff line number Diff line change @@ -762,6 +762,7 @@ def _step_post_hook(
762
762
# can be overrepresented.
763
763
self ._manager .start_quorum ()
764
764
fragment = self ._current_fragment ()
765
+ logger .info (f"Preparing fragment={ fragment } step={ self ._local_step } " )
765
766
self ._fragments [fragment ].prepare_sync ()
766
767
767
768
if self ._local_step < self ._sync_every :
@@ -770,6 +771,9 @@ def _step_post_hook(
770
771
if self ._local_step == self ._sync_every :
771
772
# Time to sync a fragment
772
773
fragment = self ._current_fragment ()
774
+ logger .info (
775
+ f"Syncing fragment={ fragment } step={ self ._local_step } manager_step={ self ._manager .current_step ()} "
776
+ )
773
777
self ._fragments [fragment ].perform_sync ()
774
778
775
779
# If the allreduce truly failed, we'll keep retrying this fragment.
You can’t perform that action at this time.
0 commit comments