#!/usr/bin/env python
###############################################################################
# Copyright (c) 2013 Linaro
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
###############################################################################
import base64
import json
import os
import sys
import urllib2
import optparse
import getpass
from xml.dom import minidom
# XML fragment that replaces a job's hudson.security.AuthorizationMatrixProperty
# element when a private job is created. %(group)s is substituted with the
# name of the group being granted access. The text has to be a well-formed
# XML document because it is parsed with minidom before being spliced into
# the job configuration.
PRIVATE_ACL = """\
<hudson.security.AuthorizationMatrixProperty>
<permission>hudson.model.Item.Build:%(group)s</permission>
<permission>hudson.model.Item.Configure:%(group)s</permission>
<permission>hudson.model.Item.Read:%(group)s</permission>
<permission>hudson.model.Item.Cancel:%(group)s</permission>
<permission>hudson.model.Item.Discover:%(group)s</permission>
</hudson.security.AuthorizationMatrixProperty>
"""
class Jenkins(object):
    """Small client for the HTTP API of a Jenkins instance.

    Every request carries an HTTP Basic Authorization header built from the
    user name and password (or API token) given to the constructor. POST
    requests additionally carry the CSRF crumb header when the server
    hands one out.
    """

    def __init__(self, base_url, username, passwd):
        """Store the base URL and precompute the Authorization header.

        @param base_url: Base URL of the Jenkins instance. Request paths are
            appended to it verbatim, so it is expected to end with '/'.
        @param username: Jenkins user name.
        @param passwd: Password or API token of that user.
        """
        self.base = base_url
        credentials = '%s:%s' % (username, passwd)
        self.auth_headers = {
            'Authorization': 'Basic %s' % base64.b64encode(credentials)}
        # CSRF crumb header, fetched on first use; None means "not fetched".
        self.csrf = None

    def jenkins_rest(self, jenkins_path, data=None, extra_headers=None):
        """Make an authenticated request to jenkins.

        @param jenkins_path: The path on the Jenkins instance to make the
            request to.
        @param data: Data to include in the request (if this is not None the
            request will be a POST).
        @param extra_headers: A dictionary of extra headers that will passed
            in addition to Authorization.
        @raises urllib2.HTTPError: If the server answers with an HTTP error
            status.
        @returns: the body of the response.
        """
        headers = self.auth_headers.copy()
        if extra_headers:
            headers.update(extra_headers)
        url = self.base + jenkins_path
        # Echo the requested URL to stdout.
        print(url)
        req = urllib2.Request(url, data, headers)
        resp = urllib2.urlopen(req)
        # Close the connection even when reading the body fails.
        try:
            return resp.read()
        finally:
            resp.close()

    def get_csrf_headers(self):
        """Return the CSRF crumb header as a dictionary, fetching it once.

        The crumb is requested from 'crumbIssuer/api/json' on first use and
        cached on the instance afterwards.

        @returns: a one-entry dictionary mapping the crumb header name to
            the crumb, or an empty dictionary when the crumb request fails
            with an HTTP error.
        """
        if self.csrf is None:
            try:
                crumb_data = self.jenkins_rest('crumbIssuer/api/json')
                data = json.loads(crumb_data)
                self.csrf = {data['crumbRequestField']: data['crumb']}
            except urllib2.HTTPError:
                # Ignore errors in case CSRF protection is not enabled
                self.csrf = {}
        return self.csrf

    def jenkins_rest_post(self, jenkins_path, data, extra_headers=None):
        """Make an authenticated POST request including the CSRF crumb.

        @param jenkins_path: The path on the Jenkins instance to post to.
        @param data: Body of the request.
        @param extra_headers: Optional dictionary of additional headers. It
            is not modified.
        @raises urllib2.HTTPError: If the server answers with an HTTP error
            status.
        @returns: the body of the response.
        """
        # Work on a copy so the caller's dictionary is left untouched.
        headers = dict(extra_headers or {})
        headers.update(self.get_csrf_headers())
        return self.jenkins_rest(jenkins_path, data, headers)

    def get_job_config(self, job_name):
        """Return the config.xml of the job called job_name."""
        return self.jenkins_rest('job/' + job_name + '/config.xml')

    def post_config(self, url, config_xml):
        """POST config_xml as 'text/xml' to the given path.

        @returns: the body of the response.
        """
        return self.jenkins_rest_post(url, config_xml,
                                      {'Content-Type': 'text/xml'})

    def set_job_config(self, job_name, config_xml):
        """Post config_xml as the new config.xml of the job job_name."""
        return self.post_config('job/' + job_name + '/config.xml', config_xml)

    def create_job(self, job_name, config_xml):
        """Create a job called job_name from the given config_xml."""
        return self.post_config('createItem?name=' + job_name, config_xml)

    def build_job(self, job_name):
        """Request a build of job_name with no delay (empty POST body)."""
        self.jenkins_rest_post(
            'job/' + job_name + '/buildWithParameters?delay=0sec', '')
def job2user_group(job_group):
    """Map a job group name to the corresponding user group name.

    The "linaro-android" job group corresponds to the
    "linaro-android-builders" user group; any other name is returned as is.

    @param job_group: Name of the job group.
    @returns: name of the matching user group.
    """
    is_android_group = job_group == "linaro-android"
    return "linaro-android-builders" if is_android_group else job_group
def error(msg):
    """Report a fatal error and terminate the program.

    The message is written to stderr, followed by a newline, so that it does
    not get mixed into regular output on stdout.

    @param msg: Message to show to the user.
    @raises SystemExit: always, with exit status 1.
    """
    sys.stderr.write("%s\n" % (msg,))
    sys.exit(1)
def _read_credential_pair(path):
    """Return the (user, key) pair stored as 'user:key' in the file at path."""
    with open(path) as cred_file:
        content = cred_file.read().strip()
    # Split on the first colon only, so a key containing ':' stays intact.
    user, separator, key = content.partition(":")
    if not separator:
        error("Malformed credentials in '%s', expected user:key" % path)
    return user, key


def _resolve_credentials(options, command, config_dir):
    """Return the (user, password) pair to authenticate with.

    Sources are tried in this order: --cred-file, --apikey-file, the
    credentials cached in config_dir, an interactive prompt. No prompt is
    shown for the "authorize" command; the password is None in that case.
    """
    cached_path = os.path.join(config_dir, "cred")
    if options.cred_file:
        return _read_credential_pair(options.cred_file)
    if options.apikey_file:
        with open(options.apikey_file) as key_file:
            return options.user, key_file.read().strip()
    if os.path.exists(cached_path):
        user, password = _read_credential_pair(cached_path)
        print("INFO: Using cached authorization for: %s" % user)
        return user, password
    if command != "authorize":
        return options.user, getpass.getpass("API Token:")
    return options.user, None


def _store_credentials(config_dir):
    """Prompt for a user name and API key and cache them in config_dir."""
    user = raw_input("Enter user name: ")
    while True:
        key = getpass.getpass("API key: ")
        # Only keys of exactly 32 characters are accepted.
        if len(key) == 32:
            break
        print("This does not look like Jenkins API key, please try again")
    if not os.path.isdir(config_dir):
        os.makedirs(config_dir)
    cred_path = os.path.join(config_dir, "cred")
    # The file holds a secret: create it readable by its owner only, and
    # tighten the mode of a file left over from an earlier run.
    fd = os.open(cred_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as cred_file:
        cred_file.write("%s:%s\n" % (user, key))
    os.chmod(cred_path, 0o600)
    print("Credentials cached for future use")


def _create_job(jenkins, options, job_spec, config_path):
    """Create the job GROUP/NAME from its group's template job.

    The content of the file at config_path is base64-encoded and stored as
    the default value of the template's single string parameter. When
    --private is given, the job's authorization matrix is replaced with one
    built from PRIVATE_ACL.
    """
    name_parts = job_spec.split("/")
    if len(name_parts) != 2:
        error("Job name must have the form GROUP/NAME: '%s'" % job_spec)
    job_group = name_parts[0]

    template_name = "template_" + job2user_group(job_group)
    try:
        template = jenkins.get_job_config(template_name)
    except urllib2.HTTPError as exc:
        error("Error fetching job template '%s': %s" % (template_name, exc))
    dom = minidom.parseString(template)

    params = dom.getElementsByTagName(
        "hudson.model.StringParameterDefinition")
    if len(params) != 1:
        error("Job template '%s' must define exactly one string parameter, "
              "found %d" % (template_name, len(params)))
    default_value = params[0].getElementsByTagName("defaultValue")[0]
    with open(config_path) as config_file:
        build_config = config_file.read()
    encoded_config = base64.encodestring(build_config)
    if default_value.hasChildNodes():
        default_value.childNodes[0].data = encoded_config
    else:
        # An empty <defaultValue/> has no text node to overwrite yet.
        default_value.appendChild(dom.createTextNode(encoded_config))

    if options.private:
        properties = dom.getElementsByTagName("properties")[0]
        acl_dom = minidom.parseString(
            PRIVATE_ACL % {"group": options.private})
        current_acl = properties.getElementsByTagName(
            "hudson.security.AuthorizationMatrixProperty")
        if current_acl:
            current_acl[0].parentNode.replaceChild(
                acl_dom.documentElement, current_acl[0])
        else:
            properties.appendChild(acl_dom.documentElement)

    jenkins_job = job_spec.replace("/", "_")
    try:
        jenkins.create_job(jenkins_job, dom.toxml())
    except urllib2.HTTPError:
        error("Error creating job '%s' (job exists?)" % jenkins_job)
    # options.url always ends with '/' at this point.
    print("Job created successfully: %sjob/%s/" % (options.url, jenkins_job))


def main():
    """Command-line entry point.

    Supported commands:
      authorize                     cache Jenkins credentials for later runs
      create GROUP/NAME CONFIG_FILE create a job from the group's template
      build GROUP/NAME              queue a build of an existing job

    Exits through optparse's error handling (status 2) on usage errors and
    through error() (status 1) on failures.
    """
    # Kept as a module-level name for backward compatibility: code outside
    # this function may read the parsed options.
    global options
    optparser = optparse.OptionParser(usage="%prog ...")
    optparser.add_option("--url",
                         default="https://android-build.linaro.org/jenkins",
                         help="Jenkins base url, default: %default")
    optparser.add_option("--user", default=os.environ.get("USER"),
                         help="Jenkins username, default: $USER")
    optparser.add_option("--apikey-file", metavar="FILE",
                         help="File holding Jenkins API key")
    optparser.add_option("--cred-file", metavar="FILE",
                         help="File holding Jenkins username:API key pair")
    optparser.add_option("--private", metavar="GROUP",
                         help="Create private job accessible to GROUP")
    options, args = optparser.parse_args(sys.argv[1:])
    if len(args) < 1:
        optparser.error("Wrong number of arguments")
    command = args[0]

    config_dir = os.path.expanduser("~/.config/android-build-client")
    options.user, password = _resolve_credentials(
        options, command, config_dir)

    # Request paths are appended to the base URL without a separator.
    if not options.url.endswith('/'):
        options.url += '/'

    if command == "authorize":
        if len(args) != 1:
            optparser.error("Usage: authorize")
        _store_credentials(config_dir)
    elif command == "create":
        if len(args) != 3:
            optparser.error("Usage: create GROUP/NAME CONFIG_FILE")
        jenkins = Jenkins(options.url, options.user, password)
        _create_job(jenkins, options, args[1], args[2])
    elif command == "build":
        if len(args) != 2:
            optparser.error("Usage: build GROUP/NAME")
        jenkins = Jenkins(options.url, options.user, password)
        jenkins.build_job(args[1].replace("/", "_"))
        print("Build queued")
    else:
        optparser.error("Unknown command '%s'" % command)
# Run the command-line client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()