summaryrefslogtreecommitdiff
path: root/ambari-metrics/ambari-metrics-host-monitoring
diff options
context:
space:
mode:
authorFlorian Barca <fbarca@hortonworks.com>2014-12-30 10:35:45 -0800
committerFlorian Barca <fbarca@hortonworks.com>2014-12-30 10:35:45 -0800
commit6c21b0942f40791ab4a461048d55e975593eab85 (patch)
tree6f3f996828f45a8473fa93e70058b42dd48bb653 /ambari-metrics/ambari-metrics-host-monitoring
parent9884cbdd51f6d465044d4573a265f124424ec821 (diff)
Windows build for 2 Ambari Metrics service: Host Monitoring and Timeline Service (Collector).
+ Added Windows profiles to the Maven project files + Added the necessary Windows assemblies + Created Windows service skeletons + Host Monitoring: added OS-independent process termination handler + Collector: added debugging support for the Java process + Fixed services shutdown, especially when joining spawned threads + Fixed unit tests + Added support for unit testing on MacOS and Windows Windows-specific: + Moved the assembly descriptors to ambari-metrics-assembly + Fixed comments in the configuration files + Added soft dependencies on the embedded HBase service + Added support for the embedded HBase service setup
Diffstat (limited to 'ambari-metrics/ambari-metrics-host-monitoring')
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd17
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf19
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini30
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/pom.xml146
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py231
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py3
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py66
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py27
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py17
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py138
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py58
-rw-r--r--ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py41
12 files changed, 714 insertions, 79 deletions
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd
new file mode 100644
index 0000000000..115b6a6f7e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/ambari-metrics-monitor.cmd
@@ -0,0 +1,17 @@
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements. See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License. You may obtain a copy of the License at
+@rem
+@rem http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+@echo off
+python.exe -u %~dp0\sbin\amhm_service.py %* 2>&1
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf
new file mode 100644
index 0000000000..6d5a62ff59
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_groups.conf
@@ -0,0 +1,19 @@
+{
+ "host_metric_groups": {
+ "all": {
+ "collect_every": "10",
+ "metrics": [
+ {
+ "name": "bytes_out",
+ "value_threshold": "128"
+ }
+ ]
+ }
+ },
+ "process_metric_groups": {
+ "": {
+ "collect_every": "15",
+ "metrics": []
+ }
+ }
+}
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini
new file mode 100644
index 0000000000..bc2b461e0a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini
@@ -0,0 +1,30 @@
+;
+; Licensed to the Apache Software Foundation (ASF) under one
+; or more contributor license agreements. See the NOTICE file
+; distributed with this work for additional information
+; regarding copyright ownership. The ASF licenses this file
+; to you under the Apache License, Version 2.0 (the
+; "License"); you may not use this file except in compliance
+; with the License. You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+
+[default]
+debug_level = INFO
+metrics_server = {{ams_collector_host_single}}:{{ams_collector_port}}
+enable_time_threshold = false
+enable_value_threshold = false
+
+[emitter]
+send_interval = 60
+
+[collector]
+collector_sleep_interval = 5
+max_queue_size = 5000
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/pom.xml b/ambari-metrics/ambari-metrics-host-monitoring/pom.xml
index c2130572a2..c2f322c95d 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/pom.xml
+++ b/ambari-metrics/ambari-metrics-host-monitoring/pom.xml
@@ -32,6 +32,7 @@
<resmonitor.install.dir>
/usr/lib/python2.6/site-packages/resource_monitoring
</resmonitor.install.dir>
+ <final.name>${project.artifactId}-${project.version}</final.name>
</properties>
<build>
<plugins>
@@ -82,55 +83,12 @@
<version>3.0</version>
</plugin>
<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <tarLongFileMode>gnu</tarLongFileMode>
- <descriptors>
- <descriptor>${project.basedir}/../../ambari-project/src/main/assemblies/empty.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>build-tarball</id>
- <phase>none</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.7</version>
- <executions>
- <execution>
- <id>psutils-compile</id>
- <phase>process-test-classes</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <target name="psutils-compile">
- <exec dir="${basedir}/src/main/python/psutil" executable="${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap" failonerror="true">
- <arg value="setup.py" />
- <arg value="build" />
- <arg value="--build-platlib" />
- <arg value="${basedir}/target/psutil_build" />
- </exec>
- </target>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<configuration>
- <executable>${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap</executable>
+ <executable>${executable.python}</executable>
<workingDirectory>src/test/python</workingDirectory>
<arguments>
<argument>unitTests.py</argument>
@@ -153,6 +111,8 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
+ <exclude>conf/unix/metric_groups.conf</exclude>
+ <exclude>conf/windows/metric_groups.conf</exclude>
<exclude>src/main/python/psutil/**</exclude>
<exclude>.pydevproject</exclude>
</excludes>
@@ -168,4 +128,102 @@
</plugin>
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>windows</id>
+ <activation>
+ <os>
+ <family>win</family>
+ </os>
+ </activation>
+ <properties>
+ <envClassifier>win</envClassifier>
+ <dirsep>\</dirsep>
+ <pathsep>;</pathsep>
+ <executable.python>python</executable.python>
+ <executable.shell>cmd</executable.shell>
+ <fileextension.shell>cmd</fileextension.shell>
+ <fileextension.dot.shell-default>.cmd</fileextension.dot.shell-default>
+ <assemblydescriptor>src/main/assemblies/amhm-windows.xml</assemblydescriptor>
+ <packagingFormat>jar</packagingFormat>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>psutils-compile</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target name="psutils-compile">
+ <exec dir="${basedir}/src/main/python/psutil" executable="python" failonerror="true">
+ <arg value="setup.py" />
+ <arg value="bdist_egg" />
+ <arg value="--bdist-dir" />
+ <arg value="${basedir}/target/psutil_build_temp" />
+ <arg value="--dist-dir" />
+ <arg value="${basedir}/target/psutil_build" />
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>linux</id>
+ <activation>
+ <os>
+ <family>unix</family>
+ </os>
+ </activation>
+ <properties>
+ <envClassifier>linux</envClassifier>
+ <dirsep>/</dirsep>
+ <pathsep>:</pathsep>
+ <executable.python>${project.basedir}/../../ambari-common/src/main/unix/ambari-python-wrap</executable.python>
+ <executable.shell>sh</executable.shell>
+ <fileextension.shell>sh</fileextension.shell>
+ <fileextension.dot.shell-default></fileextension.dot.shell-default>
+ <assemblydescriptor>src/main/assemblies/empty.xml</assemblydescriptor>
+ <packagingFormat>jar</packagingFormat>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <id>psutils-compile</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target name="psutils-compile">
+ <exec dir="${basedir}/src/main/python/psutil" executable="python" failonerror="true">
+ <arg value="setup.py" />
+ <arg value="build" />
+ <arg value="--build-platlib" />
+ <arg value="${basedir}/target/psutil_build" />
+ </exec>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py
new file mode 100644
index 0000000000..0f8daab1be
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/amhm_service.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+import glob
+
+import optparse
+import os
+import sys
+
+from ambari_commons.ambari_service import AmbariService
+from ambari_commons.exceptions import FatalException, NonFatalException
+from ambari_commons.logging_utils import print_warning_msg, print_error_msg, print_info_msg
+from ambari_commons.os_utils import search_file, run_os_command
+from ambari_commons.os_windows import SvcStatusCallback
+from core.config_reader import SERVER_OUT_FILE, SERVICE_USERNAME_KEY, SERVICE_PASSWORD_KEY, \
+ SETUP_ACTION, START_ACTION, STOP_ACTION, RESTART_ACTION, STATUS_ACTION
+from core.stop_handler import bind_signal_handlers, StopHandler
+from main import server_process_main
+
+
+#
+# Windows-specific service implementation. This class will be instantiated directly by pythonservice.exe.
+#
+class AMHostMonitoringService(AmbariService):
+ AmbariService._svc_name_ = "AmbariMetricsHostMonitoring"
+ AmbariService._svc_display_name_ = "Ambari Metrics Host Monitoring"
+ AmbariService._svc_description_ = "Ambari Metrics Host Monitoring Service"
+
+ AmbariService._AdjustServiceVersion()
+
+ # Adds the necessary script dir to the Python's modules path.
+ # Modify this as the deployed product's dir structure changes.
+ def _adjustPythonPath(self, current_dir):
+ python_path = os.path.join(current_dir, "sbin")
+ sys.path.insert(0, python_path)
+ pass
+
+ def SvcDoRun(self):
+ scmStatus = SvcStatusCallback(self)
+
+ self.redirect_output_streams()
+
+ stopHandler = StopHandler(AMHostMonitoringService._heventSvcStop)
+ bind_signal_handlers(stopHandler)
+
+ AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler)
+
+ server_process_main(stopHandler, scmStatus)
+ pass
+
+ def _InitOptionsParser(self):
+ return init_options_parser()
+
+ def redirect_output_streams(self):
+ self._RedirectOutputStreamsToFile(SERVER_OUT_FILE)
+ pass
+
+
+def ctrlHandler(ctrlType):
+ AMHostMonitoringService.DefCtrlCHandler()
+ return True
+
+
+def svcsetup():
+ AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler)
+ # we don't save password between 'setup' runs, so we can't run Install every time. We run 'setup' only if user and
+ # password provided or if service not installed
+ if (SERVICE_USERNAME_KEY in os.environ and SERVICE_PASSWORD_KEY in os.environ):
+ AMHostMonitoringService.Install(username=os.environ[SERVICE_USERNAME_KEY], password=os.environ[SERVICE_PASSWORD_KEY])
+ elif AMHostMonitoringService.QueryStatus() == "not installed":
+ AMHostMonitoringService.Install()
+ pass
+
+
+#
+# Starts the Ambari Metrics Collector. The server can start as a service or standalone process.
+# args:
+# options.is_process = True - start the server as a process. For now, there is no restrictions for the number of
+# server instances that can run like this.
+# options.is_process = False - start the server in normal mode, as a Windows service. If the Ambari Metrics Collector
+# is not registered as a service, the function fails. By default, only one instance of the service can
+# possibly run.
+#
+def start(options):
+ AMHostMonitoringService.set_ctrl_c_handler(ctrlHandler)
+
+ if options.is_process:
+ #Run as a normal process. Invoke the ServiceMain directly.
+ stopHandler = StopHandler(AMHostMonitoringService._heventSvcStop)
+ bind_signal_handlers(stopHandler)
+ server_process_main(stopHandler)
+ else:
+ AMHostMonitoringService.Start()
+
+#
+# Stops the Ambari Metrics Collector. Ineffective when the server is started as a standalone process.
+#
+def stop():
+ AMHostMonitoringService.Stop()
+
+#
+# Prints the Ambari Metrics Collector service status.
+#
+def svcstatus(options):
+ options.exit_message = None
+
+ statusStr = AMHostMonitoringService.QueryStatus()
+ print "Ambari Metrics Collector is " + statusStr
+
+
+def init_options_parser():
+ parser = optparse.OptionParser(usage="usage: %prog action [options]", )
+ parser.add_option('-d', '--debug', action="store_true", dest='debug', default=False,
+ help="Start Ambari Metrics Host Monitoring in debug mode")
+ parser.add_option('-p', '--process', action="store_true", dest='is_process', default=False,
+ help="Start Ambari Metrics Host Monitoring as a normal process, not as a service")
+
+ # --help reserved for help
+ return parser
+
+def find_python_exe_path():
+ paths = "." + os.pathsep + os.environ["PATH"]
+
+ # Find python.exe by attempting to load it as a resource dll
+ python_path = search_file("python.exe", paths)
+ return os.path.dirname(python_path)
+
+
+def find_psutil_egg():
+ abs_subdir = os.path.join(os.getcwd(), "sbin", "psutil", "build")
+ egg_files = glob.glob(os.path.join(abs_subdir, "psutil*-win-amd64.egg"))
+ if egg_files is None or len(egg_files) == 0:
+ err = "Unable to find the expected psutil egg file in {0}. " \
+ "Verify that the installation carried out correctly.".format(abs_subdir)
+ raise FatalException(1, err)
+ if len(egg_files) > 1:
+ err = "Multiple psutil egg files found in {0}".format(abs_subdir)
+ print_warning_msg(err)
+ #Return the latest
+ return egg_files[len(egg_files) - 1]
+
+
+def setup_psutil():
+ python_exe_path = find_python_exe_path()
+ egg_file = find_psutil_egg()
+ cmd = [os.path.join(python_exe_path, "Scripts", "easy_install"), "--upgrade", egg_file]
+
+ retval, std_out, std_err = run_os_command(cmd)
+ if std_err is not None and std_err != '':
+ print_warning_msg(std_err)
+ print_info_msg(std_out)
+ if 0 != retval:
+ err = "Psutil egg installation failed. Exit code={0}, err output={1}".format(retval, std_err)
+ raise FatalException(1, err)
+
+ # Now the egg should be installed in the site_packages dir
+
+def win_main():
+ parser = init_options_parser()
+ (options, args) = parser.parse_args()
+
+ options.warnings = []
+ options.exit_message = None
+
+ if options.debug:
+ sys.frozen = 'windows_exe' # Fake py2exe so we can debug
+
+ if len(args) == 0:
+ print parser.print_help()
+ parser.error("No action entered")
+
+ action = args[0]
+
+ try:
+ if action == SETUP_ACTION:
+ setup_psutil()
+ svcsetup()
+ elif action == START_ACTION:
+ start(options)
+ elif action == STOP_ACTION:
+ stop()
+ elif action == RESTART_ACTION:
+ stop()
+ start(options)
+ elif action == STATUS_ACTION:
+ svcstatus(options)
+ else:
+ parser.error("Invalid action")
+
+ if options.warnings:
+ for warning in options.warnings:
+ print_warning_msg(warning)
+ pass
+ options.exit_message = "Ambari Metrics Host Monitoring '%s' completed with warnings." % action
+ pass
+ except FatalException as e:
+ if e.reason is not None:
+ print_error_msg("Exiting with exit code {0}. \nREASON: {1}".format(e.code, e.reason))
+ sys.exit(e.code)
+ except NonFatalException as e:
+ options.exit_message = "Ambari Metrics Host Monitoring '%s' completed with warnings." % action
+ if e.reason is not None:
+ print_warning_msg(e.reason)
+
+ if options.exit_message is not None:
+ print options.exit_message
+
+ sys.exit(0)
+
+if __name__ == "__main__":
+ try:
+ win_main()
+ except (KeyboardInterrupt, EOFError):
+ print("\nAborting ... Keyboard Interrupt.")
+ sys.exit(1)
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py
index 996120fd40..15ad117caa 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/__init__.py
@@ -19,8 +19,9 @@ limitations under the License.
"""
import os, sys
+
path = os.path.abspath(__file__)
-path = os.path.join(os.path.dirname(os.path.dirname(path)), "psutil/build/")
+path = os.path.normpath(os.path.join(os.path.dirname(path), "psutil", "build"))
for dir in os.walk(path).next()[1]:
if 'lib' in dir:
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index daabf37263..463c98cf53 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -22,10 +22,67 @@ import ConfigParser
import StringIO
import json
import os
+from ambari_commons import OSConst
+from ambari_commons.os_family_impl import OsFamilyImpl
+
+
+#
+# Abstraction for OS-dependent configuration defaults
+#
+class ConfigDefaults(object):
+ def get_config_file_path(self):
+ pass
+ def get_metric_file_path(self):
+ pass
+
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class ConfigDefaultsWindows(ConfigDefaults):
+ def __init__(self):
+ self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
+ self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
+ pass
+
+ def get_config_file_path(self):
+ return self._CONFIG_FILE_PATH
+ def get_metric_file_path(self):
+ return self._METRIC_FILE_PATH
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class ConfigDefaultsLinux(ConfigDefaults):
+ def __init__(self):
+ self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
+ self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
+ pass
+
+ def get_config_file_path(self):
+ return self._CONFIG_FILE_PATH
+ def get_metric_file_path(self):
+ return self._METRIC_FILE_PATH
+
+configDefaults = ConfigDefaults()
+
config = ConfigParser.RawConfigParser()
-CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
-METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
+
+CONFIG_FILE_PATH = configDefaults.get_config_file_path()
+METRIC_FILE_PATH = configDefaults.get_metric_file_path()
+
+OUT_DIR = os.path.join(os.sep, "var", "log", "ambari-metrics-host-monitoring")
+SERVER_OUT_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.out"
+SERVER_LOG_FILE = OUT_DIR + os.sep + "ambari-metrics-host-monitoring.log"
+
+PID_DIR = os.path.join(os.sep, "var", "run", "ambari-metrics-host-monitoring")
+PID_OUT_FILE = PID_DIR + os.sep + "ambari-metrics-host-monitoring.pid"
+EXITCODE_OUT_FILE = PID_DIR + os.sep + "ambari-metrics-host-monitoring.exitcode"
+
+SERVICE_USERNAME_KEY = "TMP_AMHM_USERNAME"
+SERVICE_PASSWORD_KEY = "TMP_AMHM_PASSWORD"
+
+SETUP_ACTION = "setup"
+START_ACTION = "start"
+STOP_ACTION = "stop"
+RESTART_ACTION = "restart"
+STATUS_ACTION = "status"
config_content = """
[default]
@@ -96,6 +153,11 @@ class Configuration:
self.metric_groups = json.load(open(METRIC_FILE_PATH))
else:
print 'No metric configs found at {0}'.format(METRIC_FILE_PATH)
+ self.metric_groups = \
+ {
+ 'host_metric_groups': [],
+ 'process_metric_groups': []
+ }
pass
def getConfig(self):
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index 51f0980b82..e0ef804f47 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -33,7 +33,7 @@ logger = logging.getLogger()
class Controller(threading.Thread):
- def __init__(self, config):
+ def __init__(self, config, stop_handler):
# Process initialization code
threading.Thread.__init__(self)
logger.debug('Initializing Controller thread.')
@@ -48,27 +48,42 @@ class Controller(threading.Thread):
self.metric_collector = MetricsCollector(self.event_queue, self.application_metric_map)
self.server_url = config.get_server_address()
self.sleep_interval = config.get_collector_sleep_interval()
+ self._stop_handler = stop_handler
self.initialize_events_cache()
- self.emitter = Emitter(self.config, self.application_metric_map)
+ self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
+ self._t = None
def run(self):
logger.info('Running Controller thread: %s' % threading.currentThread().getName())
- # Wake every 5 seconds to push events to the queue
+
+ self.start_emitter()
+
+ # Wake every 5 seconds to push events to the queue
while True:
if (self.event_queue.full()):
logger.warn('Event Queue full!! Suspending further collections.')
else:
self.enqueque_events()
pass
- time.sleep(self.sleep_interval)
+ #Wait for the service stop event instead of sleeping blindly
+ if 0 == self._stop_handler.wait(self.sleep_interval):
+ logger.info('Shutting down Controller thread')
+ break
+
+ if not self._t is None:
+ self._t.cancel()
+ self._t.join(5)
+
+ #The emitter thread should have stopped by now, just ensure it has shut down properly
+ self.emitter.join(5)
pass
# TODO: Optimize to not use Timer class and use the Queue instead
def enqueque_events(self):
# Queue events for up to a minute
for event in self.events_cache:
- t = Timer(event.get_collect_interval(), self.metric_collector.process_event, args=(event,))
- t.start()
+ self._t = Timer(event.get_collect_interval(), self.metric_collector.process_event, args=(event,))
+ self._t.start()
pass
def initialize_events_cache(self):
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index be83250b9a..c3fd543e7f 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -32,12 +32,13 @@ class Emitter(threading.Thread):
"""
Wake up every send interval seconds and empty the application metric map.
"""
- def __init__(self, config, application_metric_map):
+ def __init__(self, config, application_metric_map, stop_handler):
threading.Thread.__init__(self)
logger.debug('Initializing Emitter thread.')
self.lock = threading.Lock()
self.collector_address = config.get_server_address()
self.send_interval = config.get_send_interval()
+ self._stop_handler = stop_handler
self.application_metric_map = application_metric_map
def run(self):
@@ -45,10 +46,16 @@ class Emitter(threading.Thread):
while True:
try:
self.submit_metrics()
- time.sleep(self.send_interval)
+ #Wait for the service stop event instead of sleeping blindly
+ if 0 == self._stop_handler.wait(self.send_interval):
+ logger.info('Shutting down Emitter thread')
+ return
except Exception, e:
logger.warn('Unable to emit events. %s' % str(e))
- time.sleep(self.RETRY_SLEEP_INTERVAL)
+ #Wait for the service stop event instead of sleeping blindly
+ if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+ logger.info('Shutting down Emitter thread - abort retry')
+ return
logger.info('Retrying emit after %s seconds.' % self.RETRY_SLEEP_INTERVAL)
pass
@@ -69,7 +76,9 @@ class Emitter(threading.Thread):
logger.warn("Error sending metrics to server. Retrying after {0} "
"...".format(self.RETRY_SLEEP_INTERVAL))
retry_count += 1
- time.sleep(self.RETRY_SLEEP_INTERVAL)
+ #Wait for the service stop event instead of sleeping blindly
+ if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+ return
pass
pass
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
new file mode 100644
index 0000000000..bfb6957fa5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import os
+import signal
+import threading
+import traceback
+
+from ambari_commons import OSConst, OSCheck
+from ambari_commons.exceptions import FatalException
+from ambari_commons.os_family_impl import OsFamilyImpl
+
+
+logger = logging.getLogger()
+
+_handler = None
+
+
+class StopHandler(object):
+ def set_stop(self):
+ pass
+
+ def wait(self, timeout=None):
+ return -1
+
+
+#
+# Windows implementation
+#
+@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
+class StopHandlerWindows(StopHandler):
+ def __init__(self, stopEvent=None):
+ import win32event
+ # Event used to gracefully stop the process
+ if stopEvent is None:
+ # Allow standalone testing
+ self._heventStop = win32event.CreateEvent(None, 0, 0, None)
+ else:
+ # Allow one unique event per process
+ self._heventStop = stopEvent
+
+ def set_stop(self):
+ import win32event
+ win32event.SetEvent(self._heventStop)
+
+ def wait(self, timeout=None):
+ '''
+ :param timeout: Time to wait, in seconds.
+ :return: 0 == stop event signaled, -1 = timeout
+ '''
+ import win32event
+
+ if timeout is None:
+ timeout = win32event.INFINITE
+ else:
+ timeout = timeout * 1000
+
+ result = win32event.WaitForSingleObject(self._heventStop, timeout)
+ if(win32event.WAIT_OBJECT_0 != result and win32event.WAIT_TIMEOUT != result):
+ raise FatalException(-1, "Error waiting for stop event: " + str(result))
+ if (win32event.WAIT_TIMEOUT == result):
+ return -1
+ logger.info("Stop event received")
+ return result # 0 -> stop
+
+
+#
+# Linux implementation
+#
+def signal_handler(signum, frame):
+ global _handler
+ _handler.set_stop()
+
+def debug(sig, frame):
+ """Interrupt running process, and provide a python prompt for
+ interactive debugging."""
+ d = {'_frame': frame} # Allow access to frame object.
+ d.update(frame.f_globals) # Unless shadowed by global
+ d.update(frame.f_locals)
+
+ message = "Signal received : entering python shell.\nTraceback:\n"
+ message += ''.join(traceback.format_stack(frame))
+ logger.info(message)
+
+
+@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
+class StopHandlerLinux(StopHandler):
+ def __init__(self, stopEvent=None):
+ # Event used to gracefully stop the process
+ if stopEvent is None:
+ # Allow standalone testing
+ self.stop_event = threading.Event()
+ else:
+ # Allow one unique event per process
+ self.stop_event = stopEvent
+
+ def set_stop(self):
+ self.stop_event.set()
+
+ def wait(self, timeout=None):
+ # Stop process when stop event received
+ if self.stop_event.wait(timeout):
+ logger.info("Stop event received")
+ return 0
+ # Timeout
+ return -1
+
+
+def bind_signal_handlers(new_handler=None):
+ if OSCheck.get_os_family() != OSConst.WINSRV_FAMILY:
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGUSR1, debug)
+
+ if new_handler is None:
+ global _handler
+ _handler = StopHandler()
+ else:
+ _handler = new_handler
+ return _handler
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
index 09ae7e4750..37e77e5a9f 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -18,27 +18,67 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
-import core
-from core.controller import Controller
-from core.config_reader import Configuration
import logging
-import signal
+import os
import sys
+from ambari_commons.os_utils import remove_file
+
+from core.controller import Controller
+from core.config_reader import Configuration, PID_OUT_FILE, SERVER_LOG_FILE, SERVER_OUT_FILE
+from core.stop_handler import bind_signal_handlers
+
+
logger = logging.getLogger()
+
+def save_pid(pid, pidfile):
+ """
+ Save pid to pidfile.
+ """
+ try:
+ pfile = open(pidfile, "w")
+ pfile.write("%s\n" % pid)
+ except IOError:
+ pass
+ finally:
+ try:
+ pfile.close()
+ except:
+ pass
+
+
def main(argv=None):
# Allow Ctrl-C
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ stop_handler = bind_signal_handlers()
+
+ server_process_main(stop_handler)
+
+def server_process_main(stop_handler, scmStatus=None):
+ if scmStatus is not None:
+ scmStatus.reportStartPending()
+
+ save_pid(os.getpid(), PID_OUT_FILE)
config = Configuration()
- controller = Controller(config)
-
+ controller = Controller(config, stop_handler)
+
_init_logging(config)
-
+
logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
controller.start()
- controller.start_emitter()
+
+ print "Server out at: " + SERVER_OUT_FILE
+ print "Server log at: " + SERVER_LOG_FILE
+
+ if scmStatus is not None:
+ scmStatus.reportStarted()
+
+ #The controller thread finishes when the stop event is signaled
+ controller.join()
+
+ remove_file(PID_OUT_FILE)
+ pass
def _init_logging(config):
_levels = {
diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
index 05362bfd55..56e6475df9 100644
--- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
+++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
@@ -17,48 +17,63 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
+
+import json
+import urllib2
+
import logging
from unittest import TestCase
-
-from application_metric_map import ApplicationMetricMap
-from config_reader import Configuration
-from emitter import Emitter
+from only_for_platform import get_platform, PLATFORM_WINDOWS
from mock.mock import patch, MagicMock
-import json
-import urllib2
+if get_platform() != PLATFORM_WINDOWS:
+ os_distro_value = ('Suse','11','Final')
+else:
+ os_distro_value = ('win2012serverr2','6.3','WindowsServer')
+
+with patch("platform.linux_distribution", return_value = os_distro_value):
+ from ambari_commons import OSCheck
+ from application_metric_map import ApplicationMetricMap
+ from config_reader import Configuration
+ from emitter import Emitter
+ from stop_handler import bind_signal_handlers
logger = logging.getLogger()
class TestEmitter(TestCase):
-
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch("urllib2.urlopen")
def testJavaHomeAvailableCheck(self, url_open_mock):
url_open_mock.return_value = MagicMock()
url_open_mock.return_value.getcode.return_value = 200
self.assertEqual(urllib2.urlopen(None, None).getcode(), 200)
url_open_mock.reset_mock()
-
+
+ stop_handler = bind_signal_handlers()
+
config = Configuration()
application_metric_map = ApplicationMetricMap("host","10.10.10.10")
application_metric_map.clear()
application_metric_map.put_metric("APP1", {"metric1":1}, 1)
- emitter = Emitter(config, application_metric_map)
+ emitter = Emitter(config, application_metric_map, stop_handler)
emitter.submit_metrics()
self.assertEqual(url_open_mock.call_count, 1)
self.assertUrlData(url_open_mock)
-
-
+
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch("urllib2.urlopen")
def testRetryFetch(self, url_open_mock):
-
+ stop_handler = bind_signal_handlers()
+
config = Configuration()
application_metric_map = ApplicationMetricMap("host","10.10.10.10")
application_metric_map.clear()
application_metric_map.put_metric("APP1", {"metric1":1}, 1)
- emitter = Emitter(config, application_metric_map)
+ emitter = Emitter(config, application_metric_map, stop_handler)
emitter.RETRY_SLEEP_INTERVAL = .001
emitter.submit_metrics()