diff options
Diffstat (limited to 'ambari-agent/src/main/python/ambari_agent/Controller.py')
-rw-r--r-- | ambari-agent/src/main/python/ambari_agent/Controller.py | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index dc3a1cfd37..d985b91b5a 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -22,6 +22,7 @@ import logging import signal import json import sys +import platform import os import socket import time @@ -46,11 +47,21 @@ logger = logging.getLogger() AGENT_AUTO_RESTART_EXIT_CODE = 77 +IS_WINDOWS = platform.system() == "Windows" + class Controller(threading.Thread): - def __init__(self, config, range=30): + def __init__(self, config, heartbeat_stop_callback = None, range=30): threading.Thread.__init__(self) logger.debug('Initializing Controller RPC thread.') + + if heartbeat_stop_callback is None: + if IS_WINDOWS: + from HeartbeatHandlers_windows import HeartbeatStopHandler + else: + from HeartbeatStopHandler_linux import HeartbeatStopHandler + heartbeat_stop_callback = HeartbeatStopHandler() + self.lock = threading.Lock() self.safeMode = True self.credential = None @@ -62,7 +73,7 @@ class Controller(threading.Thread): self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname self.componentsUrl = server_secured_url + '/agent/v1/components/' - self.netutil = NetUtil() + self.netutil = NetUtil(heartbeat_stop_callback) self.responseId = -1 self.repeatRegistration = False self.isRegistered = False @@ -71,10 +82,10 @@ class Controller(threading.Thread): self.hasMappedComponents = True # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) - self.heartbeat_wait_event = threading.Event() + self.heartbeat_stop_callback = heartbeat_stop_callback # List of callbacks that are called at agent registration self.registration_listeners = [] - + # pull config directory out of config cache_dir = config.get('agent', 'cache_dir') if cache_dir is None: @@ -197,6 +208,9 @@ class Controller(threading.Thread): DEBUG_SUCCESSFULL_HEARTBEATS = 0 DEBUG_STOP_HEARTBEATING = False + def trigger_heartbeat(self): + self.heartbeat_stop_callback.set_heartbeat() + def heartbeatWithServer(self): self.DEBUG_HEARTBEAT_RETRIES = 0 self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 @@ -261,14 +275,14 @@ class Controller(threading.Thread): if 'statusCommands' in response.keys(): self.addToStatusQueue(response['statusCommands']) pass - + if 'alertDefinitionCommands' in response.keys(): self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True) pass if 'alertExecutionCommands' in response.keys(): self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands']) - pass + pass if "true" == response['restartAgent']: logger.error("Received the restartAgent command") @@ -284,7 +298,7 @@ class Controller(threading.Thread): certVerifFailed = False self.DEBUG_SUCCESSFULL_HEARTBEATS += 1 self.DEBUG_HEARTBEAT_RETRIES = 0 - self.heartbeat_wait_event.clear() + self.heartbeat_stop_callback.reset_heartbeat() except ssl.SSLError: self.repeatRegistration=False self.isRegistered = False @@ -319,10 +333,10 @@ class Controller(threading.Thread): # Sleep for some time timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS - self.heartbeat_wait_event.wait(timeout=timeout) - # Sleep a bit more to allow STATUS_COMMAND results to be collected - # and sent in one heartbeat. Also avoid server overload with heartbeats - time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) + if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS): + # Stop loop when stop event received + logger.info("Stop event received") + self.DEBUG_STOP_HEARTBEATING=True pass def run(self): @@ -405,7 +419,10 @@ class Controller(threading.Thread): def main(argv=None): # Allow Ctrl-C - signal.signal(signal.SIGINT, signal.SIG_DFL) + if IS_WINDOWS: + from HeartbeatHandlers_windows import bind_signal_handlers + else: + from HeartbeatStopHandler_linux import bind_signal_handlers logger.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \ @@ -417,7 +434,8 @@ def main(argv=None): logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv)) config = AmbariConfig.config - collector = Controller(config) + heartbeat_stop_callback = bind_signal_handlers() + collector = Controller(config, heartbeat_stop_callback) collector.start() collector.run() |