diff options
author | Florian Barca <fbarca@hortonworks.com> | 2014-12-30 10:35:45 -0800 |
---|---|---|
committer | Florian Barca <fbarca@hortonworks.com> | 2014-12-30 10:35:45 -0800 |
commit | 6c21b0942f40791ab4a461048d55e975593eab85 (patch) | |
tree | 6f3f996828f45a8473fa93e70058b42dd48bb653 /ambari-metrics/ambari-metrics-host-monitoring | |
parent | 9884cbdd51f6d465044d4573a265f124424ec821 (diff) |
Windows build for 2 Ambari Metrics service: Host Monitoring and Timeline Service (Collector).
+ Added Windows profiles to the Maven project files
+ Added the necessary Windows assemblies
+ Created Windows service skeletons
+ Host Monitoring: added OS-independent process termination handler
+ Collector: added debugging support for the Java process
+ Fixed services shutdown, especially when joining spawned threads
+ Fixed unit tests
+ Added support for unit testing on MacOS and Windows
Windows-specific:
+ Moved the assembly descriptors to ambari-metrics-assembly
+ Fixed comments in the configuration files
+ Added soft dependencies on the embedded HBase service
+ Added support for the embedded HBase service setup
Diffstat (limited to 'ambari-metrics/ambari-metrics-host-monitoring')
12 files changed, 714 insertions, 79 deletions
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd new file mode 100644 index 0000000000..115b6a6f7e --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd @@ -0,0 +1,17 @@ +@rem Licensed to the Apache Software Foundation (ASF) under one or more +@rem contributor license agreements. See the NOTICE file distributed with +@rem this work for additional information regarding copyright ownership. +@rem The ASF licenses this file to You under the Apache License, Version 2.0 +@rem (the "License"); you may not use this file except in compliance with +@rem the License. You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. + +@echo off +python.exe -u %~dp0\sbin\amhm_service.py %* 2>&1 diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf new file mode 100644 index 0000000000..6d5a62ff59 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf @@ -0,0 +1,19 @@ +{ + "host_metric_groups": { + "all": { + "collect_every": "10", + "metrics": [ + { + "name": "bytes_out", + "value_threshold": "128" + } + ] + } + }, + "process_metric_groups": { + "": { + "collect_every": "15", + "metrics": [] + } + } +} diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini new file mode 100644 index 0000000000..bc2b461e0a --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini @@ -0,0 +1,30 @@ +; +; Licensed to the Apache Software Foundation (ASF) under one +; or more contributor license agreements. See the NOTICE file +; distributed with this work for additional information +; regarding copyright ownership. The ASF licenses this file +; to you under the Apache License, Version 2.0 (the +; "License"); you may not use this file except in compliance +; with the License. You may obtain a copy of the License at +; +; http://www.apache.org/licenses/LICENSE-2.0 +; +; Unless required by applicable law or agreed to in writing, software +; distributed under the License is distributed on an "AS IS" BASIS, +; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +; See the License for the specific language governing permissions and +; limitations under the License. +; + +[default] +debug_level = INFO +metrics_server = {{ams_collector_host_single}}:{{ams_collector_port}} +enable_time_threshold = false +enable_value_threshold = false + +[emitter] +send_interval = 60 + +[collector] +collector_sleep_interval = 5 +max_queue_size = 5000 diff --git a/ambari-metrics/ambari-metrics-host-monitoring/pom.xml b/ambari-metrics/ambari-metrics-host-monitoring/pom.xml index c2130572a2..c2f322c95d 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/pom.xml +++ b/ambari-metrics/ambari-metrics-host-monitoring/pom.xml @@ -32,6 +32,7 @@ <resmonitor.install.dir> /usr/lib/python2.6/site-packages/resource_monitoring </resmonitor.install.dir> + <final.name>${project.artifactId}-${project.version}</final.name> </properties> <build> <plugins> @@ -82,55 +83,12 @@ <version>3.0</version> </plugin> <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <tarLongFileMode>gnu</tarLongFileMode> - <descriptors> - <descriptor>${project.basedir}/../../ambari-project/src/main/assemblies/empty.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>build-tarball</id> - <phase>none</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <id>psutils-compile</id> - <phase>process-test-classes</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <target name="psutils-compile"> - <exec dir="${basedir}/src/main/python/psutil" executable="${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap" failonerror="true"> - <arg value="setup.py" /> - <arg value="build" /> - <arg value="--build-platlib" /> - <arg value="${basedir}/target/psutil_build" /> - </exec> - </target> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <executions> <execution> <configuration> - <executable>${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap</executable> + <executable>${executable.python}</executable> <workingDirectory>src/test/python</workingDirectory> <arguments> <argument>unitTests.py</argument> @@ -153,6 +111,8 @@ <artifactId>apache-rat-plugin</artifactId> <configuration> <excludes> + <exclude>conf/unix/metric_groups.conf</exclude> + <exclude>conf/windows/metric_groups.conf</exclude> <exclude>src/main/python/psutil/**</exclude> <exclude>.pydevproject</exclude> </excludes> @@ -168,4 +128,102 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>windows</id> + <activation> + <os> + <family>win</family> + </os> + </activation> + <properties> + <envClassifier>win</envClassifier> + <dirsep>\</dirsep> + <pathsep>;</pathsep> + <executable.python>python</executable.python> + <executable.shell>cmd</executable.shell> + <fileextension.shell>cmd</fileextension.shell> + <fileextension.dot.shell-default>.cmd</fileextension.dot.shell-default> + <assemblydescriptor>src/main/assemblies/amhm-windows.xml</assemblydescriptor> + <packagingFormat>jar</packagingFormat> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>psutils-compile</id> + <phase>process-test-classes</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target name="psutils-compile"> + <exec dir="${basedir}/src/main/python/psutil" executable="python" failonerror="true"> + <arg value="setup.py" /> + <arg value="bdist_egg" /> + <arg value="--bdist-dir" /> + <arg value="${basedir}/target/psutil_build_temp" /> + <arg value="--dist-dir" /> + <arg value="${basedir}/target/psutil_build" /> + </exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>linux</id> + <activation> + <os> + <family>unix</family> + </os> + </activation> + <properties> + <envClassifier>linux</envClassifier> + <dirsep>/</dirsep> + <pathsep>:</pathsep> + <executable.python>${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap</executable.python> + <executable.shell>sh</executable.shell> + <fileextension.shell>sh</fileextension.shell> + <fileextension.dot.shell-default></fileextension.dot.shell-default> + <assemblydescriptor>src/main/assemblies/empty.xml</assemblydescriptor> + <packagingFormat>jar</packagingFormat> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <id>psutils-compile</id> + <phase>process-test-classes</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target name="psutils-compile"> + <exec dir="${basedir}/src/main/python/psutil" executable="python" failonerror="true"> + <arg value="setup.py" /> + <arg value="build" /> + <arg value="--build-platlib" /> + <arg value="${basedir}/target/psutil_build" /> + </exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> </project> diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py new file mode 100644 index 0000000000..0f8daab1be --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import glob + +import optparse +import os +import sys + +from ambari_commons.ambari_service import AmbariService +from ambari_commons.exceptions import FatalException, NonFatalException +from ambari_commons.logging_utils import print_warning_msg, print_error_msg, print_info_msg +from ambari_commons.os_utils import search_file, run_os_command +from ambari_commons.os_windows import SvcStatusCallback +from core.config_reader import SERVER_OUT_FILE, SERVICE_USERNAME_KEY, SERVICE_PASSWORD_KEY, \ + SETUP_ACTION, START_ACTION, STOP_ACTION, RESTART_ACTION, STATUS_ACTION +from core.stop_handler import bind_signal_handlers, StopHandler +from main import server_process_main + + +# +# Windows-specific service implementation. This class will be instantiated directly by pythonservice.exe. +# +class AMHostMonitoringService(AmbariService): + AmbariService._svc_name_ = "AmbariMetricsHostMonitoring" + AmbariService._svc_display_name_ = "Ambari Metrics Host Monitoring" + AmbariService._svc_description_ = "Ambari Metrics Host Monitoring Service" + + AmbariService._AdjustServiceVersion() + + # Adds the necessary script dir to the Python's modules path. + # Modify this as the deployed product's dir structure changes. + def _adjustPythonPath(self, current_dir): + python_path = os.path.join(current_dir, "sbin") + sys.path.insert(0, python_path) + pass + + def SvcDoRun(self): + scmStatus = SvcStatusCallback(self) + + self.redirect_output_streams() + + stopHandler = StopHandler(AMHostMonitoringService._heventSvcStop) + bind_signal_handlers(stopHandler) + + AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler) + + server_process_main(stopHandler, scmStatus) + pass + + def _InitOptionsParser(self): + return init_options_parser() + + def redirect_output_streams(self): + self._RedirectOutputStreamsToFile(SERVER_OUT_FILE) + pass + + +def ctrlHandler(ctrlType): + AMHostMonitoringService.DefCtrlCHandler() + return True + + +def svcsetup(): + AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler) + # we don't save password between 'setup' runs, so we can't run Install every time. We run 'setup' only if user and + # password provided or if service not installed + if (SERVICE_USERNAME_KEY in os.environ and SERVICE_PASSWORD_KEY in os.environ): + AMHostMonitoringService.Install(username=os.environ[SERVICE_USERNAME_KEY], password=os.environ[SERVICE_PASSWORD_KEY]) + elif AMHostMonitoringService.QueryStatus() == "not installed": + AMHostMonitoringService.Install() + pass + + +# +# Starts the Ambari Metrics Collector. The server can start as a service or standalone process. +# args: +# options.is_process = True - start the server as a process. For now, there is no restrictions for the number of +# server instances that can run like this. +# options.is_process = False - start the server in normal mode, as a Windows service. If the Ambari Metrics Collector +# is not registered as a service, the function fails. By default, only one instance of the service can +# possibly run. +# +def start(options): + AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler) + + if options.is_process: + #Run as a normal process. Invoke the ServiceMain directly. + stopHandler = StopHandler(AMHostMonitoringService._heventSvcStop) + bind_signal_handlers(stopHandler) + server_process_main(stopHandler) + else: + AMHostMonitoringService.Start() + +# +# Stops the Ambari Metrics Collector. Ineffective when the server is started as a standalone process. +# +def stop(): + AMHostMonitoringService.Stop() + +# +# Prints the Ambari Metrics Collector service status. +# +def svcstatus(options): + options.exit_message = None + + statusStr = AMHostMonitoringService.QueryStatus() + print "Ambari Metrics Collector is " + statusStr + + +def init_options_parser(): + parser = optparse.OptionParser(usage="usage: %prog action [options]", ) + parser.add_option('-d', '--debug', action="store_true", dest='debug', default=False, + help="Start Ambari Metrics Host Monitoring in debug mode") + parser.add_option('-p', '--process', action="store_true", dest='is_process', default=False, + help="Start Ambari Metrics Host Monitoring as a normal process, not as a service") + + # --help reserved for help + return parser + +def find_python_exe_path(): + paths = "." + os.pathsep + os.environ["PATH"] + + # Find python.exe by attempting to load it as a resource dll + python_path = search_file("python.exe", paths) + return os.path.dirname(python_path) + + +def find_psutil_egg(): + abs_subdir = os.path.join(os.getcwd(), "sbin", "psutil", "build") + egg_files = glob.glob(os.path.join(abs_subdir, "psutil*-win-amd64.egg")) + if egg_files is None or len(egg_files) == 0: + err = "Unable to find the expected psutil egg file in {0}. " \ + "Verify that the installation carried out correctly.".format(abs_subdir) + raise FatalException(1, err) + if len(egg_files) > 1: + err = "Multiple psutil egg files found in {0}".format(abs_subdir) + print_warning_msg(err) + #Return the latest + return egg_files[len(egg_files) - 1] + + +def setup_psutil(): + python_exe_path = find_python_exe_path() + egg_file = find_psutil_egg() + cmd = [os.path.join(python_exe_path, "Scripts", "easy_install"), "--upgrade", egg_file] + + retval, std_out, std_err = run_os_command(cmd) + if std_err is not None and std_err != '': + print_warning_msg(std_err) + print_info_msg(std_out) + if 0 != retval: + err = "Psutil egg installation failed. Exit code={0}, err output={1}".format(retval, std_err) + raise FatalException(1, err) + + # Now the egg should be installed in the site_packages dir + +def win_main(): + parser = init_options_parser() + (options, args) = parser.parse_args() + + options.warnings = [] + options.exit_message = None + + if options.debug: + sys.frozen = 'windows_exe' # Fake py2exe so we can debug + + if len(args) == 0: + print parser.print_help() + parser.error("No action entered") + + action = args[0] + + try: + if action == SETUP_ACTION: + setup_psutil() + svcsetup() + elif action == START_ACTION: + start(options) + elif action == STOP_ACTION: + stop() + elif action == RESTART_ACTION: + stop() + start(options) + elif action == STATUS_ACTION: + svcstatus(options) + else: + parser.error("Invalid action") + + if options.warnings: + for warning in options.warnings: + print_warning_msg(warning) + pass + options.exit_message = "Ambari Metrics Host Monitoring '%s' completed with warnings." % action + pass + except FatalException as e: + if e.reason is not None: + print_error_msg("Exiting with exit code {0}. \nREASON: {1}".format(e.code, e.reason)) + sys.exit(e.code) + except NonFatalException as e: + options.exit_message = "Ambari Metrics Host Monitoring '%s' completed with warnings." % action + if e.reason is not None: + print_warning_msg(e.reason) + + if options.exit_message is not None: + print options.exit_message + + sys.exit(0) + +if __name__ == "__main__": + try: + win_main() + except (KeyboardInterrupt, EOFError): + print("\nAborting ... Keyboard Interrupt.") + sys.exit(1) diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py index 996120fd40..15ad117caa 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py @@ -19,8 +19,9 @@ limitations under the License. """ import os, sys + path = os.path.abspath(__file__) -path = os.path.join(os.path.dirname(os.path.dirname(path)), "psutil/build/") +path = os.path.normpath(os.path.join(os.path.dirname(path), "psutil", "build")) for dir in os.walk(path).next()[1]: if 'lib' in dir: diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py index daabf37263..463c98cf53 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py @@ -22,10 +22,67 @@ import ConfigParser import StringIO import json import os +from ambari_commons import OSConst +from ambari_commons.os_family_impl import OsFamilyImpl + + +# +# Abstraction for OS-dependent configuration defaults +# +class ConfigDefaults(object): + def get_config_file_path(self): + pass + def get_metric_file_path(self): + pass + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class ConfigDefaultsWindows(ConfigDefaults): + def __init__(self): + self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini" + self._METRIC_FILE_PATH = "conf\\metric_groups.conf" + pass + + def get_config_file_path(self): + return self._CONFIG_FILE_PATH + def get_metric_file_path(self): + return self._METRIC_FILE_PATH + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class ConfigDefaultsLinux(ConfigDefaults): + def __init__(self): + self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini" + self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf" + pass + + def get_config_file_path(self): + return self._CONFIG_FILE_PATH + def get_metric_file_path(self): + return self._METRIC_FILE_PATH + +configDefaults = ConfigDefaults() + config = ConfigParser.RawConfigParser() -CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini" -METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf" + +CONFIG_FILE_PATH = configDefaults.get_config_file_path() +METRIC_FILE_PATH = configDefaults.get_metric_file_path() + +OUT_DIR = os.path.join(os.sep, "var", "log", "ambari-metrics-host-monitoring") +SERVER_OUT_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.out" +SERVER_LOG_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.log" + +PID_DIR = os.path.join(os.sep, "var", "run", "ambari-metrics-host-monitoring") +PID_OUT_FILE = PID_DIR + os.sep + "ambari-metrics-host-monitoring.pid" +EXITCODE_OUT_FILE = PID_DIR + os.sep + "ambari-metrics-host-monitoring.exitcode" + +SERVICE_USERNAME_KEY = "TMP_AMHM_USERNAME" +SERVICE_PASSWORD_KEY = "TMP_AMHM_PASSWORD" + +SETUP_ACTION = "setup" +START_ACTION = "start" +STOP_ACTION = "stop" +RESTART_ACTION = "restart" +STATUS_ACTION = "status" config_content = """ [default] @@ -96,6 +153,11 @@ class Configuration: self.metric_groups = json.load(open(METRIC_FILE_PATH)) else: print 'No metric configs found at {0}'.format(METRIC_FILE_PATH) + self.metric_groups = \ + { + 'host_metric_groups': [], + 'process_metric_groups': [] + } pass def getConfig(self): diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py index 51f0980b82..e0ef804f47 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py @@ -33,7 +33,7 @@ logger = logging.getLogger() class Controller(threading.Thread): - def __init__(self, config): + def __init__(self, config, stop_handler): # Process initialization code threading.Thread.__init__(self) logger.debug('Initializing Controller thread.') @@ -48,27 +48,42 @@ class Controller(threading.Thread): self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map) self.server_url = config.get_server_address() self.sleep_interval = config.get_collector_sleep_interval() + self._stop_handler = stop_handler self.initialize_events_cache() - self.emitter = Emitter(self.config, self.application_metric_map) + self.emitter = Emitter(self.config, self.application_metric_map, stop_handler) + self._t = None def run(self): logger.info('Running Controller thread: %s' % threading.currentThread().getName()) - # Wake every 5 seconds to push events to the queue + + self.start_emitter() + + # Wake every 5 seconds to push events to the queue while True: if (self.event_queue.full()): logger.warn('Event Queue full!! Suspending further collections.') else: self.enqueque_events() pass - time.sleep(self.sleep_interval) + #Wait for the service stop event instead of sleeping blindly + if 0 == self._stop_handler.wait(self.sleep_interval): + logger.info('Shutting down Controller thread') + break + + if not self._t is None: + self._t.cancel() + self._t.join(5) + + #The emitter thread should have stopped by now, just ensure it has shut down properly + self.emitter.join(5) pass # TODO: Optimize to not use Timer class and use the Queue instead def enqueque_events(self): # Queue events for up to a minute for event in self.events_cache: - t = Timer(event.get_collect_interval(), self.metric_collector.process_event, args=(event,)) - t.start() + self._t = Timer(event.get_collect_interval(), self.metric_collector.process_event, args=(event,)) + self._t.start() pass def initialize_events_cache(self): diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py index be83250b9a..c3fd543e7f 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py @@ -32,12 +32,13 @@ class Emitter(threading.Thread): """ Wake up every send interval seconds and empty the application metric map. """ - def __init__(self, config, application_metric_map): + def __init__(self, config, application_metric_map, stop_handler): threading.Thread.__init__(self) logger.debug('Initializing Emitter thread.') self.lock = threading.Lock() self.collector_address = config.get_server_address() self.send_interval = config.get_send_interval() + self._stop_handler = stop_handler self.application_metric_map = application_metric_map def run(self): @@ -45,10 +46,16 @@ class Emitter(threading.Thread): while True: try: self.submit_metrics() - time.sleep(self.send_interval) + #Wait for the service stop event instead of sleeping blindly + if 0 == self._stop_handler.wait(self.send_interval): + logger.info('Shutting down Emitter thread') + return except Exception, e: logger.warn('Unable to emit events. %s' % str(e)) - time.sleep(self.RETRY_SLEEP_INTERVAL) + #Wait for the service stop event instead of sleeping blindly + if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL): + logger.info('Shutting down Emitter thread - abort retry') + return logger.info('Retrying emit after %s seconds.' % self.RETRY_SLEEP_INTERVAL) pass @@ -69,7 +76,9 @@ class Emitter(threading.Thread): logger.warn("Error sending metrics to server. Retrying after {0} " "...".format(self.RETRY_SLEEP_INTERVAL)) retry_count += 1 - time.sleep(self.RETRY_SLEEP_INTERVAL) + #Wait for the service stop event instead of sleeping blindly + if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL): + return pass pass diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py new file mode 100644 index 0000000000..bfb6957fa5 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import os +import signal +import threading +import traceback + +from ambari_commons import OSConst, OSCheck +from ambari_commons.exceptions import FatalException +from ambari_commons.os_family_impl import OsFamilyImpl + + +logger = logging.getLogger() + +_handler = None + + +class StopHandler(object): + def set_stop(self): + pass + + def wait(self, timeout=None): + return -1 + + +# +# Windows implementation +# +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class StopHandlerWindows(StopHandler): + def __init__(self, stopEvent=None): + import win32event + # Event used to gracefully stop the process + if stopEvent is None: + # Allow standalone testing + self._heventStop = win32event.CreateEvent(None, 0, 0, None) + else: + # Allow one unique event per process + self._heventStop = stopEvent + + def set_stop(self): + import win32event + win32event.SetEvent(self._heventStop) + + def wait(self, timeout=None): + ''' + :param timeout: Time to wait, in seconds. + :return: 0 == stop event signaled, -1 = timeout + ''' + import win32event + + if timeout is None: + timeout = win32event.INFINITE + else: + timeout = timeout * 1000 + + result = win32event.WaitForSingleObject(self._heventStop, timeout) + if(win32event.WAIT_OBJECT_0 != result and win32event.WAIT_TIMEOUT != result): + raise FatalException(-1, "Error waiting for stop event: " + str(result)) + if (win32event.WAIT_TIMEOUT == result): + return -1 + logger.info("Stop event received") + return result # 0 -> stop + + +# +# Linux implementation +# +def signal_handler(signum, frame): + global _handler + _handler.set_stop() + +def debug(sig, frame): + """Interrupt running process, and provide a python prompt for + interactive debugging.""" + d = {'_frame': frame} # Allow access to frame object. + d.update(frame.f_globals) # Unless shadowed by global + d.update(frame.f_locals) + + message = "Signal received : entering python shell.\nTraceback:\n" + message += ''.join(traceback.format_stack(frame)) + logger.info(message) + + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class StopHandlerLinux(StopHandler): + def __init__(self, stopEvent=None): + # Event used to gracefully stop the process + if stopEvent is None: + # Allow standalone testing + self.stop_event = threading.Event() + else: + # Allow one unique event per process + self.stop_event = stopEvent + + def set_stop(self): + self.stop_event.set() + + def wait(self, timeout=None): + # Stop process when stop event received + if self.stop_event.wait(timeout): + logger.info("Stop event received") + return 0 + # Timeout + return -1 + + +def bind_signal_handlers(new_handler=None): + if OSCheck.get_os_family() != OSConst.WINSRV_FAMILY: + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGUSR1, debug) + + if new_handler is None: + global _handler + _handler = StopHandler() + else: + _handler = new_handler + return _handler diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py index 09ae7e4750..37e77e5a9f 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py @@ -18,27 +18,67 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import core -from core.controller import Controller -from core.config_reader import Configuration import logging -import signal +import os import sys +from ambari_commons.os_utils import remove_file + +from core.controller import Controller +from core.config_reader import Configuration, PID_OUT_FILE, SERVER_LOG_FILE, SERVER_OUT_FILE +from core.stop_handler import bind_signal_handlers + + logger = logging.getLogger() + +def save_pid(pid, pidfile): + """ + Save pid to pidfile. + """ + try: + pfile = open(pidfile, "w") + pfile.write("%s\n" % pid) + except IOError: + pass + finally: + try: + pfile.close() + except: + pass + + def main(argv=None): # Allow Ctrl-C - signal.signal(signal.SIGINT, signal.SIG_DFL) + stop_handler = bind_signal_handlers() + + server_process_main(stop_handler) + +def server_process_main(stop_handler, scmStatus=None): + if scmStatus is not None: + scmStatus.reportStartPending() + + save_pid(os.getpid(), PID_OUT_FILE) config = Configuration() - controller = Controller(config) - + controller = Controller(config, stop_handler) + _init_logging(config) - + logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv)) controller.start() - controller.start_emitter() + + print "Server out at: " + SERVER_OUT_FILE + print "Server log at: " + SERVER_LOG_FILE + + if scmStatus is not None: + scmStatus.reportStarted() + + #The controller thread finishes when the stop event is signaled + controller.join() + + remove_file(PID_OUT_FILE) + pass def _init_logging(config): _levels = { diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py index 05362bfd55..56e6475df9 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py @@ -17,48 +17,63 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' + +import json +import urllib2 + import logging from unittest import TestCase - -from application_metric_map import ApplicationMetricMap -from config_reader import Configuration -from emitter import Emitter +from only_for_platform import get_platform, PLATFORM_WINDOWS from mock.mock import patch, MagicMock -import json -import urllib2 +if get_platform() != PLATFORM_WINDOWS: + os_distro_value = ('Suse','11','Final') +else: + os_distro_value = ('win2012serverr2','6.3','WindowsServer') + +with patch("platform.linux_distribution", return_value = os_distro_value): + from ambari_commons import OSCheck + from application_metric_map import ApplicationMetricMap + from config_reader import Configuration + from emitter import Emitter + from stop_handler import bind_signal_handlers logger = logging.getLogger() class TestEmitter(TestCase): - + + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("urllib2.urlopen") def testJavaHomeAvailableCheck(self, url_open_mock): url_open_mock.return_value = MagicMock() url_open_mock.return_value.getcode.return_value = 200 self.assertEqual(urllib2.urlopen(None, None).getcode(), 200) url_open_mock.reset_mock() - + + stop_handler = bind_signal_handlers() + config = Configuration() application_metric_map = ApplicationMetricMap("host","10.10.10.10") application_metric_map.clear() application_metric_map.put_metric("APP1", {"metric1":1}, 1) - emitter = Emitter(config, application_metric_map) + emitter = Emitter(config, application_metric_map, stop_handler) emitter.submit_metrics() self.assertEqual(url_open_mock.call_count, 1) self.assertUrlData(url_open_mock) - - + + + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch("urllib2.urlopen") def testRetryFetch(self, url_open_mock): - + stop_handler = bind_signal_handlers() + config = Configuration() application_metric_map = ApplicationMetricMap("host","10.10.10.10") application_metric_map.clear() application_metric_map.put_metric("APP1", {"metric1":1}, 1) - emitter = Emitter(config, application_metric_map) + emitter = Emitter(config, application_metric_map, stop_handler) emitter.RETRY_SLEEP_INTERVAL = .001 emitter.submit_metrics() |