diff --git a/product/CMFActivity/ActivityTool.py b/product/CMFActivity/ActivityTool.py index 402753a344aa9f540a74a4db5fc9697942c7ec37..4fde546b002ddd84e1effc9fd19783987d51d689 100644 --- a/product/CMFActivity/ActivityTool.py +++ b/product/CMFActivity/ActivityTool.py @@ -73,6 +73,7 @@ max_active_threads = 1 # 2 will cause more bug to appear (he he) is_initialized = False tic_lock = threading.Lock() # A RAM based lock to prevent too many concurrent tic() calls timerservice_lock = threading.Lock() # A RAM based lock to prevent TimerService spamming when busy +is_running_lock = threading.Lock() first_run = True currentNode = None ROLE_IDLE = 0 @@ -375,6 +376,11 @@ class ActivityTool (Folder, UniqueObject): distributingNode = '' _nodes = () + # Set to False when shutting down. Access outside of process_shutdown must + # be done under the protection of is_running_lock lock. + _is_running = True + # True when activities cannot be executing any more. + _has_processed_shutdown = False def __init__(self): return Folder.__init__(self, ActivityTool.id) @@ -592,6 +598,19 @@ class ActivityTool (Folder, UniqueObject): '/manageLoadBalancing?manage_tabs_message=' + urllib.quote(message)) + def process_shutdown(self, phase, time_in_phase): + """ + Prevent shutdown from happening while an activity queue is + processing a batch. + """ + self._is_running = False + if phase == 3 and not self._has_processed_shutdown: + self._has_processed_shutdown = True + LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.") + is_running_lock.acquire() + LOG('CMFActivity', INFO, "Shutdown: Activities finished.") + is_running_lock.release() + def process_timer(self, tick, interval, prev="", next=""): """ Call distribute() if we are the Distributing Node and call tic() @@ -706,8 +725,13 @@ class ActivityTool (Folder, UniqueObject): while has_awake_activity: has_awake_activity = 0 for activity in activity_list: - activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity - has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node) + is_running_lock.acquire() + try: + if self._is_running: + activity.tic(inner_self, processing_node) # Transaction processing is the responsability of the activity + has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node) + finally: + is_running_lock.release() finally: # decrease the number of active_threads tic_lock.acquire()