diff options
author | Danilo Šegan <danilo@segan.org> | 2012-05-17 20:24:36 +0200 |
---|---|---|
committer | Danilo Šegan <danilo@segan.org> | 2012-05-17 20:24:36 +0200 |
commit | ab13ebe1efe613cf026c82644bfefd360ce9b430 (patch) | |
tree | c3087be3232cfa01ffa8b8853c30e608c135ba1c /tests | |
parent | 686c39dbcc05c18acf87e3e106c3c26f8af10c39 (diff) |
Rename "testing" subdirectory to "tests".
Diffstat (limited to 'tests')
-rw-r--r-- | tests/LicenseHelperTest.php | 167 | ||||
-rw-r--r-- | tests/__init__.py | 16 | ||||
-rw-r--r-- | tests/apache2.conf.tmpl | 117 | ||||
-rw-r--r-- | tests/doctest_production_browser.py | 172 | ||||
-rw-r--r-- | tests/license_protected_file_downloader.py | 287 | ||||
-rw-r--r-- | tests/mime.types | 11 | ||||
-rw-r--r-- | tests/test_click_through_license.py | 292 | ||||
-rw-r--r-- | tests/test_php_unit.py | 36 | ||||
-rw-r--r-- | tests/test_publish_to_snapshots.py | 464 |
9 files changed, 1562 insertions, 0 deletions
diff --git a/tests/LicenseHelperTest.php b/tests/LicenseHelperTest.php new file mode 100644 index 0000000..b7bc855 --- /dev/null +++ b/tests/LicenseHelperTest.php @@ -0,0 +1,167 @@ +<?php + +require_once("licenses/LicenseHelper.php"); + +class LicenseHelperTest extends PHPUnit_Framework_TestCase +{ + + private $temp_filename; + + /** + * Running checkFile on a directory path returns false. + */ + public function test_checkFile_nonFile() + { + $this->assertFalse(LicenseHelper::checkFile(dirname(__FILE__))); + } + + /** + * Running checkFile on a symbolic link to an existing file returns true. + */ + public function test_checkFile_link() + { + $this->temp_filename = tempnam(sys_get_temp_dir(), "unittest"); + symlink($this->temp_filename, "test_link"); + + $this->assertTrue(LicenseHelper::checkFile("test_link")); + + unlink("test_link"); + unlink($this->temp_filename); + } + + /** + * Running checkFile on a broken symbolic link returns true. + * This is because PHP function is_link returns true on broken soft links. + */ + public function test_checkFile_brokenLink() + { + $this->temp_filename = tempnam(sys_get_temp_dir(), "unittest"); + symlink($this->temp_filename, "test_link"); + unlink($this->temp_filename); + + $this->assertTrue(LicenseHelper::checkFile("test_link")); + + unlink("test_link"); + } + + /** + * Running checkFile on a regular file returns true. + */ + public function test_checkFile_file() + { + $this->assertTrue(LicenseHelper::checkFile(__FILE__)); + } + + /** + * getFileList throws an InvalidArgumentException when passed + * an argument pointing to a file. + * @expectedException InvalidArgumentException + */ + public function test_getFilesList_file() + { + $file_list = LicenseHelper::getFilesList(__FILE__); + } + + /** + * getFileList returns a list of filenames in that directory. + */ + public function test_getFilesList_dir() + { + $temp_dir_name = tempnam(sys_get_temp_dir(), "unittest"); + if (file_exists($temp_dir_name)) { + unlink($temp_dir_name); + } + mkdir($temp_dir_name); + + touch($temp_dir_name . "/unittest1"); + touch($temp_dir_name . "/unittest2"); + + $file_list = LicenseHelper::getFilesList($temp_dir_name); + $this->assertCount(2, $file_list); + + // Sort the file list, this function returns the files as they are + // written on the filesystem. + sort($file_list); + $this->assertEquals("unittest1", $file_list[0]); + $this->assertEquals("unittest2", $file_list[1]); + + unlink($temp_dir_name . "/unittest1"); + unlink($temp_dir_name . "/unittest2"); + rmdir($temp_dir_name); + } + + /** + * Running findFileByPattern on an array without matches returns false. + */ + public function test_findFileByPattern_noMatch() + { + $file_list = array("test.txt", "new_file.pdf"); + $pattern = "/^abc/"; + $this->assertFalse( + LicenseHelper::findFileByPattern($file_list, $pattern)); + } + + /** + * Running findFileByPattern on an array with matches returns first + * matching element. + */ + public function test_findFileByPattern_match() + { + $file_list = array("test.txt", "new_file.pdf"); + $pattern = "/test/"; + $this->assertEquals("test.txt", + LicenseHelper::findFileByPattern($file_list, + $pattern)); + } + + /** + * getTheme returns a ST-E Linaro-branded template when + * no EULA is present (indicated by eula filename being named + * EULA.txt or not). + */ + public function test_getTheme_noEula_snowball() + { + $eula = "EULA.txt"; + $filename = "snowball.build.tar.bz2"; + $this->assertEquals("ste", LicenseHelper::getTheme($eula, $filename)); + } + + /** + * getTheme returns a Samsung Linaro-branded template when + * no EULA is present (indicated by eula filename being named + * EULA.txt or not). + */ + public function test_getTheme_noEula_origen() + { + $eula = "EULA.txt"; + $filename = "origen.build.tar.bz2"; + $this->assertEquals("samsung", + LicenseHelper::getTheme($eula, $filename)); + } + + /** + * getTheme returns a generic Linaro-branded template when + * no EULA is present (indicated by eula filename being named + * EULA.txt or not). + */ + public function test_getTheme_noEula_generic() + { + $eula = "EULA.txt"; + $filename = "build.tar.bz2"; + $this->assertEquals("linaro", + LicenseHelper::getTheme($eula, $filename)); + } + + /** + * Running getTheme with eula file present (indicated by eula filename + * being named EULA.txt or not) returns extension of eula file. + */ + public function test_getTheme_eula() + { + $eula = "EULA.txt.test"; + $this->assertEquals("test", LicenseHelper::getTheme($eula, "")); + } + +} + +?>
\ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..a9da24d --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,16 @@ +import os +import unittest + +from tests.test_click_through_license import * +from tests.test_publish_to_snapshots import * + + +def test_suite(): + module_names = [ + 'tests.test_click_through_license.TestLicense', + 'tests.test_publish_to_snapshots.TestSnapshotsPublisher', + 'tests.test_php_unit.PhpUnitTest', + ] + loader = unittest.TestLoader() + suite = loader.loadTestsFromNames(module_names) + return suite diff --git a/tests/apache2.conf.tmpl b/tests/apache2.conf.tmpl new file mode 100644 index 0000000..4cfd851 --- /dev/null +++ b/tests/apache2.conf.tmpl @@ -0,0 +1,117 @@ +ServerRoot "" + +LockFile click_through_license.lock +PidFile click_through_license.pid + +Timeout 300 + +KeepAlive On + +KeepAliveTimeout 5 + +Listen + +ServerAdmin you@example.com + +#ServerName www.example.com:80 + +DocumentRoot "" + +RewriteLog rewrite.log +RewriteLogLevel 7 + +AccessFileName .htaccess + +LoadModule authz_host_module /usr/lib/apache2/modules/mod_authz_host.so +LoadModule cache_module /usr/lib/apache2/modules/mod_cache.so +LoadModule mime_module /usr/lib/apache2/modules/mod_mime.so +LoadModule autoindex_module /usr/lib/apache2/modules/mod_autoindex.so +LoadModule rewrite_module /usr/lib/apache2/modules/mod_rewrite.so +LoadModule php5_module /usr/lib/apache2/modules/libphp5.so + +User daemon +Group daemon + +<IfModule mpm_prefork_module> + StartServers 5 + MinSpareServers 5 + MaxSpareServers 10 + MaxClients 150 + MaxRequestsPerChild 0 +</IfModule> + +<IfModule mpm_worker_module> + StartServers 2 + MinSpareThreads 25 + MaxSpareThreads 75. + ThreadLimit 64 + ThreadsPerChild 25 + MaxClients 150 + MaxRequestsPerChild 0 +</IfModule> + +<IfModule mpm_event_module> + StartServers 2 + MinSpareThreads 25 + MaxSpareThreads 75. + ThreadLimit 64 + ThreadsPerChild 25 + MaxClients 150 + MaxRequestsPerChild 0 +</IfModule> + +<Directory /> + Options +FollowSymLinks + AllowOverride All + Order deny,allow + Deny from all +</Directory> + +<Directory ""> + Options Indexes +FollowSymLinks MultiViews + AllowOverride All + Order allow,deny + Allow from all +</Directory> + +<IfModule dir_module> + DirectoryIndex index.html +</IfModule> + +<FilesMatch "^\.ht"> + Order allow,deny + Deny from all + Satisfy All +</FilesMatch> + +ErrorLog "click_through_license_error.log" + +LogLevel warn + +<IfModule log_config_module> + LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined + LogFormat "%h %l %u %t \"%r\" %>s %b" common + + <IfModule logio_module> + LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio + </IfModule> + + CustomLog "click_through_license_access.log" common + +</IfModule> + +<IfModule mod_php5.c> + <FilesMatch "\.ph(p3?|tml)$"> + SetHandler application/x-httpd-php + </FilesMatch> + <FilesMatch "\.phps$"> + SetHandler application/x-httpd-php-source + </FilesMatch> +</IfModule> + +DefaultType text/plain + +IndexOptions FancyIndexing VersionSort HTMLTable NameWidth=* DescriptionWidth=* Charset=UTF-8 +ReadmeName README.html +HeaderName HEADER.html +IndexIgnore .??* *~ *# RCS CVS *,v *,t. diff --git a/tests/doctest_production_browser.py b/tests/doctest_production_browser.py new file mode 100644 index 0000000..0b5a157 --- /dev/null +++ b/tests/doctest_production_browser.py @@ -0,0 +1,172 @@ +from BeautifulSoup import BeautifulSoup + +from license_protected_file_downloader import LicenseProtectedFileFetcher + + +class EmptyDirectoryException(Exception): + ''' Directory at the current URL is empty. ''' + + +class NoLicenseException(Exception): + ''' No license protecting the file. ''' + + +class UnexpectedLicenseException(Exception): + ''' License protecting non-licensed the file. ''' + + +class DoctestProductionBrowser(): + """Doctest production testing browser class.""" + + def __init__(self, host_address): + self.host_address = host_address + self.current_url = host_address + self.fetcher = LicenseProtectedFileFetcher() + + def is_dir(self, link): + """Check if the link is a directory.""" + return link[-1] == "/" + + def get_header(self): + """Get header from the current url.""" + return self.parse_header(self.fetcher.get_headers(self.current_url)) + + def get_license_text(self): + """Get license from the current URL if it redirects to license.""" + license = self.fetcher.get_or_return_license(self.current_url) + if license[0]: + return license[0] + else: + raise NoLicenseException("License expected here.") + + def get_unprotected_file_header(self): + """Get headers from unprotected file.""" + page = self.fetcher.get_or_return_license(self.current_url) + # Check if license with accept and decline links is returned. + if len(page) == 3: + raise UnexpectedLicenseException("License not expected here.") + else: + return self.parse_header(self.fetcher.header) + + def get_content(self): + """Get contents from the current url.""" + return self.fetcher.get(self.current_url) + + def get_content_title(self): + """Get content title from the current url.""" + return self.get_title(self.fetcher.get(self.current_url)) + + def get_header_when_redirected(self): + """Get header when the client is redirected to the license.""" + self.fetcher.get(self.current_url) + return self.parse_header(self.fetcher.header) + + def accept_license_get_header(self): + """Accept license and get header of the file it redirects to.""" + license = self.fetcher.get_or_return_license(self.current_url) + # Second element in result is the accept link. + if license[1]: + self.fetcher.get_protected_file(license[1], self.current_url) + return self.parse_header(self.fetcher.header) + else: + raise NoLicenseException("License expected here.") + + def decline_license(self): + """Decline license. Return title of the page.""" + return self.get_title( + self.fetcher.get(self.current_url, accept_license=False) + ) + + def parse_header(self, header): + """Formats headers from dict form to the multi-line string.""" + header_str = "" + for key in sorted(header.iterkeys()): + header_str += "%s: %s\n" % (key, header[key]) + return header_str + + def get_title(self, html): + soup = BeautifulSoup(html) + titles_all = soup.findAll('title') + if len(titles_all) > 0: + return titles_all[0].contents[0] + else: + return "" + + def browse_to_relative(self, path): + """Change current url relatively.""" + self.current_url += path + + def browse_to_absolute(self, path): + """Change current url to specified path.""" + self.current_url = self.host_address + path + + def browse_to_next(self, condition): + """Browse to next dir/build file that matches condition. + + Set the current URL to to match the condition among the + links in the current page with priority to build files. + If there's no match, set link to build file if present. + Otherwise, set link to first directory present. + """ + links = self.find_links(self.get_content()) + link = self.find_link_with_condition(links, condition) + if not link: + # No link matching condition, get first build in list. + link = self.find_build_tar_bz2(links) + if not link: + # Still no link, just get first dir in list. + link = self.find_directory(links) + if not link: + # We found page with no directories nor builds. + raise EmptyDirectoryException("Directory is empty.") + + self.browse_to_relative(link) + + def find_links(self, html): + """Return list of links on the page with special conditions. + + Return all links below the "Parent directory" link. + Return whole list if there is no such link. + """ + soup = BeautifulSoup(html) + links_all = soup.findAll('a') + had_parent = False + links = [] + for link in links_all: + if had_parent: + links.append(link.get("href")) + if link.contents[0] == "Parent Directory": + had_parent = True + + if had_parent: + return links + else: + return [each.get('href') for each in links_all] + + def find_link_with_condition(self, links, condition): + """Finds a link which satisfies the condition. + + Condition is actually to contain the string from the list. + Build files (which end in .tar.bz2) have the priority. + """ + for link in links: + if condition in link and link[-7:] == "tar.bz2": + return link + for link in links: + if condition in link: + return link + return None + + def find_directory(self, links): + """Finds a directory among list of links.""" + for link in links: + if self.is_dir(link): + return link + return None + + def find_build_tar_bz2(self, links): + """Finds a file list of links which ends in tar.bz2.""" + for link in links: + if link[-7:] == "tar.bz2": + return link + return None diff --git a/tests/license_protected_file_downloader.py b/tests/license_protected_file_downloader.py new file mode 100644 index 0000000..5726a56 --- /dev/null +++ b/tests/license_protected_file_downloader.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python + +import argparse +import os +import pycurl +import re +import urlparse +import html2text +from BeautifulSoup import BeautifulSoup + + +class LicenseProtectedFileFetcher: + """Fetch a file from the web that may be protected by a license redirect + + This is designed to run on snapshots.linaro.org. License HTML file are in + the form: + + <vendor>.html has a link to <vendor>-accept.html + + If self.get is pointed at a file that has to go through one of these + licenses, it should be able to automatically accept the license and + download the file. + + Once a license has been accepted, it will be used for all following + downloads. + + If self.close() is called before the object is deleted, cURL will store + the license accept cookie to cookies.txt, so it can be used for later + downloads. + + """ + def __init__(self, cookie_file="cookies.txt"): + """Set up cURL""" + self.curl = pycurl.Curl() + self.curl.setopt(pycurl.WRITEFUNCTION, self._write_body) + self.curl.setopt(pycurl.HEADERFUNCTION, self._write_header) + self.curl.setopt(pycurl.FOLLOWLOCATION, 1) + self.curl.setopt(pycurl.COOKIEFILE, cookie_file) + self.curl.setopt(pycurl.COOKIEJAR, cookie_file) + self.file_out = None + + def _get(self, url): + """Clear out header and body storage, fetch URL, filling them in.""" + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + if self.file_name: + self.file_out = open(self.file_name, 'w') + else: + self.file_out = None + + self.curl.perform() + self._parse_headers(url) + + if self.file_out: + self.file_out.close() + + def _parse_headers(self, url): + header = {} + for line in self.header.splitlines(): + # Header lines typically are of the form thing: value... + test_line = re.search("^(.*?)\s*:\s*(.*)$", line) + + if test_line: + header[test_line.group(1)] = test_line.group(2) + + # The location attribute is sometimes relative, but we would + # like to have it as always absolute... + if 'Location' in header: + parsed_location = urlparse.urlparse(header["Location"]) + + # If not an absolute location... + if not parsed_location.netloc: + parsed_source_url = urlparse.urlparse(url) + new_location = ["", "", "", "", ""] + + new_location[0] = parsed_source_url.scheme + new_location[1] = parsed_source_url.netloc + new_location[2] = header["Location"] + + # Update location with absolute URL + header["Location"] = urlparse.urlunsplit(new_location) + + self.header_text = self.header + self.header = header + + def get_headers(self, url): + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + # Setting NOBODY causes CURL to just fetch the header. + self.curl.setopt(pycurl.NOBODY, True) + self.curl.perform() + self.curl.setopt(pycurl.NOBODY, False) + + self._parse_headers(url) + + return self.header + + def get_or_return_license(self, url, file_name=None): + """Get file at the requested URL or, if behind a license, return that. + + If the URL provided does not redirect us to a license, then return the + body of that file. If we are redirected to a license click through + then return (the license as plain text, url to accept the license). + + If the user of this function accepts the license, then they should + call get_protected_file.""" + + self.file_name = file_name + + # Get the license details. If this returns None, the file isn't license + # protected and we can just return the file we started to get in the + # function (self.body). + license_details = self._get_license(url) + + if license_details: + return license_details + + return self.body + + def get(self, url, file_name=None, ignore_license=False, + accept_license=True): + """Fetch the requested URL, accepting licenses + + Fetches the file at url. If a redirect is encountered, it is + expected to be to a license that has an accept link. Follow that link, + then download the original file. Returns the fist 1MB of the file + (see _write_body). + + """ + + self.file_name = file_name + if ignore_license: + self._get(url) + return self.body + + license_details = self._get_license(url) + + if license_details: + # Found a license. + if accept_license: + # Accept the license without looking at it and + # start fetching the file we originally wanted. + accept_url = license_details[1] + self.get_protected_file(accept_url, url) + else: + # We want to decline the license and return the notice. + decline_url = license_details[2] + self._get(decline_url) + + else: + # If we got here, there wasn't a license protecting the file + # so we just fetch it. + self._get(url) + + return self.body + + def _get_license(self, url): + """Return (license, accept URL, decline URL) if found, + else return None. + + """ + + self.get_headers(url) + + if "Location" in self.header and self.header["Location"] != url: + # We have been redirected to a new location - the license file + location = self.header["Location"] + + # Fetch the license HTML + self._get(location) + + # Get the file from the URL (full path) + file = urlparse.urlparse(location).path + + # Get the file without the rest of the path + file = os.path.split(file)[-1] + + # Look for a link with accepted.html in the page name. Follow it. + accept_search, decline_search = None, None + for line in self.body.splitlines(): + if not accept_search: + accept_search = re.search( + """href=.*?["'](.*?-accepted.html)""", + line) + if not decline_search: + decline_search = re.search( + """href=.*?["'](.*?-declined.html)""", + line) + + if accept_search and decline_search: + # Have found license accept URL! + new_file = accept_search.group(1) + accept_url = re.sub(file, new_file, location) + + # Found decline URL as well. + new_file_decline = decline_search.group(1) + decline_url = re.sub(file, new_file_decline, location) + + # Parse the HTML using BeautifulSoup + soup = BeautifulSoup(self.body) + + # The license is in a div with the ID license-text, so we + # use this to pull just the license out of the HTML. + html_license = u"" + for chunk in soup.findAll(id="license-text"): + # Output of chunk.prettify is UTF8, but comes back + # as a str, so convert it here. + html_license += chunk.prettify().decode("utf-8") + + text_license = html2text.html2text(html_license) + + return text_license, accept_url, decline_url + + return None + + def get_protected_file(self, accept_url, url): + """Gets the file redirected to by the accept_url""" + + self._get(accept_url) # Accept the license + + if not("Location" in self.header and self.header["Location"] == url): + # If we got here, we don't have the file yet (weren't redirected + # to it). Fetch our target file. This should work now that we have + # the right cookie. + self._get(url) # Download the target file + + return self.body + + def _write_body(self, buf): + """Used by curl as a sink for body content""" + + # If we have a target file to write to, write to it + if self.file_out: + self.file_out.write(buf) + + # Only buffer first 1MB of body. This should be plenty for anything + # we wish to parse internally. + if len(self.body) < 1024 * 1024 * 1024: + # XXX Would be nice to stop keeping the file in RAM at all and + # passing large buffers around. Perhaps only keep in RAM if + # file_name == None? (used for getting directory listings + # normally). + self.body += buf + + def _write_header(self, buf): + """Used by curl as a sink for header content""" + self.header += buf + + def register_progress_callback(self, callback): + self.curl.setopt(pycurl.NOPROGRESS, 0) + self.curl.setopt(pycurl.PROGRESSFUNCTION, callback) + + def close(self): + """Wrapper to close curl - this will allow curl to write out cookies""" + self.curl.close() + + +def main(): + """Download file specified on command line""" + parser = argparse.ArgumentParser(description="Download a file, accepting " + "any licenses required to do so.") + + parser.add_argument('url', metavar="URL", type=str, nargs=1, + help="URL of file to download.") + + args = parser.parse_args() + + fetcher = LicenseProtectedFileFetcher() + + # Get file name from URL + file_name = os.path.basename(urlparse.urlparse(args.url[0]).path) + if not file_name: + file_name = "downloaded" + fetcher.get(args.url[0], file_name) + + fetcher.close() + +if __name__ == "__main__": + main() diff --git a/tests/mime.types b/tests/mime.types new file mode 100644 index 0000000..c4a3984 --- /dev/null +++ b/tests/mime.types @@ -0,0 +1,11 @@ +text/css css +text/directory +text/html html htm shtml +text/plain asc txt text pot brf +text/uri-list +application/x-httpd-php phtml pht php +application/x-httpd-php-source phps +application/x-httpd-php3 php3 +application/x-httpd-php3-preprocessed php3p +application/x-httpd-php4 php4 +application/x-httpd-php5 php5 diff --git a/tests/test_click_through_license.py b/tests/test_click_through_license.py new file mode 100644 index 0000000..0f8643a --- /dev/null +++ b/tests/test_click_through_license.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python + +import re +import os +import shutil +import shlex +import subprocess +import socket + +from testtools import TestCase +from testtools.matchers import Mismatch +from license_protected_file_downloader import LicenseProtectedFileFetcher + +fetcher = LicenseProtectedFileFetcher() +cwd = os.getcwd() +docroot = cwd +srvroot = os.path.abspath(os.path.join(*([cwd] + ['tests']))) +local_rewrite = 'RewriteCond %{REMOTE_ADDR} 127.0.0.1 [OR]' + +host = 'http://127.0.0.1' +port = '0' # 0 == Pick a free port. +samsung_license_path = '/licenses/samsung.html' +ste_license_path = '/licenses/ste.html' +linaro_license_path = '/licenses/linaro.html' +samsung_test_file = '/android/~linaro-android/staging-origen/test.txt' +ste_test_file = '/android/~linaro-android/staging-snowball/173/target/product/snowball/test.txt' +ste_open_test_file = '/android/~linaro-android/staging-snowball/173/test.txt' +never_available = '/android/~linaro-android/staging-imx53/test.txt' +linaro_test_file = '/android/~linaro-android/staging-panda/test.txt' +not_protected_test_file = '/android/~linaro-android/staging-vexpress-a9/test.txt' +not_found_test_file = '/android/~linaro-android/staging-vexpress-a9/notfound.txt' +per_file_samsung_test_file = '/android/images/origen-blob.txt' +per_file_ste_test_file = '/android/images/snowball-blob.txt' +per_file_not_protected_test_file = '/android/images/MANIFEST' +dirs_only_dir = '/android/~linaro-android/' + + +class Contains(object): + '''Match if a string contains substring''' + def __init__(self, substr): + self.substr = substr + + def __str__(self): + return 'Contains(%s)' % (self.substr,) + + def match(self, actual): + for line in actual.splitlines(): + res = re.search(self.substr, line) + if res: + return None + return Mismatch("Initial string doesn't contain substring (%s)" % + self.substr) + + +class CommandNotFoundException(Exception): + ''' Unable to find command ''' + + +class NonZeroReturnValueException(Exception): + ''' Command exited with nonzero return value ''' + + +class TestLicense(TestCase): + '''Tests for accessing files and directories with license protection''' + + @classmethod + def setUpClass(cls): + global host + global port + if port == '0': + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('127.0.0.1', 0)) + port = str(s.getsockname()[1]) + s.close() + host = host + ':' + port + shutil.copy("%s/apache2.conf.tmpl" % srvroot, "%s/apache2.conf" % + srvroot) + shutil.copy("%s/.htaccess" % docroot, "%s/dothtaccess" % docroot) + subprocess.Popen(['sed', '-i', 's/ServerRoot \"\"/ServerRoot \"%s\"/' + % srvroot.replace('/', '\/'), '%s/apache2.conf' % srvroot], + stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() + subprocess.Popen(['sed', '-i', 's/DocumentRoot \"\"/DocumentRoot ' + '\"%s\"/' % docroot.replace('/', '\/'), '%s/apache2.conf' + % srvroot], stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait() + subprocess.Popen(['sed', '-i', 's/Directory \"\"/Directory \"%s\"/' + % docroot.replace('/', '\/'), '%s/apache2.conf' % srvroot], + stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() + subprocess.Popen(['sed', '-i', 's/Listen/Listen %s/' % port, + '%s/apache2.conf' % srvroot], stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait() + if subprocess.Popen(['which', 'apache2'], + stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait(): + raise CommandNotFoundException("apache2 command not found. Please " + "install apache2 web server and rerun tests.") + args = shlex.split('apache2 -d %s -f apache2.conf -k start' % srvroot) + rc = subprocess.Popen(args, stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait() + if rc: + raise NonZeroReturnValueException("apache2 server exited with " + "error %s" % rc) + + @classmethod + def tearDownClass(cls): + if os.path.exists("%s/cookies.txt" % docroot): + os.unlink("%s/cookies.txt" % docroot) + args = shlex.split('apache2 -d %s -f apache2.conf -k stop' % srvroot) + subprocess.Popen(args, stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait() + if os.path.exists("%s/apache2.conf" % srvroot): + os.unlink("%s/apache2.conf" % srvroot) + if os.path.exists("%s/click_through_license_access.log" % srvroot): + os.unlink("%s/click_through_license_access.log" % srvroot) + if os.path.exists("%s/click_through_license_error.log" % srvroot): + os.unlink("%s/click_through_license_error.log" % srvroot) + if os.path.exists("%s/rewrite.log" % srvroot): + os.unlink("%s/rewrite.log" % srvroot) + os.rename("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) + + def setUp(self): + super(TestLicense, self).setUp() + global fetcher + fetcher = LicenseProtectedFileFetcher() + + def tearDown(self): + super(TestLicense, self).tearDown() + if isinstance(fetcher, LicenseProtectedFileFetcher): + fetcher.close() + if os.path.exists("%s/cookies.txt" % docroot): + os.unlink("%s/cookies.txt" % docroot) + + def test_licensefile_directly_samsung(self): + search = "Index of /" + testfile = fetcher.get(host + samsung_license_path) + self.assertThat(testfile, Contains(search)) + + def test_licensefile_directly_ste(self): + search = "Index of /" + testfile = fetcher.get(host + ste_license_path) + self.assertThat(testfile, Contains(search)) + + def test_licensefile_directly_linaro(self): + search = "Index of /" + testfile = fetcher.get(host + linaro_license_path) + self.assertThat(testfile, Contains(search)) + + def test_redirect_to_license_samsung(self): + search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" + testfile = fetcher.get_or_return_license(host + samsung_test_file) + self.assertThat(testfile[0], Contains(search)) + + def test_redirect_to_license_ste(self): + search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" + testfile = fetcher.get_or_return_license(host + ste_test_file) + self.assertThat(testfile[0], Contains(search)) + + def test_redirect_to_license_linaro(self): + search = "Linaro license." + testfile = fetcher.get_or_return_license(host + linaro_test_file) + self.assertThat(testfile[0], Contains(search)) + + def test_decline_license_samsung(self): + search = "License has not been accepted" + testfile = fetcher.get(host + samsung_test_file, accept_license=False) + self.assertThat(testfile, Contains(search)) + + def test_decline_license_ste(self): + search = "License has not been accepted" + testfile = fetcher.get(host + ste_test_file, accept_license=False) + self.assertThat(testfile, Contains(search)) + + def test_decline_license_linaro(self): + search = "License has not been accepted" + testfile = fetcher.get(host + linaro_test_file, accept_license=False) + self.assertThat(testfile, Contains(search)) + + def test_non_protected_dirs(self): + search = "This is always available." + testfile = fetcher.get(host + not_protected_test_file) + self.assertThat(testfile, Contains(search)) + + def test_never_available_dirs(self): + search = "Forbidden" + testfile = fetcher.get(host + never_available) + self.assertThat(testfile, Contains(search)) + + def test_accept_license_samsung_file(self): + search = "This is protected with click-through Samsung license." + testfile = fetcher.get(host + samsung_test_file) + fetcher.close() + if os.path.exists("%s/cookies.txt" % docroot): + os.rename("%s/cookies.txt" % docroot, + "%s/cookies.samsung" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_accept_license_samsung_dir(self): + search = "Index of /android/~linaro-android/staging-origen" + testfile = fetcher.get(host + os.path.dirname(samsung_test_file)) + self.assertThat(testfile, Contains(search)) + + def test_accept_license_ste_file(self): + search = "This is protected with click-through ST-E license." + testfile = fetcher.get(host + ste_test_file) + fetcher.close() + if os.path.exists("%s/cookies.txt" % docroot): + os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_accept_license_ste_dir(self): + search = "Index of /android/~linaro-android/staging-snowball" + testfile = fetcher.get(host + os.path.dirname(ste_test_file)) + self.assertThat(testfile, Contains(search)) + + def test_license_accepted_samsung(self): + search = "This is protected with click-through Samsung license." + os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot) + testfile = fetcher.get(host + samsung_test_file) + self.assertThat(testfile, Contains(search)) + + def test_license_accepted_ste(self): + search = "This is protected with click-through ST-E license." + os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot) + testfile = fetcher.get(host + ste_test_file) + self.assertThat(testfile, Contains(search)) + + def test_internal_host_samsung(self): + search = "This is protected with click-through Samsung license." + subprocess.Popen(['sed', '-i', '/## Let internal hosts through ' + 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot], + stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() + testfile = fetcher.get(host + samsung_test_file, ignore_license=True) + shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_internal_host_ste(self): + search = "This is protected with click-through ST-E license." + subprocess.Popen(['sed', '-i', '/## Let internal hosts through ' + 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot], + stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait() + testfile = fetcher.get(host + ste_test_file, ignore_license=True) + shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_ste_open_file(self): + search = "This is always available." + testfile = fetcher.get(host + ste_open_test_file) + self.assertThat(testfile, Contains(search)) + + def test_per_file_accept_license_samsung_file(self): + search = "This is protected with click-through Samsung license." + testfile = fetcher.get(host + per_file_samsung_test_file) + fetcher.close() + if os.path.exists("%s/cookies.txt" % docroot): + os.rename("%s/cookies.txt" % docroot, + "%s/cookies.samsung" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_per_file_accept_license_ste_file(self): + search = "This is protected with click-through ST-E license." + testfile = fetcher.get(host + per_file_ste_test_file) + fetcher.close() + if os.path.exists("%s/cookies.txt" % docroot): + os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot) + self.assertThat(testfile, Contains(search)) + + def test_per_file_license_accepted_samsung(self): + search = "This is protected with click-through Samsung license." + os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot) + testfile = fetcher.get(host + per_file_samsung_test_file, ignore_license=True) + self.assertThat(testfile, Contains(search)) + + def test_per_file_license_accepted_ste(self): + search = "This is protected with click-through ST-E license." + os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot) + testfile = fetcher.get(host + per_file_ste_test_file, ignore_license=True) + self.assertThat(testfile, Contains(search)) + + def test_per_file_non_protected_dirs(self): + search = "MANIFEST" + testfile = fetcher.get(host + per_file_not_protected_test_file) + self.assertThat(testfile, Contains(search)) + + def test_dir_containing_only_dirs(self): + search = "Index of /android/~linaro-android" + testfile = fetcher.get(host + dirs_only_dir) + self.assertThat(testfile, Contains(search)) + + def test_not_found_file(self): + search = "Not Found" + testfile = fetcher.get(host + not_found_test_file) + self.assertThat(testfile, Contains(search)) diff --git a/tests/test_php_unit.py b/tests/test_php_unit.py new file mode 100644 index 0000000..731d749 --- /dev/null +++ b/tests/test_php_unit.py @@ -0,0 +1,36 @@ +import os +import tempfile +import subprocess +import xml.etree.ElementTree + +from testtools import TestCase +from testtools.matchers import Equals +from testtools.matchers import AllMatch + +class PhpUnitTest(TestCase): + '''Tests for executing the PHP Unit tests''' + + def setUp(self): + super(PhpUnitTest, self).setUp() + self.xml_path = tempfile.mkstemp()[1] + if subprocess.Popen(['phpunit', '--log-junit', + self.xml_path, 'tests/LicenseHelperTest'], + stdout=open('/dev/null', 'w'), + stderr=subprocess.STDOUT).wait(): + raise CommandNotFoundException("phpunit command not found. Please " + "install phpunit package and rerun tests.") + self.xml_data = xml.etree.ElementTree.parse(self.xml_path) + + def tearDown(self): + super(PhpUnitTest, self).tearDown() + if os.path.exists(self.xml_path): + os.unlink(self.xml_path) + + def test_run_php_unit_tests(self): + self.assertThat( + [ + self.xml_data.getroot()[0].attrib['failures'], + self.xml_data.getroot()[0].attrib['errors'] + ], + AllMatch(Equals("0")) + ) diff --git a/tests/test_publish_to_snapshots.py b/tests/test_publish_to_snapshots.py new file mode 100644 index 0000000..870d404 --- /dev/null +++ b/tests/test_publish_to_snapshots.py @@ -0,0 +1,464 @@ +#!/usr/bin/env python + +import os +import sys +import shutil +import tempfile +import argparse +from StringIO import StringIO +from testtools import TestCase +from scripts.publish_to_snapshots import SnapshotsPublisher + +class TestSnapshotsPublisher(TestCase): + '''Tests for publishing files to the snapshots.l.o www are.''' + + uploads_path = "uploads/" + target_path = "www/" + orig_dir = os.getcwd() + + def setUp(self): + self.parser = argparse.ArgumentParser() + self.parser.add_argument("-t", "--job-type", dest="job_type") + self.parser.add_argument("-j", "--job-name", dest="job_name") + self.parser.add_argument("-n", "--build-num", dest="build_num", type=int) + self.parser.add_argument("-m", "--manifest", dest="manifest", action='store_true') + if not os.path.isdir(self.uploads_path): + os.mkdir(self.uploads_path) + + if not os.path.isdir(self.target_path): + os.mkdir(self.target_path) + super(TestSnapshotsPublisher, self).setUp() + + def tearDown(self): + os.chdir(self.orig_dir) + if os.path.isdir(self.uploads_path): + shutil.rmtree(self.uploads_path) + + if os.path.isdir(self.target_path): + shutil.rmtree(self.target_path) + super(TestSnapshotsPublisher, self).tearDown() + + def test_validate_args_valid_job_values(self): + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'kernel-hwpack', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + + param = self.parser.parse_args(['-t', 'prebuilt', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'ubuntu-hwpacks', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'ubuntu-images', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'ubuntu-restricted', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'ubuntu-sysroots', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + param = self.parser.parse_args(['-t', 'binaries', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + + def test_validate_args_invalid_job_type(self): + orig_stderr = sys.stderr + stderr = sys.stderr = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'invalid_job_type', '-j', 'dummy_job_name', + '-n', '1']) + try: + self.publisher.validate_args(param) + except SystemExit, err: + self.assertEqual(err.code, 2, "Expected result") + finally: + sys.stderr = orig_stderr + + stderr.seek(0) + self.assertIn("Invalid job type", stderr.read()) + + + def test_validate_args_run_invalid_argument(self): + orig_stderr = sys.stderr + stderr = sys.stderr = StringIO() + self.publisher = SnapshotsPublisher() + try: + param = self.parser.parse_args(['-a']) + self.publisher.validate_args(param) + except SystemExit, err: + self.assertEqual(err.code, 2, "Invalid argument passed") + finally: + sys.stderr = orig_stderr + + stderr.seek(0) + self.assertIn("unrecognized arguments: -a\n", stderr.read()) + + def test_validate_args_run_invalid_value(self): + orig_stderr = sys.stderr + stderr = sys.stderr = StringIO() + self.publisher = SnapshotsPublisher() + try: + param = self.parser.parse_args(['-n', "N"]) + self.publisher.validate_args(param) + except SystemExit, err: + self.assertEqual(err.code, 2, "Invalid value passed") + finally: + sys.stderr = orig_stderr + + stderr.seek(0) + self.assertIn("argument -n/--build-num: invalid int value: 'N'", + stderr.read()) + + def test_validate_args_run_none_values(self): + orig_stderr = sys.stderr + stderr = sys.stderr = StringIO() + self.publisher = SnapshotsPublisher() + try: + param = self.parser.parse_args(['-t', None , '-j', None , '-n' , 0]) + self.publisher.validate_args(param) + except SystemExit, err: + self.assertEqual(err.code, 2, "None values are not acceptable") + finally: + sys.stderr = orig_stderr + + stderr.seek(0) + self.assertIn("You must specify job-type, job-name and build-num", + stderr.read()) + + def test_validate_paths_invalid_uploads_path(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + + self.publisher.validate_args(param) + uploads_path = "./dummy_uploads_path" + try: + self.publisher.validate_paths(param, uploads_path, self.target_path) + finally: + sys.stdout = orig_stdout + + stdout.seek(0) + self.assertIn("Missing build path", stdout.read()) + + def test_validate_paths_invalid_target_path(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + + self.publisher.validate_args(param) + build_path = os.path.join(self.uploads_path, param.job_type, param.job_name, + str(param.build_num)) + os.makedirs(build_path) + self.target_path = "./dummy_target_path" + try: + self.publisher.validate_paths(param, self.uploads_path, self.target_path) + finally: + sys.stdout = orig_stdout + + stdout.seek(0) + self.assertIn("Missing target path", stdout.read()) + + def test_move_artifacts_kernel_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'kernel-hwpack', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + build_path = os.path.join(self.uploads_path, param.job_type, param.job_name, + str(param.build_num)) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_android_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_type, param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_prebuilt_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'prebuilt', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir,'oneiric') + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_ubuntu_hwpacks_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'ubuntu-hwpacks', '-j', + 'precise-armhf-lt-panda', '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_ubuntu_images_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'ubuntu-images', '-j', + 'precise-armhf-ubuntu-desktop', '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_ubuntu_restricted_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'ubuntu-restricted', '-j', + 'precise-armhf-integrated-big.little-fastmodels', + '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_ubuntu_sysroots_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'ubuntu-sysroots', '-j', + 'precise-armhf-ubuntu-desktop-dev', '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_binaries_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'binaries', '-j', + 'snowball-binary-update', '-n', '1']) + self.publisher.validate_args(param) + build_path = os.path.join(self.uploads_path, + param.job_name, + str(param.build_num)) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + ts_file = os.path.join(build_path, 'TIMESTAMP') + f = open(ts_file, "w") + f.write('20120416') + f.close() + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, + self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_move_artifacts_android_successful_move(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_type, param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + self.assertIn("Moved the files from", stdout.read()) + + def test_create_symlink(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_type, param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + finally: + sys.stdout = orig_stdout + pass + + stdout.seek(0) + msg = "The latest build is now linked to " + target_dir_path + self.assertIn(msg, stdout.read()) + + def test_create_manifest_file_option(self): + orig_stdout = sys.stdout + stdout = sys.stdout = StringIO() + self.publisher = SnapshotsPublisher() + param = self.parser.parse_args(['-t', 'android', '-j', 'dummy_job_name', + '-n', '1', '-m']) + self.publisher.validate_args(param) + build_dir = '/'.join([param.job_type, param.job_name, str(param.build_num)]) + build_path = os.path.join(self.uploads_path, build_dir) + os.makedirs(build_path) + tempfile.mkstemp(dir=build_path) + lines = [] + try: + uploads_dir_path, target_dir_path = self.publisher.validate_paths(param, + self.uploads_path, self.target_path) + uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path) + target_dir_path = os.path.join(self.orig_dir, target_dir_path) + os.chdir(uploads_dir_path) + for path, subdirs, files in os.walk("."): + for name in files: + lines.append(os.path.join(path, name).split("./")[1] + "\n") + os.chdir(self.orig_dir) + self.publisher.move_artifacts(param, uploads_dir_path, target_dir_path) + + manifest_file=os.path.join(target_dir_path, "MANIFEST") + dest = open(manifest_file, "r").read() + + if len(lines) != 0: + tempfiles = tempfile.mkstemp(dir=target_dir_path) + fd = open(tempfiles[1], "w+") + for line in lines: + fd.write(line) + fd.close() + orig = open(tempfiles[1], "r").read() + + except Exception, details: + pass + + finally: + sys.stdout = orig_stdout + + stdout.seek(0) + res_output = stdout.read() + self.assertIn("Moved the files from", res_output) + msg = "Manifest file " + manifest_file + " generated" + self.assertIn(msg, res_output) + self.assertTrue(orig == dest) |