summaryrefslogtreecommitdiff
path: root/ambari-agent
diff options
context:
space:
mode:
authorMahadev Konar <mahadev@apache.org>2012-09-06 07:42:18 +0000
committerMahadev Konar <mahadev@apache.org>2012-09-06 07:42:18 +0000
commit0d08e61db8bc902b8f99ff4c526379b3a968144d (patch)
tree007fab72e72a71f8c5f87c80dc0d1bd03953aab9 /ambari-agent
parent073c43ab2501a9ff3494c73ce49f17e0a2372356 (diff)
AMBARI-702. Add skeleton for Ambari agent that talks to the server and collects information for host. (mahadev)
git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1381493 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ambari-agent')
-rw-r--r--ambari-agent/pom.xml100
-rw-r--r--ambari-agent/src/main/python/ambari_agent/ActionQueue.py277
-rw-r--r--ambari-agent/src/main/python/ambari_agent/ActionResults.py52
-rw-r--r--ambari-agent/src/main/python/ambari_agent/AmbariConfig.py62
-rw-r--r--ambari-agent/src/main/python/ambari_agent/Controller.py114
-rw-r--r--ambari-agent/src/main/python/ambari_agent/DaemonHandler.py48
-rw-r--r--ambari-agent/src/main/python/ambari_agent/FileUtil.py185
-rw-r--r--ambari-agent/src/main/python/ambari_agent/Hardware.py47
-rw-r--r--ambari-agent/src/main/python/ambari_agent/Heartbeat.py62
-rw-r--r--ambari-agent/src/main/python/ambari_agent/PackageHandler.py48
-rw-r--r--ambari-agent/src/main/python/ambari_agent/Runner.py66
-rw-r--r--ambari-agent/src/main/python/ambari_agent/ServerStatus.py50
-rw-r--r--ambari-agent/src/main/python/ambari_agent/__init__.py39
-rw-r--r--ambari-agent/src/main/python/ambari_agent/createDaemon.py205
-rw-r--r--ambari-agent/src/main/python/ambari_agent/daemon.py37
-rw-r--r--ambari-agent/src/main/python/ambari_agent/main.py143
-rw-r--r--ambari-agent/src/main/python/ambari_agent/shell.py279
-rw-r--r--ambari-agent/src/main/python/setup.cfg18
-rw-r--r--ambari-agent/src/main/python/setup.py36
-rw-r--r--ambari-agent/src/test/python/TestActionQueue.py94
-rw-r--r--ambari-agent/src/test/python/TestAgentActions.py102
-rw-r--r--ambari-agent/src/test/python/TestFileUtil.py56
-rw-r--r--ambari-agent/src/test/python/TestHardware.py30
-rw-r--r--ambari-agent/src/test/python/TestHeartbeat.py32
-rw-r--r--ambari-agent/src/test/python/TestServerStatus.py29
-rw-r--r--ambari-agent/src/test/python/unitTests.py51
26 files changed, 2244 insertions, 18 deletions
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml
index ef130ec1ed..ba8504a59e 100644
--- a/ambari-agent/pom.xml
+++ b/ambari-agent/pom.xml
@@ -1,5 +1,6 @@
<?xml version="1.0"?>
-<!--
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
@@ -15,28 +16,91 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.ambari</groupId>
<artifactId>ambari-agent</artifactId>
- <version>0.10.0-SNAPSHOT</version>
- <name>ambari-agent</name>
- <url>http://maven.apache.org</url>
- <parent>
- <groupId>org.apache.ambari</groupId>
- <version>0.10.0-SNAPSHOT</version>
- <artifactId>ambari-project</artifactId>
- <relativePath>../ambari-project</relativePath>
- </parent>
- <dependencies>
- <!-- TEST SCOPE DEPENDENCIES -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
- </dependencies>
+ <packaging>pom</packaging>
+ <version>1.0.3-SNAPSHOT</version>
+ <name>agent</name>
+ <description>Ambari Agent</description>
+ <properties>
+ <final.name>${project.artifactId}-${project.version}</final.name>
+ <package.release>1</package.release>
+ <package.prefix>/usr</package.prefix>
+ <package.conf.dir>/etc/ambari</package.conf.dir>
+ <package.log.dir>/var/log/ambari</package.log.dir>
+ <package.pid.dir>/var/run/ambari</package.pid.dir>
+ </properties>
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ <descriptors>
+ <descriptor>src/packages/tarball/all.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build-tarball</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <configuration>
+ <executable>python2.6</executable>
+ <workingDirectory>src/test/python</workingDirectory>
+ <arguments>
+ <argument>unitTests.py</argument>
+ </arguments>
+ <environmentVariables>
+ <PYTHONPATH>../../main/python:$PYTHONPATH</PYTHONPATH>
+ </environmentVariables>
+ <skip>true</skip>
+ </configuration>
+ <id>python-test</id>
+ <phase>test</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ <execution>
+ <configuration>
+ <executable>python2.6</executable>
+ <workingDirectory>target/ambari-agent-${project.version}</workingDirectory>
+ <arguments>
+ <argument>${project.basedir}/src/main/python/setup.py</argument>
+ <argument>clean</argument>
+ <argument>bdist_dumb</argument>
+ </arguments>
+ <environmentVariables>
+ <PYTHONPATH>target/ambari-agent-${project.version}:$PYTHONPATH</PYTHONPATH>
+ </environmentVariables>
+ </configuration>
+ <id>python-package</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
+ <extensions>
+ <extension>
+ <groupId>org.apache.maven.wagon</groupId>
+ <artifactId>wagon-ssh-external</artifactId>
+ </extension>
+ </extensions>
</build>
</project>
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
new file mode 100644
index 0000000000..231fa82474
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -0,0 +1,277 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import traceback
+import logging.handlers
+import Queue
+import threading
+import AmbariConfig
+from shell import shellRunner
+from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile
+from shell import shellRunner
+import json
+import os
+import time
+import subprocess
+import copy
+
+logger = logging.getLogger()
+installScriptHash = -1
+
+class ActionQueue(threading.Thread):
+ global q, r, clusterId, clusterDefinitionRevision
+ q = Queue.Queue()
+ r = Queue.Queue()
+ clusterId = 'unknown'
+ clusterDefinitionRevision = 0
+
+ def __init__(self, config):
+ global clusterId, clusterDefinitionRevision
+ super(ActionQueue, self).__init__()
+ #threading.Thread.__init__(self)
+ self.config = config
+ self.sh = shellRunner()
+ self._stop = threading.Event()
+ self.maxRetries = config.getint('command', 'maxretries')
+ self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
+
+ def stop(self):
+ self._stop.set()
+
+ def stopped(self):
+ return self._stop.isSet()
+
+ #For unittest
+ def getshellinstance(self):
+ return self.sh
+
+ def put(self, response):
+ if 'actions' in response:
+ actions = response['actions']
+ logger.debug(actions)
+ # for the servers, take a diff of what's running, and what the controller
+ # asked the agent to start. Kill all those servers that the controller
+ # didn't ask us to start
+ sh = shellRunner()
+ runningServers = sh.getServerTracker()
+
+ # get the list of servers the controller wants running
+ serversToRun = {}
+ for action in actions:
+ if action['kind'] == 'START_ACTION':
+ processKey = sh.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+ action['component'], action['role'])
+ serversToRun[processKey] = 1
+
+ # create stop actions for the servers that the controller wants stopped
+ for server in runningServers.keys():
+ if server not in serversToRun:
+ sh.stopProcess(server)
+ # now put all the actions in the queue. The ordering is important (we stopped
+ # all unneeded servers first)
+ for action in actions:
+ q.put(action)
+
+ def run(self):
+ global clusterId, clusterDefinitionRevision
+ while not self.stopped():
+ while not q.empty():
+ action = q.get()
+ switches = {
+ 'START_ACTION' : self.startAction,
+ 'RUN_ACTION' : self.runAction,
+ 'CREATE_STRUCTURE_ACTION' : self.createStructureAction,
+ 'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction,
+ 'WRITE_FILE_ACTION' : self.writeFileAction,
+ 'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction,
+ 'NO_OP_ACTION' : self.noOpAction
+ }
+
+ exitCode = 1
+ retryCount = 1
+ while (exitCode != 0 and retryCount <= self.maxRetries):
+ result={}
+ try:
+ #pass a copy of action since we don't want anything to change in the
+ #action dict
+ actionCopy = copy.copy(action)
+ result = switches.get(action['kind'], self.unknownAction)(actionCopy)
+ if ('commandResult' in result):
+ commandResult = result['commandResult']
+ exitCode = commandResult['exitCode']
+ if (exitCode == 0):
+ break
+ else:
+ logger.warn(str(action) + " exited with code " + str(exitCode))
+ else:
+ #Really, no commandResult? Is this possible?
+ #TODO: check
+ exitCode = 0
+ break
+ except Exception, err:
+ traceback.print_exc()
+ logger.warn(err)
+ if ('commandResult' in result):
+ commandResult = result['commandResult']
+ if ('exitCode' in commandResult):
+ exitCode = commandResult['exitCode']
+ else:
+ exitCode = 1
+ else:
+ result['commandResult'] = {'exitCode': 1, 'output':"", 'error':""}
+
+ #retry in some time
+ logger.warn("Retrying %s in %d seconds" % (str(action),self.sleepInterval))
+ time.sleep(self.sleepInterval)
+ retryCount += 1
+
+ if (exitCode != 0):
+ result['exitCode']=exitCode
+ result['retryActionCount'] = retryCount - 1
+ else:
+ result['retryActionCount'] = retryCount
+ # Update the result
+ r.put(result)
+ if not self.stopped():
+ time.sleep(5)
+
+ # Store action result to agent response queue
+ def result(self):
+ result = []
+ while not r.empty():
+ result.append(r.get())
+ return result
+
+ # Generate default action response
+ def genResult(self, action):
+ result={}
+ if (action['kind'] == 'INSTALL_AND_CONFIG_ACTION' or action['kind'] == 'NO_OP_ACTION'):
+ result = {
+ 'id' : action['id'],
+ 'kind' : action['kind'],
+ }
+ else:
+ result = {
+ 'id' : action['id'],
+ 'clusterId' : action['clusterId'],
+ 'kind' : action['kind'],
+ 'clusterDefinitionRevision' : action['clusterDefinitionRevision'],
+ 'componentName' : action['component'],
+ 'role' : action['role']
+ }
+ return result
+
+ # Run start action, start a server process and
+ # track the liveness of the children process
+ def startAction(self, action):
+ result = self.genResult(action)
+ return self.sh.startProcess(action['clusterId'],
+ action['clusterDefinitionRevision'],
+ action['component'],
+ action['role'],
+ action['command'],
+ action['user'], result)
+
+ # Write file action
+ def writeFileAction(self, action, fileName=""):
+ result = self.genResult(action)
+ return writeFile(action, result, fileName)
+
+ # get the install file
+ def getInstallFilename(self,id):
+ return "ambari-install-file-"+id
+
+ # Install and configure action
+ def installAndConfigAction(self, action):
+ global installScriptHash
+ r=self.genResult(action)
+ w = self.writeFileAction(action,self.getInstallFilename(action['id']))
+ commandResult = {}
+ if w['exitCode']!=0:
+ commandResult['error'] = w['stderr']
+ commandResult['exitCode'] = w['exitCode']
+ r['commandResult'] = commandResult
+ return r
+
+ if 'command' not in action:
+ # this is hardcoded to do puppet specific stuff for now
+ # append the content of the puppet file to the file written above
+ filepath = getFilePath(action,self.getInstallFilename(action['id']))
+ logger.info("File path for puppet top level script: " + filepath)
+ p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')])
+ if p['exitCode']!=0:
+ commandResult['error'] = p['error']
+ commandResult['exitCode'] = p['exitCode']
+ r['commandResult'] = commandResult
+ return r
+ logger.debug("The contents of the static file " + p['output'])
+ appendToFile(p['output'],filepath)
+ arr = [AmbariConfig.config.get('puppet','commandpath') , filepath]
+ logger.debug(arr)
+ action['command'] = arr
+ logger.debug(action['command'])
+ commandResult = self.sh.run(action['command'])
+ logger.debug("PUPPET COMMAND OUTPUT: " + commandResult['output'])
+ logger.debug("PUPPET COMMAND ERROR: " + commandResult['error'])
+ if commandResult['exitCode'] == 0:
+ installScriptHash = action['id']
+ r['commandResult'] = commandResult
+ return r
+
+ # Run command action
+ def runAction(self, action):
+ result = self.genResult(action)
+ return self.sh.runAction(action['clusterId'],
+ action['component'],
+ action['role'],
+ action['user'],
+ action['command'],
+ action['cleanUpCommand'], result)
+
+ # Create directory structure for cluster
+ def createStructureAction(self, action):
+ result = self.genResult(action)
+ result['exitCode'] = 0
+ return createStructure(action, result)
+
+ # Delete directory structure for cluster
+ def deleteStructureAction(self, action):
+ result = self.genResult(action)
+ result['exitCode'] = 0
+ return deleteStructure(action, result)
+
+ def noOpAction(self, action):
+ r = {'id' : action['id']}
+ return r
+
+ # Handle unknown action
+ def unknownAction(self, action):
+ logger.error('Unknown action: %s' % action['id'])
+ result = { 'id': action['id'] }
+ return result
+
+ # Discover agent idle state
+ def isIdle(self):
+ return q.empty()
+
+ # Get the hash of the script currently used for install/config
+ def getInstallScriptHash(self):
+ return installScriptHash
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionResults.py b/ambari-agent/src/main/python/ambari_agent/ActionResults.py
new file mode 100644
index 0000000000..7603fa1bb6
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ActionResults.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import logging.handlers
+import Queue
+import ActionQueue
+
+logger = logging.getLogger()
+
+class ActionResults:
+ global r
+
+ # Build action results list from memory queue
+ def build(self):
+ results = []
+ while not ActionQueue.r.empty():
+ result = {
+ 'clusterId': 'unknown',
+ 'id' : 'action-001',
+ 'kind' : 'STOP_ACTION',
+ 'commandResults' : [],
+ 'cleanUpCommandResults' : [],
+ 'serverName' : 'hadoop.datanode'
+ }
+ results.append(result)
+ logger.info(results)
+ return results
+
+def main(argv=None):
+ ar = ActionResults()
+ print ar.build()
+
+if __name__ == '__main__':
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
new file mode 100644
index 0000000000..96041970e7
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import logging.handlers
+import ConfigParser
+import StringIO
+
+config = ConfigParser.RawConfigParser()
+content = """
+[server]
+url=http://localhost:4080
+
+[agent]
+prefix=/tmp/ambari
+
+[stack]
+installprefix=/var/ambari/
+
+[puppet]
+puppet_home=/usr/local/bin
+facter_home=/usr/local/bin
+
+[command]
+maxretries=2
+sleepBetweenRetries=1
+
+"""
+s = StringIO.StringIO(content)
+config.readfp(s)
+
+class AmbariConfig:
+ def getConfig(self):
+ global config
+ return config
+
+def setConfig(customConfig):
+ global config
+ config = customConfig
+
+def main():
+ print config
+
+if __name__ == "__main__":
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
new file mode 100644
index 0000000000..30ea9d77ab
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -0,0 +1,114 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import logging.handlers
+import signal
+import json
+import socket
+import sys, traceback
+import time
+import threading
+import urllib2
+from urllib2 import Request, urlopen, URLError
+import AmbariConfig
+from Heartbeat import Heartbeat
+from ActionQueue import ActionQueue
+from optparse import OptionParser
+from wsgiref.simple_server import ServerHandler
+
+logger = logging.getLogger()
+
+class Controller(threading.Thread):
+
+ def __init__(self, config):
+ threading.Thread.__init__(self)
+ logger.debug('Initializing Controller RPC thread.')
+ self.lock = threading.Lock()
+ self.safeMode = True
+ self.credential = None
+ self.config = config
+ #Disabled security until we have fix for AMBARI-157
+ #if(config.get('controller', 'user')!=None and config.get('controller', 'password')!=None):
+ # self.credential = { 'user' : config.get('controller', 'user'),
+ # 'password' : config.get('controller', 'password')
+ # }
+ self.url = config.get('server', 'url') + '/agent/heartbeat/' + socket.gethostname()
+
+ def start(self):
+ self.actionQueue = ActionQueue(self.config)
+ self.actionQueue.start()
+ self.heartbeat = Heartbeat(self.actionQueue)
+
+ def __del__(self):
+ logger.info("Server connection disconnected.")
+
+ def run(self):
+ id='-1'
+ opener = urllib2.build_opener()
+ urllib2.install_opener(opener)
+ retry=False
+ firstTime=True
+ while True:
+ try:
+ if retry==False:
+ data = json.dumps(self.heartbeat.build(id))
+ logger.debug(data)
+ req = urllib2.Request(self.url, data, {'Content-Type': 'application/json'})
+ f = urllib2.urlopen(req)
+ response = f.read()
+ f.close()
+ data = json.loads(response)
+ id=int(data['responseId'])
+ self.actionQueue.put(data)
+ if retry==True or firstTime==True:
+ logger.info("Controller connection established")
+ firstTime=False
+ retry=False
+ except Exception, err:
+ retry=True
+ if "code" in err:
+ logger.error(err.code)
+ else:
+ logger.error("Unable to connect to: "+self.url,exc_info=True)
+ if self.actionQueue.isIdle():
+ time.sleep(30)
+ else:
+ time.sleep(1)
+
+def main(argv=None):
+ # Allow Ctrl-C
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
+ stream_handler = logging.StreamHandler()
+ stream_handler.setFormatter(formatter)
+ logger.addHandler(stream_handler)
+
+ logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
+
+ config = AmbariConfig.config
+ collector = Controller(config)
+ collector.start()
+ collector.run()
+
+if __name__ == '__main__':
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py b/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py
new file mode 100644
index 0000000000..b72662165b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import simplejson
+import web
+from mimerender import mimerender
+from Runner import Runner
+
+render_json = lambda **args: simplejson.dumps(args)
+
+class DaemonHandler:
+ @mimerender(
+ default = 'json',
+ json = render_json
+ )
+
+ def GET(self, cmd, daemonName):
+ data = []
+ data['cmd']=cmd
+ data['daemonName']=daemonName
+ dispatcher = Runner()
+ return dispatcher.run(data)
+
+ def POST(self, cmd):
+ web.header('Content-Type','application/json')
+ data = web.data();
+ jsonp = simplejson.loads(data)
+ jsonp['cmd']=cmd
+ dispatcher = Runner()
+ return dispatcher.run(jsonp)
+
diff --git a/ambari-agent/src/main/python/ambari_agent/FileUtil.py b/ambari-agent/src/main/python/ambari_agent/FileUtil.py
new file mode 100644
index 0000000000..f24046baf5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/FileUtil.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from pwd import getpwnam
+from grp import getgrnam
+import logging
+import logging.handlers
+import getpass
+import os, errno
+import sys, traceback
+import ConfigParser
+import shutil
+import StringIO
+import AmbariConfig
+
+logger = logging.getLogger()
+
+def getFilePath(action, fileName=""):
+ #Change the method signature to take the individual action fields
+ pathComp=""
+ if 'clusterId' in action:
+ pathComp = action['clusterId']
+ if 'role' in action:
+ pathComp = pathComp + "-" + action['role']
+ path = os.path.join(AmbariConfig.config.get('agent','prefix'),
+ "clusters",
+ pathComp)
+ fullPathName=""
+ if fileName != "":
+ fullPathName=os.path.join(path, fileName)
+ else:
+ fileInfo = action['file']
+ fullPathName=os.path.join(path, fileInfo['path'])
+ return fullPathName
+
+def appendToFile(data,absolutePath):
+ f = open(absolutePath, 'a')
+ f.write(data)
+ f.close()
+
+def writeFile(action, result, fileName=""):
+ fileInfo = action['file']
+ pathComp=""
+ if 'clusterId' in action:
+ pathComp = action['clusterId']
+ if 'role' in action:
+ pathComp = pathComp + "-" + action['role']
+ try:
+ path = os.path.join(AmbariConfig.config.get('agent','prefix'),
+ "clusters",
+ pathComp)
+ user=getpass.getuser()
+ if 'owner' in fileInfo:
+ user=fileInfo['owner']
+ group=os.getgid()
+ if 'group' in fileInfo:
+ group=fileInfo['group']
+ fullPathName=""
+ if fileName != "":
+ fullPathName=os.path.join(path, fileName)
+ else:
+ fullPathName=os.path.join(path, fileInfo['path'])
+ logger.debug("path in writeFile: %s" % fullPathName)
+ content=fileInfo['data']
+ try:
+ if isinstance(user, int)!=True:
+ user=getpwnam(user)[2]
+ if isinstance(group, int)!=True:
+ group=getgrnam(group)[2]
+ except Exception:
+ logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, fullPathName))
+ if 'permission' in fileInfo:
+ if fileInfo['permission'] is not None:
+ permission=fileInfo['permission']
+ else:
+ permission=0750
+ oldMask = os.umask(0)
+ if 'umask' in fileInfo:
+ if fileInfo['umask'] is not None:
+ umask=int(fileInfo['umask'])
+ else:
+ umask=oldMask
+ os.umask(int(umask))
+ prefix = os.path.dirname(fullPathName)
+ try:
+ os.makedirs(prefix)
+ except OSError as err:
+ if err.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+ f = open(fullPathName, 'w')
+ f.write(content)
+ f.close()
+ if os.getuid()==0:
+ os.chmod(fullPathName, permission)
+ os.chown(fullPathName, user, group)
+ os.umask(oldMask)
+ result['exitCode'] = 0
+ except Exception, err:
+ traceback.print_exc()
+ result['exitCode'] = 1
+ result['error'] = traceback.format_exc()
+ return result
+
+def createStructure(action, result):
+ try:
+ workdir = action['workDirComponent']
+ path = AmbariConfig.config.get('agent','prefix')+"/clusters/"+workdir
+ shutil.rmtree(path, 1)
+ os.makedirs(path+"/stack")
+ os.makedirs(path+"/logs")
+ os.makedirs(path+"/data")
+ os.makedirs(path+"/pkgs")
+ os.makedirs(path+"/config")
+ result['exitCode'] = 0
+ except Exception, err:
+ traceback.print_exc()
+ result['exitCode'] = 1
+ result['error'] = traceback.format_exc()
+ return result
+
+def deleteStructure(action, result):
+ try:
+ workdir = action['workDirComponent']
+ path = AmbariConfig.config.get('agent','prefix')+"/clusters/"+workdir
+ if os.path.exists(path):
+ shutil.rmtree(path)
+ result['exitCode'] = 0
+ except Exception, err:
+ result['exitCode'] = 1
+ result['error'] = traceback.format_exc()
+ return result
+
+def main():
+
+ action = { 'clusterId' : 'abc', 'role' : 'hdfs' }
+ result = {}
+ print createStructure(action, result)
+
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : "/tmp/ambari_file_test/_file_write_test",
+ "umask" : 022
+ }
+ action = { 'file' : configFile }
+ result = { }
+ print writeFile(action, result)
+
+ configFile = {
+ "data" : "test",
+ "owner" : "eyang",
+ "group" : "staff",
+ "permission" : "0700",
+ "path" : "/tmp/ambari_file_test/_file_write_test",
+ "umask" : "022"
+ }
+ result = { }
+ action = { 'file' : configFile }
+ print writeFile(action, result)
+
+ print deleteStructure(action, result)
+
+if __name__ == "__main__":
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/Hardware.py b/ambari-agent/src/main/python/ambari_agent/Hardware.py
new file mode 100644
index 0000000000..d3a4774cb8
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/Hardware.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from shell import shellRunner
+import multiprocessing
+import platform
+import AmbariConfig
+
+class Hardware:
+ def __init__(self):
+ facterHome = AmbariConfig.config.get('puppet', 'facter_home')
+
+ self.hardware = { 'coreCount' : 4,
+ 'cpuSpeed' : 4,
+ 'cpuFlag' : 4,
+ 'diskCount' : 3,
+ 'netSpeed' : 3,
+ 'ramSize' : 2
+ }
+
+ def get(self):
+ return self.hardware
+
+
+def main(argv=None):
+ hardware = Hardware()
+ print hardware.get()
+
+if __name__ == '__main__':
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
new file mode 100644
index 0000000000..1b22249263
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+from Hardware import Hardware
+from ActionQueue import ActionQueue
+from ServerStatus import ServerStatus
+import socket
+import time
+
+firstContact = True
+class Heartbeat:
+
+ def __init__(self, actionQueue):
+ self.actionQueue = actionQueue
+ self.hardware = Hardware()
+
+ def build(self, id='-1'):
+ global clusterId, clusterDefinitionRevision, firstContact
+ serverStatus = ServerStatus()
+ timestamp = int(time.time()*1000)
+ queueResult = self.actionQueue.result()
+ installedRoleStates = serverStatus.build()
+ heartbeat = { 'responseId' : int(id),
+ 'timestamp' : timestamp,
+ 'hostname' : socket.gethostname(),
+ 'hardwareProfile' : self.hardware.get(),
+ 'idle' : self.actionQueue.isIdle(),
+ 'installScriptHash' : self.actionQueue.getInstallScriptHash(),
+ 'firstContact' : firstContact
+ }
+ if len(queueResult)!=0:
+ heartbeat['actionResults'] = queueResult
+ if len(installedRoleStates)!=0:
+ heartbeat['installedRoleStates'] = installedRoleStates
+ firstContact = False
+ return heartbeat
+
+def main(argv=None):
+ actionQueue = ActionQueue()
+ heartbeat = Heartbeat(actionQueue)
+ print json.dumps(heartbeat.build())
+
+if __name__ == '__main__':
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/PackageHandler.py b/ambari-agent/src/main/python/ambari_agent/PackageHandler.py
new file mode 100644
index 0000000000..be05575b47
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/PackageHandler.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import simplejson
+import web
+from mimerender import mimerender
+from Runner import Runner
+
+render_json = lambda **args: simplejson.dumps(args)
+
+class PackageHandler:
+ @mimerender(
+ default = 'json',
+ json = render_json
+ )
+
+ def GET(self, cmd, packageName):
+ data = []
+ data['cmd'] = cmd
+ data['package'] = { "name" : packageName }
+ dispatcher = Runner()
+ return dispatcher.run(data)
+
+ def POST(self, cmd):
+ web.header('Content-Type','application/json')
+ data = web.data()
+ jsonp = simplejson.loads(data)
+ jsonp['cmd']=cmd
+ dispatcher = Runner()
+ return dispatcher.run(jsonp)
+
diff --git a/ambari-agent/src/main/python/ambari_agent/Runner.py b/ambari-agent/src/main/python/ambari_agent/Runner.py
new file mode 100644
index 0000000000..4b86381095
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/Runner.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+from daemon import daemonRunner
+from package import packageRunner
+from shell import shellRunner
+
+class Runner(threading.Thread):
+ """ Runs the actions coming from the server """
+ __instance = None
+ lock = None
+ def __init__(self):
+ if Runner.__instance is None:
+ Runner.lock = threading.RLock()
+ Runner.__instance = self
+
+ def run(self, data):
+ Runner.lock.acquire()
+ try:
+ if data['actionType']=='info':
+ ph = packageRunner()
+ result = ph.info(data['packages'])
+ elif data['actionType']=='install':
+ ph = packageRunner()
+ if 'dry-run' in data:
+ opt = data['dry-run']
+ else:
+ opt = 'false'
+ result = ph.install(data['packages'], opt)
+ elif data['actionType']=='remove':
+ ph = packageRunner()
+ if 'dry-run' in data:
+ opt = data['dry-run']
+ else:
+ opt = 'false'
+ result = ph.remove(data['packages'], opt)
+ elif data['actionType']=='status':
+ dh = daemonRunner()
+ result = dh.status(data['daemonName'])
+ elif data['actionType']=='start':
+ dh = daemonRunner()
+ result = dh.start(data['daemonName'])
+ elif data['actionType']=='stop':
+ dh = daemonRunner()
+ result = dh.stop(data['daemonName'])
+ return result
+ finally:
+ Runner.lock.release()
diff --git a/ambari-agent/src/main/python/ambari_agent/ServerStatus.py b/ambari-agent/src/main/python/ambari_agent/ServerStatus.py
new file mode 100644
index 0000000000..53a0a9a722
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ServerStatus.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from shell import shellRunner
+import logging
+import logging.handlers
+
+logger = logging.getLogger()
+global serverTracker
+
+class ServerStatus:
+ def build(self):
+ sh = shellRunner()
+ list = []
+ servers = sh.getServerTracker()
+ for server in servers:
+ (clusterId, clusterDefinitionRevision, component, role) = server.split("/")
+ result = {
+ 'clusterId' : clusterId,
+ 'clusterDefinitionRevision' : clusterDefinitionRevision,
+ 'componentName' : component,
+ 'roleName' : role,
+ 'serverStatus' : 'STARTED'
+ }
+ list.append(result)
+ return list
+
+def main(argv=None):
+ serverStatus = ServerStatus()
+ print serverStatus.build()
+
+if __name__ == '__main__':
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/__init__.py b/ambari-agent/src/main/python/ambari_agent/__init__.py
new file mode 100644
index 0000000000..3bfb534323
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/__init__.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python2.6
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Ambari Agent
+
+"""
+
+from __future__ import generators
+
+__version__ = "0.1.0"
+__author__ = [
+ "Eric Yang <eyang@apache.org>",
+ "Kan Zhang <kanzhangmail@yahoo.com>"
+]
+__license__ = "Apache License v2.0"
+__contributors__ = "see http://incubator.apache.org/ambari/contributors"
+
+import logging
+import logging.handlers
+import threading
+import sys
+import time
+import signal
+
diff --git a/ambari-agent/src/main/python/ambari_agent/createDaemon.py b/ambari-agent/src/main/python/ambari_agent/createDaemon.py
new file mode 100644
index 0000000000..764211ca00
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/createDaemon.py
@@ -0,0 +1,205 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+"""Disk And Execution MONitor (Daemon)
+
+Configurable daemon behaviors:
+
+ 1.) The current working directory set to the "/" directory.
+ 2.) The current file creation mode mask set to 0.
+ 3.) Close all open files (1024).
+ 4.) Redirect standard I/O streams to "/dev/null".
+
+A failed call to fork() now raises an exception.
+
+References:
+ 1) Advanced Programming in the Unix Environment: W. Richard Stevens
+ 2) Unix Programming Frequently Asked Questions:
+ http://www.erlenstar.demon.co.uk/unix/faq_toc.html
+"""
+
+__author__ = "Chad J. Schroeder"
+__copyright__ = "Copyright (C) 2005 Chad J. Schroeder"
+
+__revision__ = "$Id$"
+__version__ = "0.2"
+
+# Standard Python modules.
+import os # Miscellaneous OS interfaces.
+import sys # System-specific parameters and functions.
+
+# Default daemon parameters.
+# File mode creation mask of the daemon.
+UMASK = 0022
+
+# Default working directory for the daemon.
+WORKDIR = "/"
+
+# Default maximum for the number of available file descriptors.
+MAXFD = 1024
+
+# The standard I/O file descriptors are redirected to /dev/null by default.
+if (hasattr(os, "devnull")):
+ REDIRECT_TO = os.devnull
+else:
+ REDIRECT_TO = "/dev/null"
+
+def createDaemon():
+ """Detach a process from the controlling terminal and run it in the
+ background as a daemon.
+ """
+
+ try:
+ # Fork a child process so the parent can exit. This returns control to
+ # the command-line or shell. It also guarantees that the child will not
+ # be a process group leader, since the child receives a new process ID
+ # and inherits the parent's process group ID. This step is required
+ # to insure that the next call to os.setsid is successful.
+ pid = os.fork()
+ except OSError, e:
+ raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+ if (pid == 0): # The first child.
+ # To become the session leader of this new session and the process group
+ # leader of the new process group, we call os.setsid(). The process is
+ # also guaranteed not to have a controlling terminal.
+ os.setsid()
+
+ # Is ignoring SIGHUP necessary?
+ #
+ # It's often suggested that the SIGHUP signal should be ignored before
+ # the second fork to avoid premature termination of the process. The
+ # reason is that when the first child terminates, all processes, e.g.
+ # the second child, in the orphaned group will be sent a SIGHUP.
+ #
+ # "However, as part of the session management system, there are exactly
+ # two cases where SIGHUP is sent on the death of a process:
+ #
+ # 1) When the process that dies is the session leader of a session that
+ # is attached to a terminal device, SIGHUP is sent to all processes
+ # in the foreground process group of that terminal device.
+ # 2) When the death of a process causes a process group to become
+ # orphaned, and one or more processes in the orphaned group are
+ # stopped, then SIGHUP and SIGCONT are sent to all members of the
+ # orphaned group." [2]
+ #
+ # The first case can be ignored since the child is guaranteed not to have
+ # a controlling terminal. The second case isn't so easy to dismiss.
+ # The process group is orphaned when the first child terminates and
+ # POSIX.1 requires that every STOPPED process in an orphaned process
+ # group be sent a SIGHUP signal followed by a SIGCONT signal. Since the
+ # second child is not STOPPED though, we can safely forego ignoring the
+ # SIGHUP signal. In any case, there are no ill-effects if it is ignored.
+ #
+ # import signal # Set handlers for asynchronous events.
+ # signal.signal(signal.SIGHUP, signal.SIG_IGN)
+
+ try:
+ # Fork a second child and exit immediately to prevent zombies. This
+ # causes the second child process to be orphaned, making the init
+ # process responsible for its cleanup. And, since the first child is
+ # a session leader without a controlling terminal, it's possible for
+ # it to acquire one by opening a terminal in the future (System V-
+ # based systems). This second fork guarantees that the child is no
+ # longer a session leader, preventing the daemon from ever acquiring
+ # a controlling terminal.
+ pid = os.fork() # Fork a second child.
+ except OSError, e:
+ raise Exception, "%s [%d]" % (e.strerror, e.errno)
+
+ if (pid == 0): # The second child.
+ # Since the current working directory may be a mounted filesystem, we
+ # avoid the issue of not being able to unmount the filesystem at
+ # shutdown time by changing it to the root directory.
+ os.chdir(WORKDIR)
+ # We probably don't want the file mode creation mask inherited from
+ # the parent, so we give the child complete control over permissions.
+ os.umask(UMASK)
+ else:
+ # exit() or _exit()? See below.
+ os._exit(0) # Exit parent (the first child) of the second child.
+ else:
+ # exit() or _exit()?
+ # _exit is like exit(), but it doesn't call any functions registered
+ # with atexit (and on_exit) or any registered signal handlers. It also
+ # closes any open file descriptors. Using exit() may cause all stdio
+ # streams to be flushed twice and any temporary files may be unexpectedly
+ # removed. It's therefore recommended that child branches of a fork()
+ # and the parent branch(es) of a daemon use _exit().
+ os._exit(0) # Exit parent of the first child.
+
+ # Close all open file descriptors. This prevents the child from keeping
+ # open any file descriptors inherited from the parent. There is a variety
+ # of methods to accomplish this task. Three are listed below.
+ #
+ # Try the system configuration variable, SC_OPEN_MAX, to obtain the maximum
+ # number of open file descriptors to close. If it doesn't exists, use
+ # the default value (configurable).
+ #
+ # try:
+ # maxfd = os.sysconf("SC_OPEN_MAX")
+ # except (AttributeError, ValueError):
+ # maxfd = MAXFD
+ #
+ # OR
+ #
+ # if (os.sysconf_names.has_key("SC_OPEN_MAX")):
+ # maxfd = os.sysconf("SC_OPEN_MAX")
+ # else:
+ # maxfd = MAXFD
+ #
+ # OR
+ #
+ # Use the getrlimit method to retrieve the maximum file descriptor number
+ # that can be opened by this process. If there is not limit on the
+ # resource, use the default value.
+ #
+ import resource # Resource usage information.
+ maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
+ if (maxfd == resource.RLIM_INFINITY):
+ maxfd = MAXFD
+
+ # Iterate through and close all file descriptors.
+ for fd in range(0, maxfd):
+ try:
+ os.close(fd)
+ except OSError: # ERROR, fd wasn't open to begin with (ignored)
+ pass
+
+ # Redirect the standard I/O file descriptors to the specified file. Since
+ # the daemon has no controlling terminal, most daemons redirect stdin,
+ # stdout, and stderr to /dev/null. This is done to prevent side-effects
+ # from reads and writes to the standard I/O file descriptors.
+
+ # This call to open is guaranteed to return the lowest file descriptor,
+ # which will be 0 (stdin), since it was closed above.
+ os.open(REDIRECT_TO, os.O_RDWR) # standard input (0)
+
+ # Duplicate standard input to standard output and standard error.
+ os.dup2(0, 1) # standard output (1)
+ os.dup2(0, 2) # standard error (2)
+
+ return(0)
+
+if __name__ == "__main__":
+
+ retCode = createDaemon()
+
+ sys.exit(retCode)
diff --git a/ambari-agent/src/main/python/ambari_agent/daemon.py b/ambari-agent/src/main/python/ambari_agent/daemon.py
new file mode 100644
index 0000000000..31607f56fa
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/daemon.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from shell import shellRunner
+
+class daemonRunner:
+ def start(self, name):
+ sh = shellRunner()
+ script = [ '/etc/init.d/'+name, 'start' ]
+ return sh.run(script)
+
+ def stop(self, name):
+ sh = shellRunner()
+ script = [ '/etc/init.d/'+name, 'stop' ]
+ return sh.run(script)
+
+ def status(self, name):
+ sh = shellRunner()
+ script = [ '/etc/init.d/'+name, 'stop' ]
+ return sh.run(script)
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
new file mode 100644
index 0000000000..f7e9abc7e8
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import logging.handlers
+import code
+import signal
+import sys, traceback
+import os
+import time
+import ConfigParser
+from createDaemon import createDaemon
+from Controller import Controller
+from shell import getTempFiles
+from shell import killstaleprocesses
+import AmbariConfig
+
+logger = logging.getLogger()
+agentPid = os.getpid()
+
+if 'AMBARI_PID_DIR' in os.environ:
+ pidfile = os.environ['AMBARI_PID_DIR'] + "/ambari-agent.pid"
+else:
+ pidfile = "/var/run/ambari/ambari-agent.pid"
+
+if 'AMBARI_LOG_DIR' in os.environ:
+ logfile = os.environ['AMBARI_LOG_DIR'] + "/ambari-agent.log"
+else:
+ logfile = "/var/log/ambari/ambari-agent.log"
+
+def signal_handler(signum, frame):
+ #we want the handler to run only for the agent process and not
+ #for the children (e.g. namenode, etc.)
+ if (os.getpid() != agentPid):
+ os._exit(0)
+ logger.info('signal received, exiting.')
+ try:
+ os.unlink(pidfile)
+ except Exception:
+ logger.warn("Unable to remove: "+pidfile)
+ traceback.print_exc()
+
+ tempFiles = getTempFiles()
+ for tempFile in tempFiles:
+ if os.path.exists(tempFile):
+ try:
+ os.unlink(tempFile)
+ except Exception:
+ traceback.print_exc()
+ logger.warn("Unable to remove: "+tempFile)
+ os._exit(0)
+
+def debug(sig, frame):
+ """Interrupt running process, and provide a python prompt for
+ interactive debugging."""
+ d={'_frame':frame} # Allow access to frame object.
+ d.update(frame.f_globals) # Unless shadowed by global
+ d.update(frame.f_locals)
+
+ message = "Signal recieved : entering python shell.\nTraceback:\n"
+ message += ''.join(traceback.format_stack(frame))
+ logger.info(message)
+
+def main():
+ global config
+ default_cfg = { 'agent' : { 'prefix' : '/home/ambari' } }
+ config = ConfigParser.RawConfigParser(default_cfg)
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGUSR1, debug)
+ if (len(sys.argv) >1) and sys.argv[1]=='stop':
+ # stop existing Ambari agent
+ try:
+ f = open(pidfile, 'r')
+ pid = f.read()
+ pid = int(pid)
+ f.close()
+ os.kill(pid, signal.SIGTERM)
+ time.sleep(5)
+ if os.path.exists(pidfile):
+ raise Exception("PID file still exists.")
+ os._exit(0)
+ except Exception, err:
+ os.kill(pid, signal.SIGKILL)
+ os._exit(1)
+
+ # Check if there is another instance running
+ if os.path.isfile(pidfile):
+ print("%s already exists, exiting" % pidfile)
+ sys.exit(1)
+ else:
+ # Daemonize current instance of Ambari Agent
+ #retCode = createDaemon()
+ pid = str(os.getpid())
+ file(pidfile, 'w').write(pid)
+
+
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s")
+ rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 10)
+ rotateLog.setFormatter(formatter)
+ logger.addHandler(rotateLog)
+ credential = None
+
+ # Check for ambari configuration file.
+ try:
+ config = AmbariConfig.config
+ if(os.path.exists('/etc/ambari/ambari.ini')):
+ config.read('/etc/ambari/ambari.ini')
+ AmbariConfig.setConfig(config)
+ else:
+ raise Exception("No config found, use default")
+ except Exception, err:
+ logger.warn(err)
+
+ killstaleprocesses()
+ logger.info("Connecting to Server at: "+config.get('server', 'url'))
+
+ # Launch Controller communication
+ controller = Controller(config)
+ controller.start()
+ controller.run()
+ logger.info("finished")
+
+if __name__ == "__main__":
+ main()
diff --git a/ambari-agent/src/main/python/ambari_agent/shell.py b/ambari-agent/src/main/python/ambari_agent/shell.py
new file mode 100644
index 0000000000..75541208a8
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/shell.py
@@ -0,0 +1,279 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from pwd import getpwnam
+from grp import getgrnam
+import AmbariConfig
+import logging
+import logging.handlers
+import subprocess
+import os
+import tempfile
+import signal
+import sys
+import threading
+import time
+import traceback
+import shutil
+
+global serverTracker
+serverTracker = {}
+logger = logging.getLogger()
+
+threadLocal = threading.local()
+
+tempFiles = []
+def noteTempFile(filename):
+ tempFiles.append(filename)
+
+def getTempFiles():
+ return tempFiles
+
+def killstaleprocesses():
+ logger.info ("Killing stale processes")
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ files = os.listdir(prefix)
+ for file in files:
+ if str(file).endswith(".pid"):
+ pid = str(file).split('.')[0]
+ killprocessgrp(int(pid))
+ os.unlink(os.path.join(prefix,file))
+ logger.info ("Killed stale processes")
+
+def killprocessgrp(pid):
+ try:
+ os.killpg(pid, signal.SIGTERM)
+ time.sleep(5)
+ try:
+ os.killpg(pid, signal.SIGKILL)
+ except:
+ logger.warn("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
+ except:
+ logger.warn("Failed to kill PID %d" % (pid))
+
+def changeUid():
+ try:
+ os.setuid(threadLocal.uid)
+ except Exception:
+ logger.warn("can not switch user for running command.")
+
+class shellRunner:
+ # Run any command
+ def run(self, script, user=None):
+ try:
+ if user!=None:
+ user=getpwnam(user)[2]
+ else:
+ user = os.getuid()
+ threadLocal.uid = user
+ except Exception:
+ logger.warn("can not switch user for RUN_COMMAND.")
+ code = 0
+ cmd = " "
+ cmd = cmd.join(script)
+ p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ out, err = p.communicate()
+ code = p.wait()
+ logger.debug("Exitcode for %s is %d" % (cmd,code))
+ return {'exitCode': code, 'output': out, 'error': err}
+
+ # dispatch action types
+ def runAction(self, clusterId, component, role, user, command, cleanUpCommand, result):
+ oldDir = os.getcwd()
+ #TODO: handle this better. Don't like that it is doing a chdir for the main process
+ os.chdir(self.getWorkDir(clusterId, role))
+ oldUid = os.getuid()
+ try:
+ if user is not None:
+ user=getpwnam(user)[2]
+ else:
+ user = oldUid
+ threadLocal.uid = user
+ except Exception:
+ logger.warn("%s %s %s can not switch user for RUN_ACTION." % (clusterId, component, role))
+ code = 0
+ cmd = sys.executable
+ tempfilename = tempfile.mktemp()
+ tmp = open(tempfilename, 'w')
+ tmp.write(command['script'])
+ tmp.close()
+ cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param']))
+ commandResult = {}
+ p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ out, err = p.communicate()
+ code = p.wait()
+ if code != 0:
+ commandResult['output'] = out
+ commandResult['error'] = err
+ commandResult['exitCode'] = code
+ result['commandResult'] = commandResult
+ os.unlink(tempfilename)
+ if code != 0:
+ tempfilename = tempfile.mktemp()
+ tmp = open(tempfilename, 'w')
+ tmp.write(command['script'])
+ tmp.close()
+ cmd = sys.executable
+ cmd = "%s %s %s" % (cmd, tempfilename, " ".join(cleanUpCommand['param']))
+ cleanUpCode = 0
+ cleanUpResult = {}
+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
+ out, err = p.communicate()
+ cleanUpCode = p.wait()
+ if cleanUpCode != 0:
+ cleanUpResult['output'] = out
+ cleanUpResult['error'] = err
+ cleanUpResult['exitCode'] = cleanUpCode
+ result['cleanUpResult'] = cleanUpResult
+ os.unlink(tempfilename)
+ os._exit(1)
+ try:
+ os.chdir(oldDir)
+ except Exception:
+ logger.warn("%s %s %s can not restore environment for RUN_ACTION." % (clusterId, component, role))
+ return result
+
+ # Start a process and presist its state
+ def startProcess(self, clusterId, clusterDefinitionRevision, component, role, script, user, result):
+ global serverTracker
+ oldDir = os.getcwd()
+ try:
+ os.chdir(self.getWorkDir(clusterId,role))
+ except Exception:
+ logger.warn("%s %s %s can not switch dir for START_ACTION." % (clusterId, component, role))
+ oldUid = os.getuid()
+ try:
+ if user is not None:
+ user=getpwnam(user)[2]
+ else:
+ user = os.getuid()
+ threadLocal.uid = user
+ except Exception:
+ logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role))
+ code = 0
+ commandResult = {}
+ process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role)
+ if not process in serverTracker:
+ try:
+ plauncher = processlauncher(script,user)
+ plauncher.start()
+ plauncher.blockUntilProcessCreation()
+ except Exception:
+ traceback.print_exc()
+ logger.warn("Can not launch process for %s %s %s" % (clusterId, component, role))
+ code = -1
+ serverTracker[process] = plauncher
+ commandResult['exitCode'] = code
+ result['commandResult'] = commandResult
+ try:
+ os.chdir(oldDir)
+ except Exception:
+ logger.warn("%s %s %s can not restore environment for START_ACTION." % (clusterId, component, role))
+ return result
+
+ # Stop a process and remove presisted state
+ def stopProcess(self, processKey):
+ global serverTracker
+ keyFragments = processKey.split('/')
+ process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3])
+ if process in serverTracker:
+ logger.info ("Sending %s with PID %d the SIGTERM signal" % (process,serverTracker[process].getpid()))
+ killprocessgrp(serverTracker[process].getpid())
+ del serverTracker[process]
+
+ def getServerTracker(self):
+ return serverTracker
+
+ def getServerKey(self,clusterId, clusterDefinitionRevision, component, role):
+ return clusterId+"/"+str(clusterDefinitionRevision)+"/"+component+"/"+role
+
+ def getWorkDir(self, clusterId, role):
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ return str(os.path.join(prefix, clusterId, role))
+
+
+class processlauncher(threading.Thread):
+ def __init__(self,script,uid):
+ threading.Thread.__init__(self)
+ self.script = script
+ self.serverpid = -1
+ self.uid = uid
+ self.out = None
+ self.err = None
+
+ def run(self):
+ try:
+ tempfilename = tempfile.mktemp()
+ noteTempFile(tempfilename)
+ pythoncmd = sys.executable
+ tmp = open(tempfilename, 'w')
+ tmp.write(self.script['script'])
+ tmp.close()
+ threadLocal.uid = self.uid
+ self.cmd = "%s %s %s" % (pythoncmd, tempfilename, " ".join(self.script['param']))
+ logger.info("Launching %s as uid %d" % (self.cmd,self.uid) )
+ p = subprocess.Popen(self.cmd, preexec_fn=self.changeUidAndSetSid, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, shell=True, close_fds=True)
+ logger.info("Launched %s; PID %d" % (self.cmd,p.pid))
+ self.serverpid = p.pid
+ self.out, self.err = p.communicate()
+ self.code = p.wait()
+ logger.info("%s; PID %d exited with code %d \nSTDOUT: %s\nSTDERR %s" %
+ (self.cmd,p.pid,self.code,self.out,self.err))
+ except:
+ logger.warn("Exception encountered while launching : " + self.cmd)
+ traceback.print_exc()
+
+ os.unlink(self.getpidfile())
+ os.unlink(tempfilename)
+
+ def blockUntilProcessCreation(self):
+ self.getpid()
+
+ def getpid(self):
+ sleepCount = 1
+ while (self.serverpid == -1):
+ time.sleep(1)
+ logger.info("Waiting for process %s to start" % self.cmd)
+ if sleepCount > 10:
+ logger.warn("Couldn't start process %s even after %d seconds" % (self.cmd,sleepCount))
+ os._exit(1)
+ return self.serverpid
+
+ def getpidfile(self):
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ pidfile = os.path.join(prefix,str(self.getpid())+".pid")
+ return pidfile
+
+ def changeUidAndSetSid(self):
+ prefix = AmbariConfig.config.get('stack','installprefix')
+ pidfile = os.path.join(prefix,str(os.getpid())+".pid")
+ #TODO remove try/except (when there is a way to provide
+ #config files for testcases). The default config will want
+ #to create files in /var/ambari which may not exist unless
+ #specifically created.
+ #At that point add a testcase for the pid file management.
+ try:
+ f = open(pidfile,'w')
+ f.close()
+ except:
+ logger.warn("Couldn't write pid file %s for %s" % (pidfile,self.cmd))
+ changeUid()
+ os.setsid()
diff --git a/ambari-agent/src/main/python/setup.cfg b/ambari-agent/src/main/python/setup.cfg
new file mode 100644
index 0000000000..a73754ef79
--- /dev/null
+++ b/ambari-agent/src/main/python/setup.cfg
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+[egg_info]
+tag_build =
+tag_date = 0
+tag_svn_revision = 0
diff --git a/ambari-agent/src/main/python/setup.py b/ambari-agent/src/main/python/setup.py
new file mode 100644
index 0000000000..41c2097830
--- /dev/null
+++ b/ambari-agent/src/main/python/setup.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from setuptools import setup
+
+setup(
+ name = "ambari-agent",
+ version = "1.0.3-SNAPSHOT",
+ packages = ['ambari_agent'],
+ # metadata for upload to PyPI
+ author = "Apache Software Foundation",
+ author_email = "ambari-dev@incubator.apache.org",
+ description = "Ambari agent",
+ license = "Apache License v2.0",
+ keywords = "hadoop, ambari",
+ url = "http://incubator.apache.org/ambari",
+ long_description = "This package implements the Ambari agent for installing Hadoop on large clusters.",
+ platforms=["any"],
+ entry_points = {
+ "console_scripts": [
+ "ambari-agent = ambari_agent.main:main",
+ ],
+ }
+)
diff --git a/ambari-agent/src/test/python/TestActionQueue.py b/ambari-agent/src/test/python/TestActionQueue.py
new file mode 100644
index 0000000000..fbfbf24e77
--- /dev/null
+++ b/ambari-agent/src/test/python/TestActionQueue.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.FileUtil import getFilePath
+import os, errno, time
+
+class TestActionQueue(TestCase):
+ def test_ActionQueueStartStop(self):
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ actionQueue.start()
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+ def test_RetryAction(self):
+ action={'id' : 'tttt'}
+ config = AmbariConfig().getConfig()
+ actionQueue = ActionQueue(config)
+ path = actionQueue.getInstallFilename(action['id'])
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : path,
+ "umask" : 022
+ }
+
+ #note that the command in the action is just a listing of the path created
+ #we just want to ensure that 'ls' can run on the data file (in the actual world
+ #this 'ls' would be a puppet or a chef command that would work on a data
+ #file
+ badAction = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',"/foo/bar/badPath1234"]
+ }
+ path=getFilePath(action,path)
+ goodAction = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',path]
+ }
+ actionQueue.start()
+ response = {'actions' : [badAction,goodAction]}
+ actionQueue.maxRetries = 2
+ actionQueue.sleepInterval = 1
+ result = actionQueue.put(response)
+ results = actionQueue.result()
+ sleptCount = 1
+ while (len(results) < 2 and sleptCount < 15):
+ time.sleep(1)
+ sleptCount += 1
+ results = actionQueue.result()
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(len(results), 2, 'Number of results is not 2.')
+ result = results[0]
+ maxretries = config.get('command', 'maxretries')
+ self.assertEqual(int(result['retryActionCount']),
+ int(maxretries),
+ "Number of retries is %d and not %d" %
+ (int(result['retryActionCount']), int(str(maxretries))))
+ result = results[1]
+ self.assertEqual(int(result['retryActionCount']),
+ 1,
+ "Number of retries is %d and not %d" %
+ (int(result['retryActionCount']), 1))
diff --git a/ambari-agent/src/test/python/TestAgentActions.py b/ambari-agent/src/test/python/TestAgentActions.py
new file mode 100644
index 0000000000..e844bd9cb1
--- /dev/null
+++ b/ambari-agent/src/test/python/TestAgentActions.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import os, errno, getpass
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+from ambari_agent.FileUtil import getFilePath
+from ambari_agent import shell
+from ambari_agent.shell import serverTracker
+import time
+
+class TestAgentActions(TestCase):
+ def test_installAndConfigAction(self):
+ action={'id' : 'tttt'}
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ path = actionQueue.getInstallFilename(action['id'])
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : path,
+ "umask" : 022
+ }
+
+ #note that the command in the action is just a listing of the path created
+ #we just want to ensure that 'ls' can run on the data file (in the actual world
+ #this 'ls' would be a puppet or a chef command that would work on a data
+ #file
+ path=getFilePath(action,path)
+ action = {
+ 'id' : 'tttt',
+ 'kind' : 'INSTALL_AND_CONFIG_ACTION',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile,
+ 'clusterDefinitionRevision' : 12,
+ 'command' : ['/bin/ls',path]
+ }
+ result = { }
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ result = actionQueue.installAndConfigAction(action)
+ cmdResult = result['commandResult']
+ self.assertEqual(cmdResult['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % cmdResult['exitCode'])
+ self.assertEqual(cmdResult['output'], path + "\n", "installAndConfigAction test failed Returned %s " % cmdResult['output'])
+
+ def test_startAndStopAction(self):
+ command = {'script' : 'import os,sys,time\ni = 0\nwhile (i < 1000):\n print "testhello"\n sys.stdout.flush()\n time.sleep(1)\n i+=1',
+ 'param' : ''}
+ action={'id' : 'ttt',
+ 'kind' : 'START_ACTION',
+ 'clusterId' : 'foobar',
+ 'clusterDefinitionRevision' : 1,
+ 'component' : 'foocomponent',
+ 'role' : 'foorole',
+ 'command' : command,
+ 'user' : getpass.getuser()
+ }
+
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ result = actionQueue.startAction(action)
+ cmdResult = result['commandResult']
+ self.assertEqual(cmdResult['exitCode'], 0, "starting a process failed")
+ shell = actionQueue.getshellinstance()
+ key = shell.getServerKey(action['clusterId'],action['clusterDefinitionRevision'],
+ action['component'],action['role'])
+ keyPresent = True
+ if not key in serverTracker:
+ keyPresent = False
+ self.assertEqual(keyPresent, True, "Key not present")
+ plauncher = serverTracker[key]
+ self.assertTrue(plauncher.getpid() > 0, "Pid less than 0!")
+ time.sleep(5)
+ shell.stopProcess(key)
+ keyPresent = False
+ if key in serverTracker:
+ keyPresent = True
+ self.assertEqual(keyPresent, False, "Key present")
+ processexists = True
+ try:
+ os.kill(serverTracker[key].getpid(),0)
+ except:
+ processexists = False
+ self.assertEqual(processexists, False, "Process still exists!")
+ self.assertTrue("testhello" in plauncher.out, "Output doesn't match!")
diff --git a/ambari-agent/src/test/python/TestFileUtil.py b/ambari-agent/src/test/python/TestFileUtil.py
new file mode 100644
index 0000000000..53e55c58d5
--- /dev/null
+++ b/ambari-agent/src/test/python/TestFileUtil.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.FileUtil import writeFile, createStructure, deleteStructure
+import os, errno
+
+class TestFileUtil(TestCase):
+ def test_createStructure(self):
+ action = { 'clusterId' : 'abc', 'role' : 'hdfs', 'workDirComponent' : 'abc-hdfs' }
+ result = {}
+ result = createStructure(action, result)
+ self.assertEqual(result['exitCode'], 0, 'Create cluster structure failed.')
+
+# def test_writeFile(self):
+ configFile = {
+ "data" : "test",
+ "owner" : os.getuid(),
+ "group" : os.getgid() ,
+ "permission" : 0700,
+ "path" : "/tmp/ambari_file_test/_file_write_test",
+ "umask" : 022
+ }
+ action = {
+ 'clusterId' : 'abc',
+ 'role' : 'hdfs',
+ 'workDirComponent' : 'abc-hdfs',
+ 'file' : configFile
+ }
+ result = { }
+ result = writeFile(action, result)
+ self.assertEqual(result['exitCode'], 0, 'WriteFile test with uid/gid failed.')
+
+# def test_deleteStructure(self):
+ result = { }
+ action = { 'clusterId' : 'abc', 'role' : 'hdfs', 'workDirComponent' : 'abc-hdfs' }
+ result = deleteStructure(action, result)
+ self.assertEqual(result['exitCode'], 0, 'Delete cluster structure failed.')
+
diff --git a/ambari-agent/src/test/python/TestHardware.py b/ambari-agent/src/test/python/TestHardware.py
new file mode 100644
index 0000000000..68ee8b2bcc
--- /dev/null
+++ b/ambari-agent/src/test/python/TestHardware.py
@@ -0,0 +1,30 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.Hardware import Hardware
+
+class TestHardware(TestCase):
+ def test_build(self):
+ hardware = Hardware()
+ result = hardware.get()
+ self.assertTrue(result['coreCount'] >= 1)
+ self.assertTrue(result['netSpeed'] != None)
+
diff --git a/ambari-agent/src/test/python/TestHeartbeat.py b/ambari-agent/src/test/python/TestHeartbeat.py
new file mode 100644
index 0000000000..a596791a6d
--- /dev/null
+++ b/ambari-agent/src/test/python/TestHeartbeat.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.Heartbeat import Heartbeat
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+import socket
+
+class TestHeartbeat(TestCase):
+ def test_build(self):
+ actionQueue = ActionQueue(AmbariConfig().getConfig())
+ heartbeat = Heartbeat(actionQueue)
+ result = heartbeat.build(100)
+ \ No newline at end of file
diff --git a/ambari-agent/src/test/python/TestServerStatus.py b/ambari-agent/src/test/python/TestServerStatus.py
new file mode 100644
index 0000000000..8d09037e68
--- /dev/null
+++ b/ambari-agent/src/test/python/TestServerStatus.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+from ambari_agent.ServerStatus import ServerStatus
+
+class TestServerStatus(TestCase):
+ def test_build(self):
+ serverStatus = ServerStatus()
+ result = serverStatus.build()
+ self.assertEqual(result, [], 'List of running servers should be 0.')
+
diff --git a/ambari-agent/src/test/python/unitTests.py b/ambari-agent/src/test/python/unitTests.py
new file mode 100644
index 0000000000..233034bd95
--- /dev/null
+++ b/ambari-agent/src/test/python/unitTests.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import unittest
+import doctest
+
+class TestAgent(unittest.TestSuite):
+ def run(self, result):
+ run = unittest.TestSuite.run
+ run(self, result)
+ return result
+
+def all_tests_suite():
+ suite = unittest.TestLoader().loadTestsFromNames([
+ 'TestHeartbeat',
+ 'TestHardware',
+ 'TestServerStatus',
+ 'TestFileUtil',
+ 'TestActionQueue',
+ 'TestAmbariComponent',
+ 'TestAgentActions'
+ ])
+ return TestAgent([suite])
+
+def main():
+ runner = unittest.TextTestRunner()
+ suite = all_tests_suite()
+ raise SystemExit(not runner.run(suite).wasSuccessful())
+
+if __name__ == '__main__':
+ import os
+ import sys
+ sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+ main()