diff options
author | Andrew Onishuk <aonishuk@hortonworks.com> | 2016-10-18 18:35:17 +0300 |
---|---|---|
committer | Andrew Onishuk <aonishuk@hortonworks.com> | 2016-10-18 18:35:17 +0300 |
commit | 9332b381b451548375b0eae407b6eb8bcb6c84ce (patch) | |
tree | 8b83a3ff45b3d18a2aa686566c32ee73da1c5339 | |
parent | 33c347b07ca40768209eb06068179df300ae6d04 (diff) |
AMBARI-18629. HDFS goes down after installing cluster (aonishuk)
-rw-r--r-- | ambari-agent/src/main/python/ambari_agent/ActionQueue.py | 26 | ||||
-rw-r--r-- | ambari-common/src/main/python/ambari_commons/thread_utils.py | 43 |
2 files changed, 53 insertions, 16 deletions
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 198ae034c2..dedef76543 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle from ambari_commons.str_utils import split_on_chunks +from ambari_commons.thread_utils import terminate_thread logger = logging.getLogger() @@ -82,7 +83,6 @@ class ActionQueue(threading.Thread): self.controller = controller self.configTags = {} self._stop = threading.Event() - self.hangingStatusCommands = {} self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller) self.parallel_execution = config.get_parallel_exec_option() @@ -229,22 +229,16 @@ class ActionQueue(threading.Thread): elif commandType == self.STATUS_COMMAND: component_name = command['componentName'] - if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive(): - del self.hangingStatusCommands[component_name] + thread = threading.Thread(target = self.execute_status_command, args = (command,)) + thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping + thread.start() + thread.join(timeout=self.status_command_timeout) - if not component_name in self.hangingStatusCommands: - thread = threading.Thread(target = self.execute_status_command, args = (command,)) - thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping - thread.start() - thread.join(timeout=self.status_command_timeout) - - if thread.isAlive(): - # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. - PythonReflectiveExecutor.last_context.revert() - logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout)) - self.hangingStatusCommands[component_name] = thread - else: - logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name)) + if thread.isAlive(): + terminate_thread(thread) + # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent. + PythonReflectiveExecutor.last_context.revert() + logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout)) else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py new file mode 100644 index 0000000000..952022ca30 --- /dev/null +++ b/ambari-common/src/main/python/ambari_commons/thread_utils.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +def terminate_thread(thread): + """Terminates a python thread abruptly from another thread. + + This is consider a bad pattern to do this. + If possible, please consider handling stopping of the thread from inside of it + or creating thread as a separate process (multiprocessing module). + + :param thread: a threading.Thread instance + """ + import ctypes + if not thread.isAlive(): + return + + exc = ctypes.py_object(SystemExit) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), exc) + if res == 0: + raise ValueError("nonexistent thread id") + elif res > 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) + raise SystemError("PyThreadState_SetAsyncExc failed")
\ No newline at end of file |