summaryrefslogtreecommitdiff
path: root/ambari-agent/src/main/python/ambari_agent/Controller.py
diff options
context:
space:
mode:
Diffstat (limited to 'ambari-agent/src/main/python/ambari_agent/Controller.py')
-rw-r--r--ambari-agent/src/main/python/ambari_agent/Controller.py44
1 files changed, 31 insertions, 13 deletions
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index dc3a1cfd37..d985b91b5a 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -22,6 +22,7 @@ import logging
import signal
import json
import sys
+import platform
import os
import socket
import time
@@ -46,11 +47,21 @@ logger = logging.getLogger()
AGENT_AUTO_RESTART_EXIT_CODE = 77
+IS_WINDOWS = platform.system() == "Windows"
+
class Controller(threading.Thread):
- def __init__(self, config, range=30):
+ def __init__(self, config, heartbeat_stop_callback = None, range=30):
threading.Thread.__init__(self)
logger.debug('Initializing Controller RPC thread.')
+
+ if heartbeat_stop_callback is None:
+ if IS_WINDOWS:
+ from HeartbeatHandlers_windows import HeartbeatStopHandler
+ else:
+ from HeartbeatStopHandler_linux import HeartbeatStopHandler
+ heartbeat_stop_callback = HeartbeatStopHandler()
+
self.lock = threading.Lock()
self.safeMode = True
self.credential = None
@@ -62,7 +73,7 @@ class Controller(threading.Thread):
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
self.componentsUrl = server_secured_url + '/agent/v1/components/'
- self.netutil = NetUtil()
+ self.netutil = NetUtil(heartbeat_stop_callback)
self.responseId = -1
self.repeatRegistration = False
self.isRegistered = False
@@ -71,10 +82,10 @@ class Controller(threading.Thread):
self.hasMappedComponents = True
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
- self.heartbeat_wait_event = threading.Event()
+ self.heartbeat_stop_callback = heartbeat_stop_callback
# List of callbacks that are called at agent registration
self.registration_listeners = []
-
+
# pull config directory out of config
cache_dir = config.get('agent', 'cache_dir')
if cache_dir is None:
@@ -197,6 +208,9 @@ class Controller(threading.Thread):
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
+ def trigger_heartbeat(self):
+ self.heartbeat_stop_callback.set_heartbeat()
+
def heartbeatWithServer(self):
self.DEBUG_HEARTBEAT_RETRIES = 0
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
@@ -261,14 +275,14 @@ class Controller(threading.Thread):
if 'statusCommands' in response.keys():
self.addToStatusQueue(response['statusCommands'])
pass
-
+
if 'alertDefinitionCommands' in response.keys():
self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True)
pass
if 'alertExecutionCommands' in response.keys():
self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
- pass
+ pass
if "true" == response['restartAgent']:
logger.error("Received the restartAgent command")
@@ -284,7 +298,7 @@ class Controller(threading.Thread):
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
- self.heartbeat_wait_event.clear()
+ self.heartbeat_stop_callback.reset_heartbeat()
except ssl.SSLError:
self.repeatRegistration=False
self.isRegistered = False
@@ -319,10 +333,10 @@ class Controller(threading.Thread):
# Sleep for some time
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
- self.heartbeat_wait_event.wait(timeout=timeout)
- # Sleep a bit more to allow STATUS_COMMAND results to be collected
- # and sent in one heartbeat. Also avoid server overload with heartbeats
- time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+ if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
+ # Stop loop when stop event received
+ logger.info("Stop event received")
+ self.DEBUG_STOP_HEARTBEATING=True
pass
def run(self):
@@ -405,7 +419,10 @@ class Controller(threading.Thread):
def main(argv=None):
# Allow Ctrl-C
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ if IS_WINDOWS:
+ from HeartbeatHandlers_windows import bind_signal_handlers
+ else:
+ from HeartbeatStopHandler_linux import bind_signal_handlers
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
@@ -417,7 +434,8 @@ def main(argv=None):
logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
config = AmbariConfig.config
- collector = Controller(config)
+ heartbeat_stop_callback = bind_signal_handlers()
+ collector = Controller(config, heartbeat_stop_callback)
collector.start()
collector.run()