summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Andrew Onishuk <aonishuk@hortonworks.com> 2016-10-19 01:46:07 +0300
committer Andrew Onishuk <aonishuk@hortonworks.com> 2016-10-19 01:46:07 +0300
commit af528b5c9786b1b2d9a0dc7b3f609728acf4d87e (patch)
tree 372eece6631eafa2048728bc7828e1a7ddaa8ef7
parent 9332b381b451548375b0eae407b6eb8bcb6c84ce (diff)
Revert "AMBARI-18629. HDFS goes down after installing cluster (aonishuk) and AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)"
-rw-r--r-- ambari-agent/conf/unix/ambari-agent.ini 1
-rw-r--r-- ambari-agent/src/main/python/ambari_agent/ActionQueue.py 16
-rw-r--r-- ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py 25
-rw-r--r-- ambari-agent/src/test/python/ambari_agent/TestActionQueue.py 3
-rw-r--r-- ambari-common/src/main/python/ambari_commons/thread_utils.py 43
5 files changed, 8 insertions, 80 deletions
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 1c39c24368..914e09a9c8 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -32,7 +32,6 @@ tolerate_download_failures=true
run_as_user=root
parallel_execution=0
alert_grace_period=5
-status_command_timeout=2
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
; memory_threshold_soft_mb=400
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index dedef76543..064e4f0fec 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -29,14 +29,12 @@ import time
import signal
from AgentException import AgentException
-from PythonReflectiveExecutor import PythonReflectiveExecutor
from LiveStatus import LiveStatus
from ActualConfigHandler import ActualConfigHandler
from CommandStatusDict import CommandStatusDict
from CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
from ambari_commons.str_utils import split_on_chunks
-from ambari_commons.thread_utils import terminate_thread
logger = logging.getLogger()
@@ -86,7 +84,6 @@ class ActionQueue(threading.Thread):
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
self.parallel_execution = config.get_parallel_exec_option()
- self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 2))
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
@@ -227,18 +224,7 @@ class ActionQueue(threading.Thread):
if self.controller.recovery_manager.enabled():
self.controller.recovery_manager.stop_execution_command()
elif commandType == self.STATUS_COMMAND:
- component_name = command['componentName']
-
- thread = threading.Thread(target = self.execute_status_command, args = (command,))
- thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
- thread.start()
- thread.join(timeout=self.status_command_timeout)
-
- if thread.isAlive():
- terminate_thread(thread)
- # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
- PythonReflectiveExecutor.last_context.revert()
- logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
+ self.execute_status_command(command)
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index b476671f68..655b2fc80c 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,9 +53,7 @@ class PythonReflectiveExecutor(PythonExecutor):
returncode = 1
try:
- current_context = PythonContext(script_dir, pythonCommand)
- PythonReflectiveExecutor.last_context = current_context
- with current_context:
+ with PythonContext(script_dir, pythonCommand):
imp.load_source('__main__', script)
except SystemExit as e:
returncode = e.code
@@ -64,10 +62,7 @@ class PythonReflectiveExecutor(PythonExecutor):
except (ClientComponentHasNoStatus, ComponentIsNotRunning):
logger.debug("Reflective command failed with exception:", exc_info=1)
except Exception:
- if current_context.is_forced_revert:
- logger.info("Hanging status command finished its execution")
- else:
- logger.info("Reflective command failed with exception:", exc_info=1)
+ logger.info("Reflective command failed with exception:", exc_info=1)
else:
returncode = 0
@@ -81,8 +76,6 @@ class PythonContext:
def __init__(self, script_dir, pythonCommand):
self.script_dir = script_dir
self.pythonCommand = pythonCommand
- self.is_reverted = False
- self.is_forced_revert = False
def __enter__(self):
self.old_sys_path = copy.copy(sys.path)
@@ -95,18 +88,12 @@ class PythonContext:
sys.argv = self.pythonCommand[1:]
def __exit__(self, exc_type, exc_val, exc_tb):
- self.revert(is_forced_revert=False)
+ sys.path = self.old_sys_path
+ sys.argv = self.old_agv
+ logging.disable(self.old_logging_disable)
+ self.revert_sys_modules(self.old_sys_modules)
return False
- def revert(self, is_forced_revert=True):
- if not self.is_reverted:
- self.is_forced_revert = is_forced_revert
- self.is_reverted = True
- sys.path = self.old_sys_path
- sys.argv = self.old_agv
- logging.disable(self.old_logging_disable)
- self.revert_sys_modules(self.old_sys_modules)
-
def revert_sys_modules(self, value):
sys.modules.update(value)
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index b0488299a0..1805c9a2d3 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -212,7 +212,6 @@ class TestActionQueue(TestCase):
retryable_command = {
'commandType': 'EXECUTION_COMMAND',
'role': 'NAMENODE',
- 'componentName': 'NAMENODE',
'roleCommand': 'INSTALL',
'commandId': '1-1',
'taskId': 19,
@@ -310,7 +309,6 @@ class TestActionQueue(TestCase):
}
status_command = {
'commandType' : ActionQueue.STATUS_COMMAND,
- 'componentName': 'NAMENODE'
}
wrong_command = {
'commandType' : "SOME_WRONG_COMMAND",
@@ -1070,6 +1068,7 @@ class TestActionQueue(TestCase):
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
+ sleep_mock.assert_has_calls([call(1)], False)
runCommand_mock.assert_has_calls([
call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py
deleted file mode 100644
index 952022ca30..0000000000
--- a/ambari-common/src/main/python/ambari_commons/thread_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-def terminate_thread(thread):
- """Terminates a python thread abruptly from another thread.
-
- This is consider a bad pattern to do this.
- If possible, please consider handling stopping of the thread from inside of it
- or creating thread as a separate process (multiprocessing module).
-
- :param thread: a threading.Thread instance
- """
- import ctypes
- if not thread.isAlive():
- return
-
- exc = ctypes.py_object(SystemExit)
- res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
- ctypes.c_long(thread.ident), exc)
- if res == 0:
- raise ValueError("nonexistent thread id")
- elif res > 1:
- # """if it returns a number greater than one, you're in trouble,
- # and you should call it again with exc=NULL to revert the effect"""
- ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
- raise SystemError("PyThreadState_SetAsyncExc failed")
\ No newline at end of file