diff options
author | Mahadev Konar <mahadev@apache.org> | 2012-09-06 07:42:18 +0000 |
---|---|---|
committer | Mahadev Konar <mahadev@apache.org> | 2012-09-06 07:42:18 +0000 |
commit | 0d08e61db8bc902b8f99ff4c526379b3a968144d (patch) | |
tree | 007fab72e72a71f8c5f87c80dc0d1bd03953aab9 /ambari-agent | |
parent | 073c43ab2501a9ff3494c73ce49f17e0a2372356 (diff) |
AMBARI-702. Add skeleton for Ambari agent that talks to the server and collects information for host. (mahadev)
git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1381493 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'ambari-agent')
26 files changed, 2244 insertions, 18 deletions
diff --git a/ambari-agent/pom.xml b/ambari-agent/pom.xml index ef130ec1ed..ba8504a59e 100644 --- a/ambari-agent/pom.xml +++ b/ambari-agent/pom.xml @@ -1,5 +1,6 @@ <?xml version="1.0"?> -<!-- +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. @@ -15,28 +16,91 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.apache.ambari</groupId> <artifactId>ambari-agent</artifactId> - <version>0.10.0-SNAPSHOT</version> - <name>ambari-agent</name> - <url>http://maven.apache.org</url> - <parent> - <groupId>org.apache.ambari</groupId> - <version>0.10.0-SNAPSHOT</version> - <artifactId>ambari-project</artifactId> - <relativePath>../ambari-project</relativePath> - </parent> - <dependencies> - <!-- TEST SCOPE DEPENDENCIES --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - </dependencies> + <packaging>pom</packaging> + <version>1.0.3-SNAPSHOT</version> + <name>agent</name> + <description>Ambari Agent</description> + <properties> + <final.name>${project.artifactId}-${project.version}</final.name> + <package.release>1</package.release> + <package.prefix>/usr</package.prefix> + <package.conf.dir>/etc/ambari</package.conf.dir> + <package.log.dir>/var/log/ambari</package.log.dir> + <package.pid.dir>/var/run/ambari</package.pid.dir> + </properties> <build> <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <tarLongFileMode>gnu</tarLongFileMode> + <descriptors> + <descriptor>src/packages/tarball/all.xml</descriptor> + </descriptors> + </configuration> + <executions> + <execution> + <id>build-tarball</id> + <phase>prepare-package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <configuration> + <executable>python2.6</executable> + <workingDirectory>src/test/python</workingDirectory> + <arguments> + <argument>unitTests.py</argument> + </arguments> + <environmentVariables> + <PYTHONPATH>../../main/python:$PYTHONPATH</PYTHONPATH> + </environmentVariables> + <skip>true</skip> + </configuration> + <id>python-test</id> + <phase>test</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + <execution> + <configuration> + <executable>python2.6</executable> + <workingDirectory>target/ambari-agent-${project.version}</workingDirectory> + <arguments> + <argument>${project.basedir}/src/main/python/setup.py</argument> + <argument>clean</argument> + <argument>bdist_dumb</argument> + </arguments> + <environmentVariables> + <PYTHONPATH>target/ambari-agent-${project.version}:$PYTHONPATH</PYTHONPATH> + </environmentVariables> + </configuration> + <id>python-package</id> + <phase>package</phase> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> + <extensions> + <extension> + <groupId>org.apache.maven.wagon</groupId> + <artifactId>wagon-ssh-external</artifactId> + </extension> + </extensions> </build> </project> diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py new file mode 100644 index 0000000000..231fa82474 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import traceback +import logging.handlers +import Queue +import threading +import AmbariConfig +from shell import shellRunner +from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile +from shell import shellRunner +import json +import os +import time +import subprocess +import copy + +logger = logging.getLogger() +installScriptHash = -1 + +class ActionQueue(threading.Thread): + global q, r, clusterId, clusterDefinitionRevision + q = Queue.Queue() + r = Queue.Queue() + clusterId = 'unknown' + clusterDefinitionRevision = 0 + + def __init__(self, config): + global clusterId, clusterDefinitionRevision + super(ActionQueue, self).__init__() + #threading.Thread.__init__(self) + self.config = config + self.sh = shellRunner() + self._stop = threading.Event() + self.maxRetries = config.getint('command', 'maxretries') + self.sleepInterval = config.getint('command', 'sleepBetweenRetries') + + def stop(self): + self._stop.set() + + def stopped(self): + return self._stop.isSet() + + #For unittest + def getshellinstance(self): + return self.sh + + def put(self, response): + if 'actions' in response: + actions = response['actions'] + logger.debug(actions) + # for the servers, take a diff of what's running, and what the controller + # asked the agent to start. Kill all those servers that the controller + # didn't ask us to start + sh = shellRunner() + runningServers = sh.getServerTracker() + + # get the list of servers the controller wants running + serversToRun = {} + for action in actions: + if action['kind'] == 'START_ACTION': + processKey = sh.getServerKey(action['clusterId'],action['clusterDefinitionRevision'], + action['component'], action['role']) + serversToRun[processKey] = 1 + + # create stop actions for the servers that the controller wants stopped + for server in runningServers.keys(): + if server not in serversToRun: + sh.stopProcess(server) + # now put all the actions in the queue. The ordering is important (we stopped + # all unneeded servers first) + for action in actions: + q.put(action) + + def run(self): + global clusterId, clusterDefinitionRevision + while not self.stopped(): + while not q.empty(): + action = q.get() + switches = { + 'START_ACTION' : self.startAction, + 'RUN_ACTION' : self.runAction, + 'CREATE_STRUCTURE_ACTION' : self.createStructureAction, + 'DELETE_STRUCTURE_ACTION' : self.deleteStructureAction, + 'WRITE_FILE_ACTION' : self.writeFileAction, + 'INSTALL_AND_CONFIG_ACTION' : self.installAndConfigAction, + 'NO_OP_ACTION' : self.noOpAction + } + + exitCode = 1 + retryCount = 1 + while (exitCode != 0 and retryCount <= self.maxRetries): + result={} + try: + #pass a copy of action since we don't want anything to change in the + #action dict + actionCopy = copy.copy(action) + result = switches.get(action['kind'], self.unknownAction)(actionCopy) + if ('commandResult' in result): + commandResult = result['commandResult'] + exitCode = commandResult['exitCode'] + if (exitCode == 0): + break + else: + logger.warn(str(action) + " exited with code " + str(exitCode)) + else: + #Really, no commandResult? Is this possible? + #TODO: check + exitCode = 0 + break + except Exception, err: + traceback.print_exc() + logger.warn(err) + if ('commandResult' in result): + commandResult = result['commandResult'] + if ('exitCode' in commandResult): + exitCode = commandResult['exitCode'] + else: + exitCode = 1 + else: + result['commandResult'] = {'exitCode': 1, 'output':"", 'error':""} + + #retry in some time + logger.warn("Retrying %s in %d seconds" % (str(action),self.sleepInterval)) + time.sleep(self.sleepInterval) + retryCount += 1 + + if (exitCode != 0): + result['exitCode']=exitCode + result['retryActionCount'] = retryCount - 1 + else: + result['retryActionCount'] = retryCount + # Update the result + r.put(result) + if not self.stopped(): + time.sleep(5) + + # Store action result to agent response queue + def result(self): + result = [] + while not r.empty(): + result.append(r.get()) + return result + + # Generate default action response + def genResult(self, action): + result={} + if (action['kind'] == 'INSTALL_AND_CONFIG_ACTION' or action['kind'] == 'NO_OP_ACTION'): + result = { + 'id' : action['id'], + 'kind' : action['kind'], + } + else: + result = { + 'id' : action['id'], + 'clusterId' : action['clusterId'], + 'kind' : action['kind'], + 'clusterDefinitionRevision' : action['clusterDefinitionRevision'], + 'componentName' : action['component'], + 'role' : action['role'] + } + return result + + # Run start action, start a server process and + # track the liveness of the children process + def startAction(self, action): + result = self.genResult(action) + return self.sh.startProcess(action['clusterId'], + action['clusterDefinitionRevision'], + action['component'], + action['role'], + action['command'], + action['user'], result) + + # Write file action + def writeFileAction(self, action, fileName=""): + result = self.genResult(action) + return writeFile(action, result, fileName) + + # get the install file + def getInstallFilename(self,id): + return "ambari-install-file-"+id + + # Install and configure action + def installAndConfigAction(self, action): + global installScriptHash + r=self.genResult(action) + w = self.writeFileAction(action,self.getInstallFilename(action['id'])) + commandResult = {} + if w['exitCode']!=0: + commandResult['error'] = w['stderr'] + commandResult['exitCode'] = w['exitCode'] + r['commandResult'] = commandResult + return r + + if 'command' not in action: + # this is hardcoded to do puppet specific stuff for now + # append the content of the puppet file to the file written above + filepath = getFilePath(action,self.getInstallFilename(action['id'])) + logger.info("File path for puppet top level script: " + filepath) + p = self.sh.run(['/bin/cat',AmbariConfig.config.get('puppet','driver')]) + if p['exitCode']!=0: + commandResult['error'] = p['error'] + commandResult['exitCode'] = p['exitCode'] + r['commandResult'] = commandResult + return r + logger.debug("The contents of the static file " + p['output']) + appendToFile(p['output'],filepath) + arr = [AmbariConfig.config.get('puppet','commandpath') , filepath] + logger.debug(arr) + action['command'] = arr + logger.debug(action['command']) + commandResult = self.sh.run(action['command']) + logger.debug("PUPPET COMMAND OUTPUT: " + commandResult['output']) + logger.debug("PUPPET COMMAND ERROR: " + commandResult['error']) + if commandResult['exitCode'] == 0: + installScriptHash = action['id'] + r['commandResult'] = commandResult + return r + + # Run command action + def runAction(self, action): + result = self.genResult(action) + return self.sh.runAction(action['clusterId'], + action['component'], + action['role'], + action['user'], + action['command'], + action['cleanUpCommand'], result) + + # Create directory structure for cluster + def createStructureAction(self, action): + result = self.genResult(action) + result['exitCode'] = 0 + return createStructure(action, result) + + # Delete directory structure for cluster + def deleteStructureAction(self, action): + result = self.genResult(action) + result['exitCode'] = 0 + return deleteStructure(action, result) + + def noOpAction(self, action): + r = {'id' : action['id']} + return r + + # Handle unknown action + def unknownAction(self, action): + logger.error('Unknown action: %s' % action['id']) + result = { 'id': action['id'] } + return result + + # Discover agent idle state + def isIdle(self): + return q.empty() + + # Get the hash of the script currently used for install/config + def getInstallScriptHash(self): + return installScriptHash diff --git a/ambari-agent/src/main/python/ambari_agent/ActionResults.py b/ambari-agent/src/main/python/ambari_agent/ActionResults.py new file mode 100644 index 0000000000..7603fa1bb6 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ActionResults.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import logging.handlers +import Queue +import ActionQueue + +logger = logging.getLogger() + +class ActionResults: + global r + + # Build action results list from memory queue + def build(self): + results = [] + while not ActionQueue.r.empty(): + result = { + 'clusterId': 'unknown', + 'id' : 'action-001', + 'kind' : 'STOP_ACTION', + 'commandResults' : [], + 'cleanUpCommandResults' : [], + 'serverName' : 'hadoop.datanode' + } + results.append(result) + logger.info(results) + return results + +def main(argv=None): + ar = ActionResults() + print ar.build() + +if __name__ == '__main__': + main() diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py new file mode 100644 index 0000000000..96041970e7 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import logging.handlers +import ConfigParser +import StringIO + +config = ConfigParser.RawConfigParser() +content = """ +[server] +url=http://localhost:4080 + +[agent] +prefix=/tmp/ambari + +[stack] +installprefix=/var/ambari/ + +[puppet] +puppet_home=/usr/local/bin +facter_home=/usr/local/bin + +[command] +maxretries=2 +sleepBetweenRetries=1 + +""" +s = StringIO.StringIO(content) +config.readfp(s) + +class AmbariConfig: + def getConfig(self): + global config + return config + +def setConfig(customConfig): + global config + config = customConfig + +def main(): + print config + +if __name__ == "__main__": + main() diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py new file mode 100644 index 0000000000..30ea9d77ab --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import logging.handlers +import signal +import json +import socket +import sys, traceback +import time +import threading +import urllib2 +from urllib2 import Request, urlopen, URLError +import AmbariConfig +from Heartbeat import Heartbeat +from ActionQueue import ActionQueue +from optparse import OptionParser +from wsgiref.simple_server import ServerHandler + +logger = logging.getLogger() + +class Controller(threading.Thread): + + def __init__(self, config): + threading.Thread.__init__(self) + logger.debug('Initializing Controller RPC thread.') + self.lock = threading.Lock() + self.safeMode = True + self.credential = None + self.config = config + #Disabled security until we have fix for AMBARI-157 + #if(config.get('controller', 'user')!=None and config.get('controller', 'password')!=None): + # self.credential = { 'user' : config.get('controller', 'user'), + # 'password' : config.get('controller', 'password') + # } + self.url = config.get('server', 'url') + '/agent/heartbeat/' + socket.gethostname() + + def start(self): + self.actionQueue = ActionQueue(self.config) + self.actionQueue.start() + self.heartbeat = Heartbeat(self.actionQueue) + + def __del__(self): + logger.info("Server connection disconnected.") + + def run(self): + id='-1' + opener = urllib2.build_opener() + urllib2.install_opener(opener) + retry=False + firstTime=True + while True: + try: + if retry==False: + data = json.dumps(self.heartbeat.build(id)) + logger.debug(data) + req = urllib2.Request(self.url, data, {'Content-Type': 'application/json'}) + f = urllib2.urlopen(req) + response = f.read() + f.close() + data = json.loads(response) + id=int(data['responseId']) + self.actionQueue.put(data) + if retry==True or firstTime==True: + logger.info("Controller connection established") + firstTime=False + retry=False + except Exception, err: + retry=True + if "code" in err: + logger.error(err.code) + else: + logger.error("Unable to connect to: "+self.url,exc_info=True) + if self.actionQueue.isIdle(): + time.sleep(30) + else: + time.sleep(1) + +def main(argv=None): + # Allow Ctrl-C + signal.signal(signal.SIGINT, signal.SIG_DFL) + + logger.setLevel(logging.INFO) + formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s") + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv)) + + config = AmbariConfig.config + collector = Controller(config) + collector.start() + collector.run() + +if __name__ == '__main__': + main() diff --git a/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py b/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py new file mode 100644 index 0000000000..b72662165b --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/DaemonHandler.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import simplejson +import web +from mimerender import mimerender +from Runner import Runner + +render_json = lambda **args: simplejson.dumps(args) + +class DaemonHandler: + @mimerender( + default = 'json', + json = render_json + ) + + def GET(self, cmd, daemonName): + data = [] + data['cmd']=cmd + data['daemonName']=daemonName + dispatcher = Runner() + return dispatcher.run(data) + + def POST(self, cmd): + web.header('Content-Type','application/json') + data = web.data(); + jsonp = simplejson.loads(data) + jsonp['cmd']=cmd + dispatcher = Runner() + return dispatcher.run(jsonp) + diff --git a/ambari-agent/src/main/python/ambari_agent/FileUtil.py b/ambari-agent/src/main/python/ambari_agent/FileUtil.py new file mode 100644 index 0000000000..f24046baf5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/FileUtil.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from pwd import getpwnam +from grp import getgrnam +import logging +import logging.handlers +import getpass +import os, errno +import sys, traceback +import ConfigParser +import shutil +import StringIO +import AmbariConfig + +logger = logging.getLogger() + +def getFilePath(action, fileName=""): + #Change the method signature to take the individual action fields + pathComp="" + if 'clusterId' in action: + pathComp = action['clusterId'] + if 'role' in action: + pathComp = pathComp + "-" + action['role'] + path = os.path.join(AmbariConfig.config.get('agent','prefix'), + "clusters", + pathComp) + fullPathName="" + if fileName != "": + fullPathName=os.path.join(path, fileName) + else: + fileInfo = action['file'] + fullPathName=os.path.join(path, fileInfo['path']) + return fullPathName + +def appendToFile(data,absolutePath): + f = open(absolutePath, 'a') + f.write(data) + f.close() + +def writeFile(action, result, fileName=""): + fileInfo = action['file'] + pathComp="" + if 'clusterId' in action: + pathComp = action['clusterId'] + if 'role' in action: + pathComp = pathComp + "-" + action['role'] + try: + path = os.path.join(AmbariConfig.config.get('agent','prefix'), + "clusters", + pathComp) + user=getpass.getuser() + if 'owner' in fileInfo: + user=fileInfo['owner'] + group=os.getgid() + if 'group' in fileInfo: + group=fileInfo['group'] + fullPathName="" + if fileName != "": + fullPathName=os.path.join(path, fileName) + else: + fullPathName=os.path.join(path, fileInfo['path']) + logger.debug("path in writeFile: %s" % fullPathName) + content=fileInfo['data'] + try: + if isinstance(user, int)!=True: + user=getpwnam(user)[2] + if isinstance(group, int)!=True: + group=getgrnam(group)[2] + except Exception: + logger.warn("can not find user uid/gid: (%s/%s) for writing %s" % (user, group, fullPathName)) + if 'permission' in fileInfo: + if fileInfo['permission'] is not None: + permission=fileInfo['permission'] + else: + permission=0750 + oldMask = os.umask(0) + if 'umask' in fileInfo: + if fileInfo['umask'] is not None: + umask=int(fileInfo['umask']) + else: + umask=oldMask + os.umask(int(umask)) + prefix = os.path.dirname(fullPathName) + try: + os.makedirs(prefix) + except OSError as err: + if err.errno == errno.EEXIST: + pass + else: + raise + f = open(fullPathName, 'w') + f.write(content) + f.close() + if os.getuid()==0: + os.chmod(fullPathName, permission) + os.chown(fullPathName, user, group) + os.umask(oldMask) + result['exitCode'] = 0 + except Exception, err: + traceback.print_exc() + result['exitCode'] = 1 + result['error'] = traceback.format_exc() + return result + +def createStructure(action, result): + try: + workdir = action['workDirComponent'] + path = AmbariConfig.config.get('agent','prefix')+"/clusters/"+workdir + shutil.rmtree(path, 1) + os.makedirs(path+"/stack") + os.makedirs(path+"/logs") + os.makedirs(path+"/data") + os.makedirs(path+"/pkgs") + os.makedirs(path+"/config") + result['exitCode'] = 0 + except Exception, err: + traceback.print_exc() + result['exitCode'] = 1 + result['error'] = traceback.format_exc() + return result + +def deleteStructure(action, result): + try: + workdir = action['workDirComponent'] + path = AmbariConfig.config.get('agent','prefix')+"/clusters/"+workdir + if os.path.exists(path): + shutil.rmtree(path) + result['exitCode'] = 0 + except Exception, err: + result['exitCode'] = 1 + result['error'] = traceback.format_exc() + return result + +def main(): + + action = { 'clusterId' : 'abc', 'role' : 'hdfs' } + result = {} + print createStructure(action, result) + + configFile = { + "data" : "test", + "owner" : os.getuid(), + "group" : os.getgid() , + "permission" : 0700, + "path" : "/tmp/ambari_file_test/_file_write_test", + "umask" : 022 + } + action = { 'file' : configFile } + result = { } + print writeFile(action, result) + + configFile = { + "data" : "test", + "owner" : "eyang", + "group" : "staff", + "permission" : "0700", + "path" : "/tmp/ambari_file_test/_file_write_test", + "umask" : "022" + } + result = { } + action = { 'file' : configFile } + print writeFile(action, result) + + print deleteStructure(action, result) + +if __name__ == "__main__": + main() diff --git a/ambari-agent/src/main/python/ambari_agent/Hardware.py b/ambari-agent/src/main/python/ambari_agent/Hardware.py new file mode 100644 index 0000000000..d3a4774cb8 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/Hardware.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from shell import shellRunner +import multiprocessing +import platform +import AmbariConfig + +class Hardware: + def __init__(self): + facterHome = AmbariConfig.config.get('puppet', 'facter_home') + + self.hardware = { 'coreCount' : 4, + 'cpuSpeed' : 4, + 'cpuFlag' : 4, + 'diskCount' : 3, + 'netSpeed' : 3, + 'ramSize' : 2 + } + + def get(self): + return self.hardware + + +def main(argv=None): + hardware = Hardware() + print hardware.get() + +if __name__ == '__main__': + main() diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py new file mode 100644 index 0000000000..1b22249263 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import json +from Hardware import Hardware +from ActionQueue import ActionQueue +from ServerStatus import ServerStatus +import socket +import time + +firstContact = True +class Heartbeat: + + def __init__(self, actionQueue): + self.actionQueue = actionQueue + self.hardware = Hardware() + + def build(self, id='-1'): + global clusterId, clusterDefinitionRevision, firstContact + serverStatus = ServerStatus() + timestamp = int(time.time()*1000) + queueResult = self.actionQueue.result() + installedRoleStates = serverStatus.build() + heartbeat = { 'responseId' : int(id), + 'timestamp' : timestamp, + 'hostname' : socket.gethostname(), + 'hardwareProfile' : self.hardware.get(), + 'idle' : self.actionQueue.isIdle(), + 'installScriptHash' : self.actionQueue.getInstallScriptHash(), + 'firstContact' : firstContact + } + if len(queueResult)!=0: + heartbeat['actionResults'] = queueResult + if len(installedRoleStates)!=0: + heartbeat['installedRoleStates'] = installedRoleStates + firstContact = False + return heartbeat + +def main(argv=None): + actionQueue = ActionQueue() + heartbeat = Heartbeat(actionQueue) + print json.dumps(heartbeat.build()) + +if __name__ == '__main__': + main() diff --git a/ambari-agent/src/main/python/ambari_agent/PackageHandler.py b/ambari-agent/src/main/python/ambari_agent/PackageHandler.py new file mode 100644 index 0000000000..be05575b47 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/PackageHandler.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import simplejson +import web +from mimerender import mimerender +from Runner import Runner + +render_json = lambda **args: simplejson.dumps(args) + +class PackageHandler: + @mimerender( + default = 'json', + json = render_json + ) + + def GET(self, cmd, packageName): + data = [] + data['cmd'] = cmd + data['package'] = { "name" : packageName } + dispatcher = Runner() + return dispatcher.run(data) + + def POST(self, cmd): + web.header('Content-Type','application/json') + data = web.data() + jsonp = simplejson.loads(data) + jsonp['cmd']=cmd + dispatcher = Runner() + return dispatcher.run(jsonp) + diff --git a/ambari-agent/src/main/python/ambari_agent/Runner.py b/ambari-agent/src/main/python/ambari_agent/Runner.py new file mode 100644 index 0000000000..4b86381095 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/Runner.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import threading +from daemon import daemonRunner +from package import packageRunner +from shell import shellRunner + +class Runner(threading.Thread): + """ Runs the actions coming from the server """ + __instance = None + lock = None + def __init__(self): + if Runner.__instance is None: + Runner.lock = threading.RLock() + Runner.__instance = self + + def run(self, data): + Runner.lock.acquire() + try: + if data['actionType']=='info': + ph = packageRunner() + result = ph.info(data['packages']) + elif data['actionType']=='install': + ph = packageRunner() + if 'dry-run' in data: + opt = data['dry-run'] + else: + opt = 'false' + result = ph.install(data['packages'], opt) + elif data['actionType']=='remove': + ph = packageRunner() + if 'dry-run' in data: + opt = data['dry-run'] + else: + opt = 'false' + result = ph.remove(data['packages'], opt) + elif data['actionType']=='status': + dh = daemonRunner() + result = dh.status(data['daemonName']) + elif data['actionType']=='start': + dh = daemonRunner() + result = dh.start(data['daemonName']) + elif data['actionType']=='stop': + dh = daemonRunner() + result = dh.stop(data['daemonName']) + return result + finally: + Runner.lock.release() diff --git a/ambari-agent/src/main/python/ambari_agent/ServerStatus.py b/ambari-agent/src/main/python/ambari_agent/ServerStatus.py new file mode 100644 index 0000000000..53a0a9a722 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ServerStatus.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from shell import shellRunner +import logging +import logging.handlers + +logger = logging.getLogger() +global serverTracker + +class ServerStatus: + def build(self): + sh = shellRunner() + list = [] + servers = sh.getServerTracker() + for server in servers: + (clusterId, clusterDefinitionRevision, component, role) = server.split("/") + result = { + 'clusterId' : clusterId, + 'clusterDefinitionRevision' : clusterDefinitionRevision, + 'componentName' : component, + 'roleName' : role, + 'serverStatus' : 'STARTED' + } + list.append(result) + return list + +def main(argv=None): + serverStatus = ServerStatus() + print serverStatus.build() + +if __name__ == '__main__': + main() diff --git a/ambari-agent/src/main/python/ambari_agent/__init__.py b/ambari-agent/src/main/python/ambari_agent/__init__.py new file mode 100644 index 0000000000..3bfb534323 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/__init__.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python2.6 +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" + +from __future__ import generators + +__version__ = "0.1.0" +__author__ = [ + "Eric Yang <eyang@apache.org>", + "Kan Zhang <kanzhangmail@yahoo.com>" +] +__license__ = "Apache License v2.0" +__contributors__ = "see http://incubator.apache.org/ambari/contributors" + +import logging +import logging.handlers +import threading +import sys +import time +import signal + diff --git a/ambari-agent/src/main/python/ambari_agent/createDaemon.py b/ambari-agent/src/main/python/ambari_agent/createDaemon.py new file mode 100644 index 0000000000..764211ca00 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/createDaemon.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +"""Disk And Execution MONitor (Daemon) + +Configurable daemon behaviors: + + 1.) The current working directory set to the "/" directory. + 2.) The current file creation mode mask set to 0. + 3.) Close all open files (1024). + 4.) Redirect standard I/O streams to "/dev/null". + +A failed call to fork() now raises an exception. + +References: + 1) Advanced Programming in the Unix Environment: W. Richard Stevens + 2) Unix Programming Frequently Asked Questions: + http://www.erlenstar.demon.co.uk/unix/faq_toc.html +""" + +__author__ = "Chad J. Schroeder" +__copyright__ = "Copyright (C) 2005 Chad J. Schroeder" + +__revision__ = "$Id$" +__version__ = "0.2" + +# Standard Python modules. +import os # Miscellaneous OS interfaces. +import sys # System-specific parameters and functions. + +# Default daemon parameters. +# File mode creation mask of the daemon. +UMASK = 0022 + +# Default working directory for the daemon. +WORKDIR = "/" + +# Default maximum for the number of available file descriptors. +MAXFD = 1024 + +# The standard I/O file descriptors are redirected to /dev/null by default. +if (hasattr(os, "devnull")): + REDIRECT_TO = os.devnull +else: + REDIRECT_TO = "/dev/null" + +def createDaemon(): + """Detach a process from the controlling terminal and run it in the + background as a daemon. + """ + + try: + # Fork a child process so the parent can exit. This returns control to + # the command-line or shell. It also guarantees that the child will not + # be a process group leader, since the child receives a new process ID + # and inherits the parent's process group ID. This step is required + # to insure that the next call to os.setsid is successful. + pid = os.fork() + except OSError, e: + raise Exception, "%s [%d]" % (e.strerror, e.errno) + + if (pid == 0): # The first child. + # To become the session leader of this new session and the process group + # leader of the new process group, we call os.setsid(). The process is + # also guaranteed not to have a controlling terminal. + os.setsid() + + # Is ignoring SIGHUP necessary? + # + # It's often suggested that the SIGHUP signal should be ignored before + # the second fork to avoid premature termination of the process. The + # reason is that when the first child terminates, all processes, e.g. + # the second child, in the orphaned group will be sent a SIGHUP. + # + # "However, as part of the session management system, there are exactly + # two cases where SIGHUP is sent on the death of a process: + # + # 1) When the process that dies is the session leader of a session that + # is attached to a terminal device, SIGHUP is sent to all processes + # in the foreground process group of that terminal device. + # 2) When the death of a process causes a process group to become + # orphaned, and one or more processes in the orphaned group are + # stopped, then SIGHUP and SIGCONT are sent to all members of the + # orphaned group." [2] + # + # The first case can be ignored since the child is guaranteed not to have + # a controlling terminal. The second case isn't so easy to dismiss. + # The process group is orphaned when the first child terminates and + # POSIX.1 requires that every STOPPED process in an orphaned process + # group be sent a SIGHUP signal followed by a SIGCONT signal. Since the + # second child is not STOPPED though, we can safely forego ignoring the + # SIGHUP signal. In any case, there are no ill-effects if it is ignored. + # + # import signal # Set handlers for asynchronous events. + # signal.signal(signal.SIGHUP, signal.SIG_IGN) + + try: + # Fork a second child and exit immediately to prevent zombies. This + # causes the second child process to be orphaned, making the init + # process responsible for its cleanup. And, since the first child is + # a session leader without a controlling terminal, it's possible for + # it to acquire one by opening a terminal in the future (System V- + # based systems). This second fork guarantees that the child is no + # longer a session leader, preventing the daemon from ever acquiring + # a controlling terminal. + pid = os.fork() # Fork a second child. + except OSError, e: + raise Exception, "%s [%d]" % (e.strerror, e.errno) + + if (pid == 0): # The second child. + # Since the current working directory may be a mounted filesystem, we + # avoid the issue of not being able to unmount the filesystem at + # shutdown time by changing it to the root directory. + os.chdir(WORKDIR) + # We probably don't want the file mode creation mask inherited from + # the parent, so we give the child complete control over permissions. + os.umask(UMASK) + else: + # exit() or _exit()? See below. + os._exit(0) # Exit parent (the first child) of the second child. + else: + # exit() or _exit()? + # _exit is like exit(), but it doesn't call any functions registered + # with atexit (and on_exit) or any registered signal handlers. It also + # closes any open file descriptors. Using exit() may cause all stdio + # streams to be flushed twice and any temporary files may be unexpectedly + # removed. It's therefore recommended that child branches of a fork() + # and the parent branch(es) of a daemon use _exit(). + os._exit(0) # Exit parent of the first child. + + # Close all open file descriptors. This prevents the child from keeping + # open any file descriptors inherited from the parent. There is a variety + # of methods to accomplish this task. Three are listed below. + # + # Try the system configuration variable, SC_OPEN_MAX, to obtain the maximum + # number of open file descriptors to close. If it doesn't exists, use + # the default value (configurable). + # + # try: + # maxfd = os.sysconf("SC_OPEN_MAX") + # except (AttributeError, ValueError): + # maxfd = MAXFD + # + # OR + # + # if (os.sysconf_names.has_key("SC_OPEN_MAX")): + # maxfd = os.sysconf("SC_OPEN_MAX") + # else: + # maxfd = MAXFD + # + # OR + # + # Use the getrlimit method to retrieve the maximum file descriptor number + # that can be opened by this process. If there is not limit on the + # resource, use the default value. + # + import resource # Resource usage information. + maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if (maxfd == resource.RLIM_INFINITY): + maxfd = MAXFD + + # Iterate through and close all file descriptors. + for fd in range(0, maxfd): + try: + os.close(fd) + except OSError: # ERROR, fd wasn't open to begin with (ignored) + pass + + # Redirect the standard I/O file descriptors to the specified file. Since + # the daemon has no controlling terminal, most daemons redirect stdin, + # stdout, and stderr to /dev/null. This is done to prevent side-effects + # from reads and writes to the standard I/O file descriptors. + + # This call to open is guaranteed to return the lowest file descriptor, + # which will be 0 (stdin), since it was closed above. + os.open(REDIRECT_TO, os.O_RDWR) # standard input (0) + + # Duplicate standard input to standard output and standard error. + os.dup2(0, 1) # standard output (1) + os.dup2(0, 2) # standard error (2) + + return(0) + +if __name__ == "__main__": + + retCode = createDaemon() + + sys.exit(retCode) diff --git a/ambari-agent/src/main/python/ambari_agent/daemon.py b/ambari-agent/src/main/python/ambari_agent/daemon.py new file mode 100644 index 0000000000..31607f56fa --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/daemon.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from shell import shellRunner + +class daemonRunner: + def start(self, name): + sh = shellRunner() + script = [ '/etc/init.d/'+name, 'start' ] + return sh.run(script) + + def stop(self, name): + sh = shellRunner() + script = [ '/etc/init.d/'+name, 'stop' ] + return sh.run(script) + + def status(self, name): + sh = shellRunner() + script = [ '/etc/init.d/'+name, 'stop' ] + return sh.run(script) diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py new file mode 100644 index 0000000000..f7e9abc7e8 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import logging.handlers +import code +import signal +import sys, traceback +import os +import time +import ConfigParser +from createDaemon import createDaemon +from Controller import Controller +from shell import getTempFiles +from shell import killstaleprocesses +import AmbariConfig + +logger = logging.getLogger() +agentPid = os.getpid() + +if 'AMBARI_PID_DIR' in os.environ: + pidfile = os.environ['AMBARI_PID_DIR'] + "/ambari-agent.pid" +else: + pidfile = "/var/run/ambari/ambari-agent.pid" + +if 'AMBARI_LOG_DIR' in os.environ: + logfile = os.environ['AMBARI_LOG_DIR'] + "/ambari-agent.log" +else: + logfile = "/var/log/ambari/ambari-agent.log" + +def signal_handler(signum, frame): + #we want the handler to run only for the agent process and not + #for the children (e.g. namenode, etc.) + if (os.getpid() != agentPid): + os._exit(0) + logger.info('signal received, exiting.') + try: + os.unlink(pidfile) + except Exception: + logger.warn("Unable to remove: "+pidfile) + traceback.print_exc() + + tempFiles = getTempFiles() + for tempFile in tempFiles: + if os.path.exists(tempFile): + try: + os.unlink(tempFile) + except Exception: + traceback.print_exc() + logger.warn("Unable to remove: "+tempFile) + os._exit(0) + +def debug(sig, frame): + """Interrupt running process, and provide a python prompt for + interactive debugging.""" + d={'_frame':frame} # Allow access to frame object. + d.update(frame.f_globals) # Unless shadowed by global + d.update(frame.f_locals) + + message = "Signal recieved : entering python shell.\nTraceback:\n" + message += ''.join(traceback.format_stack(frame)) + logger.info(message) + +def main(): + global config + default_cfg = { 'agent' : { 'prefix' : '/home/ambari' } } + config = ConfigParser.RawConfigParser(default_cfg) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGUSR1, debug) + if (len(sys.argv) >1) and sys.argv[1]=='stop': + # stop existing Ambari agent + try: + f = open(pidfile, 'r') + pid = f.read() + pid = int(pid) + f.close() + os.kill(pid, signal.SIGTERM) + time.sleep(5) + if os.path.exists(pidfile): + raise Exception("PID file still exists.") + os._exit(0) + except Exception, err: + os.kill(pid, signal.SIGKILL) + os._exit(1) + + # Check if there is another instance running + if os.path.isfile(pidfile): + print("%s already exists, exiting" % pidfile) + sys.exit(1) + else: + # Daemonize current instance of Ambari Agent + #retCode = createDaemon() + pid = str(os.getpid()) + file(pidfile, 'w').write(pid) + + + logger.setLevel(logging.INFO) + formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s") + rotateLog = logging.handlers.RotatingFileHandler(logfile, "a", 10000000, 10) + rotateLog.setFormatter(formatter) + logger.addHandler(rotateLog) + credential = None + + # Check for ambari configuration file. + try: + config = AmbariConfig.config + if(os.path.exists('/etc/ambari/ambari.ini')): + config.read('/etc/ambari/ambari.ini') + AmbariConfig.setConfig(config) + else: + raise Exception("No config found, use default") + except Exception, err: + logger.warn(err) + + killstaleprocesses() + logger.info("Connecting to Server at: "+config.get('server', 'url')) + + # Launch Controller communication + controller = Controller(config) + controller.start() + controller.run() + logger.info("finished") + +if __name__ == "__main__": + main() diff --git a/ambari-agent/src/main/python/ambari_agent/shell.py b/ambari-agent/src/main/python/ambari_agent/shell.py new file mode 100644 index 0000000000..75541208a8 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/shell.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from pwd import getpwnam +from grp import getgrnam +import AmbariConfig +import logging +import logging.handlers +import subprocess +import os +import tempfile +import signal +import sys +import threading +import time +import traceback +import shutil + +global serverTracker +serverTracker = {} +logger = logging.getLogger() + +threadLocal = threading.local() + +tempFiles = [] +def noteTempFile(filename): + tempFiles.append(filename) + +def getTempFiles(): + return tempFiles + +def killstaleprocesses(): + logger.info ("Killing stale processes") + prefix = AmbariConfig.config.get('stack','installprefix') + files = os.listdir(prefix) + for file in files: + if str(file).endswith(".pid"): + pid = str(file).split('.')[0] + killprocessgrp(int(pid)) + os.unlink(os.path.join(prefix,file)) + logger.info ("Killed stale processes") + +def killprocessgrp(pid): + try: + os.killpg(pid, signal.SIGTERM) + time.sleep(5) + try: + os.killpg(pid, signal.SIGKILL) + except: + logger.warn("Failed to send SIGKILL to PID %d. Process exited?" % (pid)) + except: + logger.warn("Failed to kill PID %d" % (pid)) + +def changeUid(): + try: + os.setuid(threadLocal.uid) + except Exception: + logger.warn("can not switch user for running command.") + +class shellRunner: + # Run any command + def run(self, script, user=None): + try: + if user!=None: + user=getpwnam(user)[2] + else: + user = os.getuid() + threadLocal.uid = user + except Exception: + logger.warn("can not switch user for RUN_COMMAND.") + code = 0 + cmd = " " + cmd = cmd.join(script) + p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) + out, err = p.communicate() + code = p.wait() + logger.debug("Exitcode for %s is %d" % (cmd,code)) + return {'exitCode': code, 'output': out, 'error': err} + + # dispatch action types + def runAction(self, clusterId, component, role, user, command, cleanUpCommand, result): + oldDir = os.getcwd() + #TODO: handle this better. Don't like that it is doing a chdir for the main process + os.chdir(self.getWorkDir(clusterId, role)) + oldUid = os.getuid() + try: + if user is not None: + user=getpwnam(user)[2] + else: + user = oldUid + threadLocal.uid = user + except Exception: + logger.warn("%s %s %s can not switch user for RUN_ACTION." % (clusterId, component, role)) + code = 0 + cmd = sys.executable + tempfilename = tempfile.mktemp() + tmp = open(tempfilename, 'w') + tmp.write(command['script']) + tmp.close() + cmd = "%s %s %s" % (cmd, tempfilename, " ".join(command['param'])) + commandResult = {} + p = subprocess.Popen(cmd, preexec_fn=changeUid, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) + out, err = p.communicate() + code = p.wait() + if code != 0: + commandResult['output'] = out + commandResult['error'] = err + commandResult['exitCode'] = code + result['commandResult'] = commandResult + os.unlink(tempfilename) + if code != 0: + tempfilename = tempfile.mktemp() + tmp = open(tempfilename, 'w') + tmp.write(command['script']) + tmp.close() + cmd = sys.executable + cmd = "%s %s %s" % (cmd, tempfilename, " ".join(cleanUpCommand['param'])) + cleanUpCode = 0 + cleanUpResult = {} + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) + out, err = p.communicate() + cleanUpCode = p.wait() + if cleanUpCode != 0: + cleanUpResult['output'] = out + cleanUpResult['error'] = err + cleanUpResult['exitCode'] = cleanUpCode + result['cleanUpResult'] = cleanUpResult + os.unlink(tempfilename) + os._exit(1) + try: + os.chdir(oldDir) + except Exception: + logger.warn("%s %s %s can not restore environment for RUN_ACTION." % (clusterId, component, role)) + return result + + # Start a process and presist its state + def startProcess(self, clusterId, clusterDefinitionRevision, component, role, script, user, result): + global serverTracker + oldDir = os.getcwd() + try: + os.chdir(self.getWorkDir(clusterId,role)) + except Exception: + logger.warn("%s %s %s can not switch dir for START_ACTION." % (clusterId, component, role)) + oldUid = os.getuid() + try: + if user is not None: + user=getpwnam(user)[2] + else: + user = os.getuid() + threadLocal.uid = user + except Exception: + logger.warn("%s %s %s can not switch user for START_ACTION." % (clusterId, component, role)) + code = 0 + commandResult = {} + process = self.getServerKey(clusterId,clusterDefinitionRevision,component,role) + if not process in serverTracker: + try: + plauncher = processlauncher(script,user) + plauncher.start() + plauncher.blockUntilProcessCreation() + except Exception: + traceback.print_exc() + logger.warn("Can not launch process for %s %s %s" % (clusterId, component, role)) + code = -1 + serverTracker[process] = plauncher + commandResult['exitCode'] = code + result['commandResult'] = commandResult + try: + os.chdir(oldDir) + except Exception: + logger.warn("%s %s %s can not restore environment for START_ACTION." % (clusterId, component, role)) + return result + + # Stop a process and remove presisted state + def stopProcess(self, processKey): + global serverTracker + keyFragments = processKey.split('/') + process = self.getServerKey(keyFragments[0],keyFragments[1],keyFragments[2],keyFragments[3]) + if process in serverTracker: + logger.info ("Sending %s with PID %d the SIGTERM signal" % (process,serverTracker[process].getpid())) + killprocessgrp(serverTracker[process].getpid()) + del serverTracker[process] + + def getServerTracker(self): + return serverTracker + + def getServerKey(self,clusterId, clusterDefinitionRevision, component, role): + return clusterId+"/"+str(clusterDefinitionRevision)+"/"+component+"/"+role + + def getWorkDir(self, clusterId, role): + prefix = AmbariConfig.config.get('stack','installprefix') + return str(os.path.join(prefix, clusterId, role)) + + +class processlauncher(threading.Thread): + def __init__(self,script,uid): + threading.Thread.__init__(self) + self.script = script + self.serverpid = -1 + self.uid = uid + self.out = None + self.err = None + + def run(self): + try: + tempfilename = tempfile.mktemp() + noteTempFile(tempfilename) + pythoncmd = sys.executable + tmp = open(tempfilename, 'w') + tmp.write(self.script['script']) + tmp.close() + threadLocal.uid = self.uid + self.cmd = "%s %s %s" % (pythoncmd, tempfilename, " ".join(self.script['param'])) + logger.info("Launching %s as uid %d" % (self.cmd,self.uid) ) + p = subprocess.Popen(self.cmd, preexec_fn=self.changeUidAndSetSid, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, shell=True, close_fds=True) + logger.info("Launched %s; PID %d" % (self.cmd,p.pid)) + self.serverpid = p.pid + self.out, self.err = p.communicate() + self.code = p.wait() + logger.info("%s; PID %d exited with code %d \nSTDOUT: %s\nSTDERR %s" % + (self.cmd,p.pid,self.code,self.out,self.err)) + except: + logger.warn("Exception encountered while launching : " + self.cmd) + traceback.print_exc() + + os.unlink(self.getpidfile()) + os.unlink(tempfilename) + + def blockUntilProcessCreation(self): + self.getpid() + + def getpid(self): + sleepCount = 1 + while (self.serverpid == -1): + time.sleep(1) + logger.info("Waiting for process %s to start" % self.cmd) + if sleepCount > 10: + logger.warn("Couldn't start process %s even after %d seconds" % (self.cmd,sleepCount)) + os._exit(1) + return self.serverpid + + def getpidfile(self): + prefix = AmbariConfig.config.get('stack','installprefix') + pidfile = os.path.join(prefix,str(self.getpid())+".pid") + return pidfile + + def changeUidAndSetSid(self): + prefix = AmbariConfig.config.get('stack','installprefix') + pidfile = os.path.join(prefix,str(os.getpid())+".pid") + #TODO remove try/except (when there is a way to provide + #config files for testcases). The default config will want + #to create files in /var/ambari which may not exist unless + #specifically created. + #At that point add a testcase for the pid file management. + try: + f = open(pidfile,'w') + f.close() + except: + logger.warn("Couldn't write pid file %s for %s" % (pidfile,self.cmd)) + changeUid() + os.setsid() diff --git a/ambari-agent/src/main/python/setup.cfg b/ambari-agent/src/main/python/setup.cfg new file mode 100644 index 0000000000..a73754ef79 --- /dev/null +++ b/ambari-agent/src/main/python/setup.cfg @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +[egg_info] +tag_build = +tag_date = 0 +tag_svn_revision = 0 diff --git a/ambari-agent/src/main/python/setup.py b/ambari-agent/src/main/python/setup.py new file mode 100644 index 0000000000..41c2097830 --- /dev/null +++ b/ambari-agent/src/main/python/setup.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from setuptools import setup + +setup( + name = "ambari-agent", + version = "1.0.3-SNAPSHOT", + packages = ['ambari_agent'], + # metadata for upload to PyPI + author = "Apache Software Foundation", + author_email = "ambari-dev@incubator.apache.org", + description = "Ambari agent", + license = "Apache License v2.0", + keywords = "hadoop, ambari", + url = "http://incubator.apache.org/ambari", + long_description = "This package implements the Ambari agent for installing Hadoop on large clusters.", + platforms=["any"], + entry_points = { + "console_scripts": [ + "ambari-agent = ambari_agent.main:main", + ], + } +) diff --git a/ambari-agent/src/test/python/TestActionQueue.py b/ambari-agent/src/test/python/TestActionQueue.py new file mode 100644 index 0000000000..fbfbf24e77 --- /dev/null +++ b/ambari-agent/src/test/python/TestActionQueue.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +from ambari_agent.ActionQueue import ActionQueue +from ambari_agent.AmbariConfig import AmbariConfig +from ambari_agent.FileUtil import getFilePath +import os, errno, time + +class TestActionQueue(TestCase): + def test_ActionQueueStartStop(self): + actionQueue = ActionQueue(AmbariConfig().getConfig()) + actionQueue.start() + actionQueue.stop() + actionQueue.join() + self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') + + def test_RetryAction(self): + action={'id' : 'tttt'} + config = AmbariConfig().getConfig() + actionQueue = ActionQueue(config) + path = actionQueue.getInstallFilename(action['id']) + configFile = { + "data" : "test", + "owner" : os.getuid(), + "group" : os.getgid() , + "permission" : 0700, + "path" : path, + "umask" : 022 + } + + #note that the command in the action is just a listing of the path created + #we just want to ensure that 'ls' can run on the data file (in the actual world + #this 'ls' would be a puppet or a chef command that would work on a data + #file + badAction = { + 'id' : 'tttt', + 'kind' : 'INSTALL_AND_CONFIG_ACTION', + 'workDirComponent' : 'abc-hdfs', + 'file' : configFile, + 'clusterDefinitionRevision' : 12, + 'command' : ['/bin/ls',"/foo/bar/badPath1234"] + } + path=getFilePath(action,path) + goodAction = { + 'id' : 'tttt', + 'kind' : 'INSTALL_AND_CONFIG_ACTION', + 'workDirComponent' : 'abc-hdfs', + 'file' : configFile, + 'clusterDefinitionRevision' : 12, + 'command' : ['/bin/ls',path] + } + actionQueue.start() + response = {'actions' : [badAction,goodAction]} + actionQueue.maxRetries = 2 + actionQueue.sleepInterval = 1 + result = actionQueue.put(response) + results = actionQueue.result() + sleptCount = 1 + while (len(results) < 2 and sleptCount < 15): + time.sleep(1) + sleptCount += 1 + results = actionQueue.result() + actionQueue.stop() + actionQueue.join() + self.assertEqual(len(results), 2, 'Number of results is not 2.') + result = results[0] + maxretries = config.get('command', 'maxretries') + self.assertEqual(int(result['retryActionCount']), + int(maxretries), + "Number of retries is %d and not %d" % + (int(result['retryActionCount']), int(str(maxretries)))) + result = results[1] + self.assertEqual(int(result['retryActionCount']), + 1, + "Number of retries is %d and not %d" % + (int(result['retryActionCount']), 1)) diff --git a/ambari-agent/src/test/python/TestAgentActions.py b/ambari-agent/src/test/python/TestAgentActions.py new file mode 100644 index 0000000000..e844bd9cb1 --- /dev/null +++ b/ambari-agent/src/test/python/TestAgentActions.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +import os, errno, getpass +from ambari_agent.ActionQueue import ActionQueue +from ambari_agent.AmbariConfig import AmbariConfig +from ambari_agent.FileUtil import getFilePath +from ambari_agent import shell +from ambari_agent.shell import serverTracker +import time + +class TestAgentActions(TestCase): + def test_installAndConfigAction(self): + action={'id' : 'tttt'} + actionQueue = ActionQueue(AmbariConfig().getConfig()) + path = actionQueue.getInstallFilename(action['id']) + configFile = { + "data" : "test", + "owner" : os.getuid(), + "group" : os.getgid() , + "permission" : 0700, + "path" : path, + "umask" : 022 + } + + #note that the command in the action is just a listing of the path created + #we just want to ensure that 'ls' can run on the data file (in the actual world + #this 'ls' would be a puppet or a chef command that would work on a data + #file + path=getFilePath(action,path) + action = { + 'id' : 'tttt', + 'kind' : 'INSTALL_AND_CONFIG_ACTION', + 'workDirComponent' : 'abc-hdfs', + 'file' : configFile, + 'clusterDefinitionRevision' : 12, + 'command' : ['/bin/ls',path] + } + result = { } + actionQueue = ActionQueue(AmbariConfig().getConfig()) + result = actionQueue.installAndConfigAction(action) + cmdResult = result['commandResult'] + self.assertEqual(cmdResult['exitCode'], 0, "installAndConfigAction test failed. Returned %d " % cmdResult['exitCode']) + self.assertEqual(cmdResult['output'], path + "\n", "installAndConfigAction test failed Returned %s " % cmdResult['output']) + + def test_startAndStopAction(self): + command = {'script' : 'import os,sys,time\ni = 0\nwhile (i < 1000):\n print "testhello"\n sys.stdout.flush()\n time.sleep(1)\n i+=1', + 'param' : ''} + action={'id' : 'ttt', + 'kind' : 'START_ACTION', + 'clusterId' : 'foobar', + 'clusterDefinitionRevision' : 1, + 'component' : 'foocomponent', + 'role' : 'foorole', + 'command' : command, + 'user' : getpass.getuser() + } + + actionQueue = ActionQueue(AmbariConfig().getConfig()) + result = actionQueue.startAction(action) + cmdResult = result['commandResult'] + self.assertEqual(cmdResult['exitCode'], 0, "starting a process failed") + shell = actionQueue.getshellinstance() + key = shell.getServerKey(action['clusterId'],action['clusterDefinitionRevision'], + action['component'],action['role']) + keyPresent = True + if not key in serverTracker: + keyPresent = False + self.assertEqual(keyPresent, True, "Key not present") + plauncher = serverTracker[key] + self.assertTrue(plauncher.getpid() > 0, "Pid less than 0!") + time.sleep(5) + shell.stopProcess(key) + keyPresent = False + if key in serverTracker: + keyPresent = True + self.assertEqual(keyPresent, False, "Key present") + processexists = True + try: + os.kill(serverTracker[key].getpid(),0) + except: + processexists = False + self.assertEqual(processexists, False, "Process still exists!") + self.assertTrue("testhello" in plauncher.out, "Output doesn't match!") diff --git a/ambari-agent/src/test/python/TestFileUtil.py b/ambari-agent/src/test/python/TestFileUtil.py new file mode 100644 index 0000000000..53e55c58d5 --- /dev/null +++ b/ambari-agent/src/test/python/TestFileUtil.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +from ambari_agent.FileUtil import writeFile, createStructure, deleteStructure +import os, errno + +class TestFileUtil(TestCase): + def test_createStructure(self): + action = { 'clusterId' : 'abc', 'role' : 'hdfs', 'workDirComponent' : 'abc-hdfs' } + result = {} + result = createStructure(action, result) + self.assertEqual(result['exitCode'], 0, 'Create cluster structure failed.') + +# def test_writeFile(self): + configFile = { + "data" : "test", + "owner" : os.getuid(), + "group" : os.getgid() , + "permission" : 0700, + "path" : "/tmp/ambari_file_test/_file_write_test", + "umask" : 022 + } + action = { + 'clusterId' : 'abc', + 'role' : 'hdfs', + 'workDirComponent' : 'abc-hdfs', + 'file' : configFile + } + result = { } + result = writeFile(action, result) + self.assertEqual(result['exitCode'], 0, 'WriteFile test with uid/gid failed.') + +# def test_deleteStructure(self): + result = { } + action = { 'clusterId' : 'abc', 'role' : 'hdfs', 'workDirComponent' : 'abc-hdfs' } + result = deleteStructure(action, result) + self.assertEqual(result['exitCode'], 0, 'Delete cluster structure failed.') + diff --git a/ambari-agent/src/test/python/TestHardware.py b/ambari-agent/src/test/python/TestHardware.py new file mode 100644 index 0000000000..68ee8b2bcc --- /dev/null +++ b/ambari-agent/src/test/python/TestHardware.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +from ambari_agent.Hardware import Hardware + +class TestHardware(TestCase): + def test_build(self): + hardware = Hardware() + result = hardware.get() + self.assertTrue(result['coreCount'] >= 1) + self.assertTrue(result['netSpeed'] != None) + diff --git a/ambari-agent/src/test/python/TestHeartbeat.py b/ambari-agent/src/test/python/TestHeartbeat.py new file mode 100644 index 0000000000..a596791a6d --- /dev/null +++ b/ambari-agent/src/test/python/TestHeartbeat.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +from ambari_agent.Heartbeat import Heartbeat +from ambari_agent.ActionQueue import ActionQueue +from ambari_agent.AmbariConfig import AmbariConfig +import socket + +class TestHeartbeat(TestCase): + def test_build(self): + actionQueue = ActionQueue(AmbariConfig().getConfig()) + heartbeat = Heartbeat(actionQueue) + result = heartbeat.build(100) +
\ No newline at end of file diff --git a/ambari-agent/src/test/python/TestServerStatus.py b/ambari-agent/src/test/python/TestServerStatus.py new file mode 100644 index 0000000000..8d09037e68 --- /dev/null +++ b/ambari-agent/src/test/python/TestServerStatus.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from unittest import TestCase +from ambari_agent.ServerStatus import ServerStatus + +class TestServerStatus(TestCase): + def test_build(self): + serverStatus = ServerStatus() + result = serverStatus.build() + self.assertEqual(result, [], 'List of running servers should be 0.') + diff --git a/ambari-agent/src/test/python/unitTests.py b/ambari-agent/src/test/python/unitTests.py new file mode 100644 index 0000000000..233034bd95 --- /dev/null +++ b/ambari-agent/src/test/python/unitTests.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python2.6 + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import unittest +import doctest + +class TestAgent(unittest.TestSuite): + def run(self, result): + run = unittest.TestSuite.run + run(self, result) + return result + +def all_tests_suite(): + suite = unittest.TestLoader().loadTestsFromNames([ + 'TestHeartbeat', + 'TestHardware', + 'TestServerStatus', + 'TestFileUtil', + 'TestActionQueue', + 'TestAmbariComponent', + 'TestAgentActions' + ]) + return TestAgent([suite]) + +def main(): + runner = unittest.TextTestRunner() + suite = all_tests_suite() + raise SystemExit(not runner.run(suite).wasSuccessful()) + +if __name__ == '__main__': + import os + import sys + sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + main() |