summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Onishuk <aonishuk@hortonworks.com>2016-10-18 18:35:17 +0300
committerAndrew Onishuk <aonishuk@hortonworks.com>2016-10-18 18:35:17 +0300
commit9332b381b451548375b0eae407b6eb8bcb6c84ce (patch)
tree8b83a3ff45b3d18a2aa686566c32ee73da1c5339
parent33c347b07ca40768209eb06068179df300ae6d04 (diff)
AMBARI-18629. HDFS goes down after installing cluster (aonishuk)
-rw-r--r--ambari-agent/src/main/python/ambari_agent/ActionQueue.py26
-rw-r--r--ambari-common/src/main/python/ambari_commons/thread_utils.py43
2 files changed, 53 insertions, 16 deletions
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 198ae034c2..dedef76543 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
+from ambari_commons.thread_utils import terminate_thread
logger = logging.getLogger()
@@ -82,7 +83,6 @@ class ActionQueue(threading.Thread):
self.controller = controller
self.configTags = {}
self._stop = threading.Event()
- self.hangingStatusCommands = {}
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
self.parallel_execution = config.get_parallel_exec_option()
@@ -229,22 +229,16 @@ class ActionQueue(threading.Thread):
elif commandType == self.STATUS_COMMAND:
component_name = command['componentName']
- if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
- del self.hangingStatusCommands[component_name]
+ thread = threading.Thread(target = self.execute_status_command, args = (command,))
+ thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
+ thread.start()
+ thread.join(timeout=self.status_command_timeout)
- if not component_name in self.hangingStatusCommands:
- thread = threading.Thread(target = self.execute_status_command, args = (command,))
- thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
- thread.start()
- thread.join(timeout=self.status_command_timeout)
-
- if thread.isAlive():
- # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
- PythonReflectiveExecutor.last_context.revert()
- logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
- self.hangingStatusCommands[component_name] = thread
- else:
- logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
+ if thread.isAlive():
+ terminate_thread(thread)
+ # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
+ PythonReflectiveExecutor.last_context.revert()
+ logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py
new file mode 100644
index 0000000000..952022ca30
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/thread_utils.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+def terminate_thread(thread):
+ """Terminates a python thread abruptly from another thread.
+
+ This is consider a bad pattern to do this.
+ If possible, please consider handling stopping of the thread from inside of it
+ or creating thread as a separate process (multiprocessing module).
+
+ :param thread: a threading.Thread instance
+ """
+ import ctypes
+ if not thread.isAlive():
+ return
+
+ exc = ctypes.py_object(SystemExit)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
+ ctypes.c_long(thread.ident), exc)
+ if res == 0:
+ raise ValueError("nonexistent thread id")
+ elif res > 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed") \ No newline at end of file