2
2
import itertools
3
3
import logging
4
4
import sys
5
+ import time
5
6
import typing
6
7
import random
7
8
import weakref
21
22
from ..utils .deprecated import deprecated
22
23
from ..utils .enums import StateLists , MProcessingType
23
24
from ..utils .event import Eventful
24
- from ..utils .helpers import PickleSerializer , pretty_print_state_descriptors
25
+ from ..utils .helpers import PickleSerializer , pretty_print_state_descriptors , deque
25
26
from ..utils .log import set_verbosity
26
27
from ..utils .nointerrupt import WithKeyboardInterruptAs
27
28
from .workspace import Workspace , Testcase
28
- from .worker import WorkerSingle , WorkerThread , WorkerProcess , DaemonThread
29
+ from .worker import (
30
+ WorkerSingle ,
31
+ WorkerThread ,
32
+ WorkerProcess ,
33
+ DaemonThread ,
34
+ LogCaptureWorker ,
35
+ state_monitor ,
36
+ )
29
37
30
38
from multiprocessing .managers import SyncManager
31
39
import threading
@@ -88,6 +96,7 @@ def wait_for(self, condition, *args, **kwargs):
88
96
self ._terminated_states = []
89
97
self ._busy_states = []
90
98
self ._killed_states = []
99
+ self ._log_queue = deque (maxlen = 5000 )
91
100
self ._shared_context = {}
92
101
93
102
def _manticore_threading (self ):
@@ -99,6 +108,7 @@ def _manticore_threading(self):
99
108
self ._terminated_states = []
100
109
self ._busy_states = []
101
110
self ._killed_states = []
111
+ self ._log_queue = deque (maxlen = 5000 )
102
112
self ._shared_context = {}
103
113
104
114
def _manticore_multiprocessing (self ):
@@ -120,6 +130,9 @@ def raise_signal():
120
130
self ._terminated_states = self ._manager .list ()
121
131
self ._busy_states = self ._manager .list ()
122
132
self ._killed_states = self ._manager .list ()
133
+ # The multiprocessing queue is much slower than the deque when it gets full, so we
134
+ # triple the size in order to prevent that from happening.
135
+ self ._log_queue = self ._manager .Queue (15000 )
123
136
self ._shared_context = self ._manager .dict ()
124
137
self ._context_value_types = {list : self ._manager .list , dict : self ._manager .dict }
125
138
@@ -370,8 +383,10 @@ def __init__(
370
383
# Workers will use manticore __dict__ So lets spawn them last
371
384
self ._workers = [self ._worker_type (id = i , manticore = self ) for i in range (consts .procs )]
372
385
373
- # We won't create the daemons until .run() is called
374
- self ._daemon_threads : typing .List [DaemonThread ] = []
386
+ # Create log capture worker. We won't create the rest of the daemons until .run() is called
387
+ self ._daemon_threads : typing .Dict [int , DaemonThread ] = {
388
+ - 1 : LogCaptureWorker (id = - 1 , manticore = self )
389
+ }
375
390
self ._daemon_callbacks : typing .List [typing .Callable ] = []
376
391
377
392
self ._snapshot = None
@@ -1102,21 +1117,27 @@ def run(self):
1102
1117
# User subscription to events is disabled from now on
1103
1118
self .subscribe = None
1104
1119
1120
+ self .register_daemon (state_monitor )
1121
+ self ._daemon_threads [- 1 ].start () # Start log capture worker
1122
+
1105
1123
# Passing generators to callbacks is a bit hairy because the first callback would drain it if we didn't
1106
1124
# clone the iterator in event.py. We're preserving the old API here, but it's something to avoid in the future.
1107
1125
self ._publish ("will_run" , self .ready_states )
1108
1126
self ._running .value = True
1127
+
1109
1128
# start all the workers!
1110
1129
for w in self ._workers :
1111
1130
w .start ()
1112
1131
1113
1132
# Create each daemon thread and pass it `self`
1114
- if not self ._daemon_threads : # Don't recreate the threads if we call run multiple times
1115
- for i , cb in enumerate (self ._daemon_callbacks ):
1133
+ for i , cb in enumerate (self ._daemon_callbacks ):
1134
+ if (
1135
+ i not in self ._daemon_threads
1136
+ ): # Don't recreate the threads if we call run multiple times
1116
1137
dt = DaemonThread (
1117
1138
id = i , manticore = self
1118
1139
) # Potentially duplicated ids with workers. Don't mix!
1119
- self ._daemon_threads . append ( dt )
1140
+ self ._daemon_threads [ dt . id ] = dt
1120
1141
dt .start (cb )
1121
1142
1122
1143
# Main process. Lets just wait and capture CTRL+C at main
@@ -1173,6 +1194,17 @@ def finalize(self):
1173
1194
self .generate_testcase (state )
1174
1195
self .remove_all ()
1175
1196
1197
+ def wait_for_log_purge (self ):
1198
+ """
1199
+ If a client has accessed the log server, and there are still buffered logs,
1200
+ waits up to 2 seconds for the client to retrieve the logs.
1201
+ """
1202
+ if self ._daemon_threads [- 1 ].activated :
1203
+ for _ in range (8 ):
1204
+ if self ._log_queue .empty ():
1205
+ break
1206
+ time .sleep (0.25 )
1207
+
1176
1208
############################################################################
1177
1209
############################################################################
1178
1210
############################################################################
@@ -1188,6 +1220,7 @@ def save_run_data(self):
1188
1220
config .save (f )
1189
1221
1190
1222
logger .info ("Results in %s" , self ._output .store .uri )
1223
+ self .wait_for_log_purge ()
1191
1224
1192
1225
def introspect (self ) -> typing .Dict [int , StateDescriptor ]:
1193
1226
"""
0 commit comments