aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDanilo Šegan <danilo@segan.org>2012-05-17 20:24:36 +0200
committerDanilo Šegan <danilo@segan.org>2012-05-17 20:24:36 +0200
commitab13ebe1efe613cf026c82644bfefd360ce9b430 (patch)
treec3087be3232cfa01ffa8b8853c30e608c135ba1c /tests
parent686c39dbcc05c18acf87e3e106c3c26f8af10c39 (diff)
Rename "testing" subdirectory to "tests".
Diffstat (limited to 'tests')
-rw-r--r--tests/LicenseHelperTest.php167
-rw-r--r--tests/__init__.py16
-rw-r--r--tests/apache2.conf.tmpl117
-rw-r--r--tests/doctest_production_browser.py172
-rw-r--r--tests/license_protected_file_downloader.py287
-rw-r--r--tests/mime.types11
-rw-r--r--tests/test_click_through_license.py292
-rw-r--r--tests/test_php_unit.py36
-rw-r--r--tests/test_publish_to_snapshots.py464
9 files changed, 1562 insertions, 0 deletions
diff --git a/tests/LicenseHelperTest.php b/tests/LicenseHelperTest.php
new file mode 100644
index 0000000..b7bc855
--- /dev/null
+++ b/tests/LicenseHelperTest.php
@@ -0,0 +1,167 @@
+<?php
+
+require_once("licenses/LicenseHelper.php");
+
+class LicenseHelperTest extends PHPUnit_Framework_TestCase
+{
+
+ private $temp_filename;
+
+ /**
+ * Running checkFile on a directory path returns false.
+ */
+ public function test_checkFile_nonFile()
+ {
+ $this->assertFalse(LicenseHelper::checkFile(dirname(__FILE__)));
+ }
+
+ /**
+ * Running checkFile on a symbolic link to an existing file returns true.
+ */
+ public function test_checkFile_link()
+ {
+ $this->temp_filename = tempnam(sys_get_temp_dir(), "unittest");
+ symlink($this->temp_filename, "test_link");
+
+ $this->assertTrue(LicenseHelper::checkFile("test_link"));
+
+ unlink("test_link");
+ unlink($this->temp_filename);
+ }
+
+ /**
+ * Running checkFile on a broken symbolic link returns true.
+ * This is because PHP function is_link returns true on broken soft links.
+ */
+ public function test_checkFile_brokenLink()
+ {
+ $this->temp_filename = tempnam(sys_get_temp_dir(), "unittest");
+ symlink($this->temp_filename, "test_link");
+ unlink($this->temp_filename);
+
+ $this->assertTrue(LicenseHelper::checkFile("test_link"));
+
+ unlink("test_link");
+ }
+
+ /**
+ * Running checkFile on a regular file returns true.
+ */
+ public function test_checkFile_file()
+ {
+ $this->assertTrue(LicenseHelper::checkFile(__FILE__));
+ }
+
+ /**
+ * getFilesList throws an InvalidArgumentException when passed
+ * an argument pointing to a file.
+ * @expectedException InvalidArgumentException
+ */
+ public function test_getFilesList_file()
+ {
+ $file_list = LicenseHelper::getFilesList(__FILE__);
+ }
+
+ /**
+ * getFilesList returns a list of filenames in the given directory.
+ */
+ public function test_getFilesList_dir()
+ {
+ $temp_dir_name = tempnam(sys_get_temp_dir(), "unittest");
+ if (file_exists($temp_dir_name)) {
+ unlink($temp_dir_name);
+ }
+ mkdir($temp_dir_name);
+
+ touch($temp_dir_name . "/unittest1");
+ touch($temp_dir_name . "/unittest2");
+
+ $file_list = LicenseHelper::getFilesList($temp_dir_name);
+ $this->assertCount(2, $file_list);
+
+ // Sort the file list, this function returns the files as they are
+ // written on the filesystem.
+ sort($file_list);
+ $this->assertEquals("unittest1", $file_list[0]);
+ $this->assertEquals("unittest2", $file_list[1]);
+
+ unlink($temp_dir_name . "/unittest1");
+ unlink($temp_dir_name . "/unittest2");
+ rmdir($temp_dir_name);
+ }
+
+ /**
+ * Running findFileByPattern on an array without matches returns false.
+ */
+ public function test_findFileByPattern_noMatch()
+ {
+ $file_list = array("test.txt", "new_file.pdf");
+ $pattern = "/^abc/";
+ $this->assertFalse(
+ LicenseHelper::findFileByPattern($file_list, $pattern));
+ }
+
+ /**
+ * Running findFileByPattern on an array with matches returns first
+ * matching element.
+ */
+ public function test_findFileByPattern_match()
+ {
+ $file_list = array("test.txt", "new_file.pdf");
+ $pattern = "/test/";
+ $this->assertEquals("test.txt",
+ LicenseHelper::findFileByPattern($file_list,
+ $pattern));
+ }
+
+ /**
+ * getTheme returns a ST-E Linaro-branded template when
+ * no EULA is present (indicated by eula filename being named
+ * EULA.txt or not).
+ */
+ public function test_getTheme_noEula_snowball()
+ {
+ $eula = "EULA.txt";
+ $filename = "snowball.build.tar.bz2";
+ $this->assertEquals("ste", LicenseHelper::getTheme($eula, $filename));
+ }
+
+ /**
+ * getTheme returns a Samsung Linaro-branded template when
+ * no EULA is present (indicated by eula filename being named
+ * EULA.txt or not).
+ */
+ public function test_getTheme_noEula_origen()
+ {
+ $eula = "EULA.txt";
+ $filename = "origen.build.tar.bz2";
+ $this->assertEquals("samsung",
+ LicenseHelper::getTheme($eula, $filename));
+ }
+
+ /**
+ * getTheme returns a generic Linaro-branded template when
+ * no EULA is present (indicated by eula filename being named
+ * EULA.txt or not).
+ */
+ public function test_getTheme_noEula_generic()
+ {
+ $eula = "EULA.txt";
+ $filename = "build.tar.bz2";
+ $this->assertEquals("linaro",
+ LicenseHelper::getTheme($eula, $filename));
+ }
+
+ /**
+ * Running getTheme with eula file present (indicated by eula filename
+ * being named EULA.txt or not) returns extension of eula file.
+ */
+ public function test_getTheme_eula()
+ {
+ $eula = "EULA.txt.test";
+ $this->assertEquals("test", LicenseHelper::getTheme($eula, ""));
+ }
+
+}
+
+?> \ No newline at end of file
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..a9da24d
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,16 @@
+import os
+import unittest
+
+from tests.test_click_through_license import *
+from tests.test_publish_to_snapshots import *
+
+
+def test_suite():
+ module_names = [
+ 'tests.test_click_through_license.TestLicense',
+ 'tests.test_publish_to_snapshots.TestSnapshotsPublisher',
+ 'tests.test_php_unit.PhpUnitTest',
+ ]
+ loader = unittest.TestLoader()
+ suite = loader.loadTestsFromNames(module_names)
+ return suite
diff --git a/tests/apache2.conf.tmpl b/tests/apache2.conf.tmpl
new file mode 100644
index 0000000..4cfd851
--- /dev/null
+++ b/tests/apache2.conf.tmpl
@@ -0,0 +1,117 @@
+ServerRoot ""
+
+LockFile click_through_license.lock
+PidFile click_through_license.pid
+
+Timeout 300
+
+KeepAlive On
+
+KeepAliveTimeout 5
+
+Listen
+
+ServerAdmin you@example.com
+
+#ServerName www.example.com:80
+
+DocumentRoot ""
+
+RewriteLog rewrite.log
+RewriteLogLevel 7
+
+AccessFileName .htaccess
+
+LoadModule authz_host_module /usr/lib/apache2/modules/mod_authz_host.so
+LoadModule cache_module /usr/lib/apache2/modules/mod_cache.so
+LoadModule mime_module /usr/lib/apache2/modules/mod_mime.so
+LoadModule autoindex_module /usr/lib/apache2/modules/mod_autoindex.so
+LoadModule rewrite_module /usr/lib/apache2/modules/mod_rewrite.so
+LoadModule php5_module /usr/lib/apache2/modules/libphp5.so
+
+User daemon
+Group daemon
+
+<IfModule mpm_prefork_module>
+ StartServers 5
+ MinSpareServers 5
+ MaxSpareServers 10
+ MaxClients 150
+ MaxRequestsPerChild 0
+</IfModule>
+
+<IfModule mpm_worker_module>
+ StartServers 2
+ MinSpareThreads 25
+ MaxSpareThreads 75.
+ ThreadLimit 64
+ ThreadsPerChild 25
+ MaxClients 150
+ MaxRequestsPerChild 0
+</IfModule>
+
+<IfModule mpm_event_module>
+ StartServers 2
+ MinSpareThreads 25
+ MaxSpareThreads 75.
+ ThreadLimit 64
+ ThreadsPerChild 25
+ MaxClients 150
+ MaxRequestsPerChild 0
+</IfModule>
+
+<Directory />
+ Options +FollowSymLinks
+ AllowOverride All
+ Order deny,allow
+ Deny from all
+</Directory>
+
+<Directory "">
+ Options Indexes +FollowSymLinks MultiViews
+ AllowOverride All
+ Order allow,deny
+ Allow from all
+</Directory>
+
+<IfModule dir_module>
+ DirectoryIndex index.html
+</IfModule>
+
+<FilesMatch "^\.ht">
+ Order allow,deny
+ Deny from all
+ Satisfy All
+</FilesMatch>
+
+ErrorLog "click_through_license_error.log"
+
+LogLevel warn
+
+<IfModule log_config_module>
+ LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\"" combined
+ LogFormat "%h %l %u %t \"%r\" %>s %b" common
+
+ <IfModule logio_module>
+ LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio
+ </IfModule>
+
+ CustomLog "click_through_license_access.log" common
+
+</IfModule>
+
+<IfModule mod_php5.c>
+ <FilesMatch "\.ph(p3?|tml)$">
+ SetHandler application/x-httpd-php
+ </FilesMatch>
+ <FilesMatch "\.phps$">
+ SetHandler application/x-httpd-php-source
+ </FilesMatch>
+</IfModule>
+
+DefaultType text/plain
+
+IndexOptions FancyIndexing VersionSort HTMLTable NameWidth=* DescriptionWidth=* Charset=UTF-8
+ReadmeName README.html
+HeaderName HEADER.html
+IndexIgnore .??* *~ *# RCS CVS *,v *,t.
diff --git a/tests/doctest_production_browser.py b/tests/doctest_production_browser.py
new file mode 100644
index 0000000..0b5a157
--- /dev/null
+++ b/tests/doctest_production_browser.py
@@ -0,0 +1,172 @@
+from BeautifulSoup import BeautifulSoup
+
+from license_protected_file_downloader import LicenseProtectedFileFetcher
+
+
+class EmptyDirectoryException(Exception):
+ ''' Directory at the current URL is empty. '''
+
+
+class NoLicenseException(Exception):
+ ''' No license protecting the file. '''
+
+
+class UnexpectedLicenseException(Exception):
+ ''' License unexpectedly protecting a non-licensed file. '''
+
+
+class DoctestProductionBrowser():
+ """Doctest production testing browser class."""
+
+ def __init__(self, host_address):
+ self.host_address = host_address
+ self.current_url = host_address
+ self.fetcher = LicenseProtectedFileFetcher()
+
+ def is_dir(self, link):
+ """Check if the link is a directory."""
+ return link[-1] == "/"
+
+ def get_header(self):
+ """Get header from the current url."""
+ return self.parse_header(self.fetcher.get_headers(self.current_url))
+
+ def get_license_text(self):
+ """Get license from the current URL if it redirects to license."""
+ license = self.fetcher.get_or_return_license(self.current_url)
+ if license[0]:
+ return license[0]
+ else:
+ raise NoLicenseException("License expected here.")
+
+ def get_unprotected_file_header(self):
+ """Get headers from unprotected file."""
+ page = self.fetcher.get_or_return_license(self.current_url)
+ # Check if license with accept and decline links is returned.
+ if len(page) == 3:
+ raise UnexpectedLicenseException("License not expected here.")
+ else:
+ return self.parse_header(self.fetcher.header)
+
+ def get_content(self):
+ """Get contents from the current url."""
+ return self.fetcher.get(self.current_url)
+
+ def get_content_title(self):
+ """Get content title from the current url."""
+ return self.get_title(self.fetcher.get(self.current_url))
+
+ def get_header_when_redirected(self):
+ """Get header when the client is redirected to the license."""
+ self.fetcher.get(self.current_url)
+ return self.parse_header(self.fetcher.header)
+
+ def accept_license_get_header(self):
+ """Accept license and get header of the file it redirects to."""
+ license = self.fetcher.get_or_return_license(self.current_url)
+ # Second element in result is the accept link.
+ if license[1]:
+ self.fetcher.get_protected_file(license[1], self.current_url)
+ return self.parse_header(self.fetcher.header)
+ else:
+ raise NoLicenseException("License expected here.")
+
+ def decline_license(self):
+ """Decline license. Return title of the page."""
+ return self.get_title(
+ self.fetcher.get(self.current_url, accept_license=False)
+ )
+
+ def parse_header(self, header):
+ """Formats headers from dict form to the multi-line string."""
+ header_str = ""
+ for key in sorted(header.iterkeys()):
+ header_str += "%s: %s\n" % (key, header[key])
+ return header_str
+
+ def get_title(self, html):
+ soup = BeautifulSoup(html)
+ titles_all = soup.findAll('title')
+ if len(titles_all) > 0:
+ return titles_all[0].contents[0]
+ else:
+ return ""
+
+ def browse_to_relative(self, path):
+ """Change current url relatively."""
+ self.current_url += path
+
+ def browse_to_absolute(self, path):
+ """Change current url to specified path."""
+ self.current_url = self.host_address + path
+
+ def browse_to_next(self, condition):
+ """Browse to next dir/build file that matches condition.
+
+ Set the current URL to match the condition among the
+ links in the current page with priority to build files.
+ If there's no match, set link to build file if present.
+ Otherwise, set link to first directory present.
+ """
+ links = self.find_links(self.get_content())
+ link = self.find_link_with_condition(links, condition)
+ if not link:
+ # No link matching condition, get first build in list.
+ link = self.find_build_tar_bz2(links)
+ if not link:
+ # Still no link, just get first dir in list.
+ link = self.find_directory(links)
+ if not link:
+ # We found page with no directories nor builds.
+ raise EmptyDirectoryException("Directory is empty.")
+
+ self.browse_to_relative(link)
+
+ def find_links(self, html):
+ """Return list of links on the page with special conditions.
+
+ Return all links below the "Parent directory" link.
+ Return whole list if there is no such link.
+ """
+ soup = BeautifulSoup(html)
+ links_all = soup.findAll('a')
+ had_parent = False
+ links = []
+ for link in links_all:
+ if had_parent:
+ links.append(link.get("href"))
+ if link.contents[0] == "Parent Directory":
+ had_parent = True
+
+ if had_parent:
+ return links
+ else:
+ return [each.get('href') for each in links_all]
+
+ def find_link_with_condition(self, links, condition):
+ """Finds a link which satisfies the condition.
+
+ Condition is actually to contain the string from the list.
+ Build files (which end in .tar.bz2) have the priority.
+ """
+ for link in links:
+ if condition in link and link[-7:] == "tar.bz2":
+ return link
+ for link in links:
+ if condition in link:
+ return link
+ return None
+
+ def find_directory(self, links):
+ """Finds a directory among list of links."""
+ for link in links:
+ if self.is_dir(link):
+ return link
+ return None
+
+ def find_build_tar_bz2(self, links):
+ """Finds a file, among the list of links, which ends in tar.bz2."""
+ for link in links:
+ if link[-7:] == "tar.bz2":
+ return link
+ return None
diff --git a/tests/license_protected_file_downloader.py b/tests/license_protected_file_downloader.py
new file mode 100644
index 0000000..5726a56
--- /dev/null
+++ b/tests/license_protected_file_downloader.py
@@ -0,0 +1,287 @@
+#!/usr/bin/env python
+
+import argparse
+import os
+import pycurl
+import re
+import urlparse
+import html2text
+from BeautifulSoup import BeautifulSoup
+
+
+class LicenseProtectedFileFetcher:
+ """Fetch a file from the web that may be protected by a license redirect
+
+ This is designed to run on snapshots.linaro.org. License HTML files are in
+ the form:
+
+ <vendor>.html has a link to <vendor>-accept.html
+
+ If self.get is pointed at a file that has to go through one of these
+ licenses, it should be able to automatically accept the license and
+ download the file.
+
+ Once a license has been accepted, it will be used for all following
+ downloads.
+
+ If self.close() is called before the object is deleted, cURL will store
+ the license accept cookie to cookies.txt, so it can be used for later
+ downloads.
+
+ """
+ def __init__(self, cookie_file="cookies.txt"):
+ """Set up cURL"""
+ self.curl = pycurl.Curl()
+ self.curl.setopt(pycurl.WRITEFUNCTION, self._write_body)
+ self.curl.setopt(pycurl.HEADERFUNCTION, self._write_header)
+ self.curl.setopt(pycurl.FOLLOWLOCATION, 1)
+ self.curl.setopt(pycurl.COOKIEFILE, cookie_file)
+ self.curl.setopt(pycurl.COOKIEJAR, cookie_file)
+ self.file_out = None
+
+ def _get(self, url):
+ """Clear out header and body storage, fetch URL, filling them in."""
+ url = url.encode("ascii")
+ self.curl.setopt(pycurl.URL, url)
+
+ self.body = ""
+ self.header = ""
+
+ if self.file_name:
+ self.file_out = open(self.file_name, 'w')
+ else:
+ self.file_out = None
+
+ self.curl.perform()
+ self._parse_headers(url)
+
+ if self.file_out:
+ self.file_out.close()
+
+ def _parse_headers(self, url):
+ header = {}
+ for line in self.header.splitlines():
+ # Header lines typically are of the form thing: value...
+ test_line = re.search("^(.*?)\s*:\s*(.*)$", line)
+
+ if test_line:
+ header[test_line.group(1)] = test_line.group(2)
+
+ # The location attribute is sometimes relative, but we would
+ # like to have it as always absolute...
+ if 'Location' in header:
+ parsed_location = urlparse.urlparse(header["Location"])
+
+ # If not an absolute location...
+ if not parsed_location.netloc:
+ parsed_source_url = urlparse.urlparse(url)
+ new_location = ["", "", "", "", ""]
+
+ new_location[0] = parsed_source_url.scheme
+ new_location[1] = parsed_source_url.netloc
+ new_location[2] = header["Location"]
+
+ # Update location with absolute URL
+ header["Location"] = urlparse.urlunsplit(new_location)
+
+ self.header_text = self.header
+ self.header = header
+
+ def get_headers(self, url):
+ url = url.encode("ascii")
+ self.curl.setopt(pycurl.URL, url)
+
+ self.body = ""
+ self.header = ""
+
+ # Setting NOBODY causes CURL to just fetch the header.
+ self.curl.setopt(pycurl.NOBODY, True)
+ self.curl.perform()
+ self.curl.setopt(pycurl.NOBODY, False)
+
+ self._parse_headers(url)
+
+ return self.header
+
+ def get_or_return_license(self, url, file_name=None):
+ """Get file at the requested URL or, if behind a license, return that.
+
+ If the URL provided does not redirect us to a license, then return the
+ body of that file. If we are redirected to a license click through
+ then return (the license as plain text, url to accept the license).
+
+ If the user of this function accepts the license, then they should
+ call get_protected_file."""
+
+ self.file_name = file_name
+
+ # Get the license details. If this returns None, the file isn't license
+ # protected and we can just return the file we started to get in the
+ # function (self.body).
+ license_details = self._get_license(url)
+
+ if license_details:
+ return license_details
+
+ return self.body
+
+ def get(self, url, file_name=None, ignore_license=False,
+ accept_license=True):
+ """Fetch the requested URL, accepting licenses
+
+ Fetches the file at url. If a redirect is encountered, it is
+ expected to be to a license that has an accept link. Follow that link,
+ then download the original file. Returns the first 1MB of the file
+ (see _write_body).
+
+ """
+
+ self.file_name = file_name
+ if ignore_license:
+ self._get(url)
+ return self.body
+
+ license_details = self._get_license(url)
+
+ if license_details:
+ # Found a license.
+ if accept_license:
+ # Accept the license without looking at it and
+ # start fetching the file we originally wanted.
+ accept_url = license_details[1]
+ self.get_protected_file(accept_url, url)
+ else:
+ # We want to decline the license and return the notice.
+ decline_url = license_details[2]
+ self._get(decline_url)
+
+ else:
+ # If we got here, there wasn't a license protecting the file
+ # so we just fetch it.
+ self._get(url)
+
+ return self.body
+
+ def _get_license(self, url):
+ """Return (license, accept URL, decline URL) if found,
+ else return None.
+
+ """
+
+ self.get_headers(url)
+
+ if "Location" in self.header and self.header["Location"] != url:
+ # We have been redirected to a new location - the license file
+ location = self.header["Location"]
+
+ # Fetch the license HTML
+ self._get(location)
+
+ # Get the file from the URL (full path)
+ file = urlparse.urlparse(location).path
+
+ # Get the file without the rest of the path
+ file = os.path.split(file)[-1]
+
+ # Look for a link with accepted.html in the page name. Follow it.
+ accept_search, decline_search = None, None
+ for line in self.body.splitlines():
+ if not accept_search:
+ accept_search = re.search(
+ """href=.*?["'](.*?-accepted.html)""",
+ line)
+ if not decline_search:
+ decline_search = re.search(
+ """href=.*?["'](.*?-declined.html)""",
+ line)
+
+ if accept_search and decline_search:
+ # Have found license accept URL!
+ new_file = accept_search.group(1)
+ accept_url = re.sub(file, new_file, location)
+
+ # Found decline URL as well.
+ new_file_decline = decline_search.group(1)
+ decline_url = re.sub(file, new_file_decline, location)
+
+ # Parse the HTML using BeautifulSoup
+ soup = BeautifulSoup(self.body)
+
+ # The license is in a div with the ID license-text, so we
+ # use this to pull just the license out of the HTML.
+ html_license = u""
+ for chunk in soup.findAll(id="license-text"):
+ # Output of chunk.prettify is UTF8, but comes back
+ # as a str, so convert it here.
+ html_license += chunk.prettify().decode("utf-8")
+
+ text_license = html2text.html2text(html_license)
+
+ return text_license, accept_url, decline_url
+
+ return None
+
+ def get_protected_file(self, accept_url, url):
+ """Gets the file redirected to by the accept_url"""
+
+ self._get(accept_url) # Accept the license
+
+ if not("Location" in self.header and self.header["Location"] == url):
+ # If we got here, we don't have the file yet (weren't redirected
+ # to it). Fetch our target file. This should work now that we have
+ # the right cookie.
+ self._get(url) # Download the target file
+
+ return self.body
+
+ def _write_body(self, buf):
+ """Used by curl as a sink for body content"""
+
+ # If we have a target file to write to, write to it
+ if self.file_out:
+ self.file_out.write(buf)
+
+ # Only buffer first 1MB of body. This should be plenty for anything
+ # we wish to parse internally.
+ if len(self.body) < 1024 * 1024 * 1024:
+ # XXX Would be nice to stop keeping the file in RAM at all and
+ # passing large buffers around. Perhaps only keep in RAM if
+ # file_name == None? (used for getting directory listings
+ # normally).
+ self.body += buf
+
+ def _write_header(self, buf):
+ """Used by curl as a sink for header content"""
+ self.header += buf
+
+ def register_progress_callback(self, callback):
+ self.curl.setopt(pycurl.NOPROGRESS, 0)
+ self.curl.setopt(pycurl.PROGRESSFUNCTION, callback)
+
+ def close(self):
+ """Wrapper to close curl - this will allow curl to write out cookies"""
+ self.curl.close()
+
+
+def main():
+ """Download file specified on command line"""
+ parser = argparse.ArgumentParser(description="Download a file, accepting "
+ "any licenses required to do so.")
+
+ parser.add_argument('url', metavar="URL", type=str, nargs=1,
+ help="URL of file to download.")
+
+ args = parser.parse_args()
+
+ fetcher = LicenseProtectedFileFetcher()
+
+ # Get file name from URL
+ file_name = os.path.basename(urlparse.urlparse(args.url[0]).path)
+ if not file_name:
+ file_name = "downloaded"
+ fetcher.get(args.url[0], file_name)
+
+ fetcher.close()
+
+if __name__ == "__main__":
+ main()
diff --git a/tests/mime.types b/tests/mime.types
new file mode 100644
index 0000000..c4a3984
--- /dev/null
+++ b/tests/mime.types
@@ -0,0 +1,11 @@
+text/css css
+text/directory
+text/html html htm shtml
+text/plain asc txt text pot brf
+text/uri-list
+application/x-httpd-php phtml pht php
+application/x-httpd-php-source phps
+application/x-httpd-php3 php3
+application/x-httpd-php3-preprocessed php3p
+application/x-httpd-php4 php4
+application/x-httpd-php5 php5
diff --git a/tests/test_click_through_license.py b/tests/test_click_through_license.py
new file mode 100644
index 0000000..0f8643a
--- /dev/null
+++ b/tests/test_click_through_license.py
@@ -0,0 +1,292 @@
+#!/usr/bin/env python
+
+import re
+import os
+import shutil
+import shlex
+import subprocess
+import socket
+
+from testtools import TestCase
+from testtools.matchers import Mismatch
+from license_protected_file_downloader import LicenseProtectedFileFetcher
+
+fetcher = LicenseProtectedFileFetcher()
+cwd = os.getcwd()
+docroot = cwd
+srvroot = os.path.abspath(os.path.join(*([cwd] + ['tests'])))
+local_rewrite = 'RewriteCond %{REMOTE_ADDR} 127.0.0.1 [OR]'
+
+host = 'http://127.0.0.1'
+port = '0' # 0 == Pick a free port.
+samsung_license_path = '/licenses/samsung.html'
+ste_license_path = '/licenses/ste.html'
+linaro_license_path = '/licenses/linaro.html'
+samsung_test_file = '/android/~linaro-android/staging-origen/test.txt'
+ste_test_file = '/android/~linaro-android/staging-snowball/173/target/product/snowball/test.txt'
+ste_open_test_file = '/android/~linaro-android/staging-snowball/173/test.txt'
+never_available = '/android/~linaro-android/staging-imx53/test.txt'
+linaro_test_file = '/android/~linaro-android/staging-panda/test.txt'
+not_protected_test_file = '/android/~linaro-android/staging-vexpress-a9/test.txt'
+not_found_test_file = '/android/~linaro-android/staging-vexpress-a9/notfound.txt'
+per_file_samsung_test_file = '/android/images/origen-blob.txt'
+per_file_ste_test_file = '/android/images/snowball-blob.txt'
+per_file_not_protected_test_file = '/android/images/MANIFEST'
+dirs_only_dir = '/android/~linaro-android/'
+
+
+class Contains(object):
+ '''Match if a string contains substring'''
+ def __init__(self, substr):
+ self.substr = substr
+
+ def __str__(self):
+ return 'Contains(%s)' % (self.substr,)
+
+ def match(self, actual):
+ for line in actual.splitlines():
+ res = re.search(self.substr, line)
+ if res:
+ return None
+ return Mismatch("Initial string doesn't contain substring (%s)" %
+ self.substr)
+
+
+class CommandNotFoundException(Exception):
+ ''' Unable to find command '''
+
+
+class NonZeroReturnValueException(Exception):
+ ''' Command exited with nonzero return value '''
+
+
+class TestLicense(TestCase):
+ '''Tests for accessing files and directories with license protection'''
+
+ @classmethod
+ def setUpClass(cls):
+ global host
+ global port
+ if port == '0':
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.bind(('127.0.0.1', 0))
+ port = str(s.getsockname()[1])
+ s.close()
+ host = host + ':' + port
+ shutil.copy("%s/apache2.conf.tmpl" % srvroot, "%s/apache2.conf" %
+ srvroot)
+ shutil.copy("%s/.htaccess" % docroot, "%s/dothtaccess" % docroot)
+ subprocess.Popen(['sed', '-i', 's/ServerRoot \"\"/ServerRoot \"%s\"/'
+ % srvroot.replace('/', '\/'), '%s/apache2.conf' % srvroot],
+ stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait()
+ subprocess.Popen(['sed', '-i', 's/DocumentRoot \"\"/DocumentRoot '
+ '\"%s\"/' % docroot.replace('/', '\/'), '%s/apache2.conf'
+ % srvroot], stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT).wait()
+ subprocess.Popen(['sed', '-i', 's/Directory \"\"/Directory \"%s\"/'
+ % docroot.replace('/', '\/'), '%s/apache2.conf' % srvroot],
+ stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait()
+ subprocess.Popen(['sed', '-i', 's/Listen/Listen %s/' % port,
+ '%s/apache2.conf' % srvroot], stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT).wait()
+ if subprocess.Popen(['which', 'apache2'],
+ stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT).wait():
+ raise CommandNotFoundException("apache2 command not found. Please "
+ "install apache2 web server and rerun tests.")
+ args = shlex.split('apache2 -d %s -f apache2.conf -k start' % srvroot)
+ rc = subprocess.Popen(args, stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT).wait()
+ if rc:
+ raise NonZeroReturnValueException("apache2 server exited with "
+ "error %s" % rc)
+
+ @classmethod
+ def tearDownClass(cls):
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.unlink("%s/cookies.txt" % docroot)
+ args = shlex.split('apache2 -d %s -f apache2.conf -k stop' % srvroot)
+ subprocess.Popen(args, stdout=open('/dev/null', 'w'),
+ stderr=subprocess.STDOUT).wait()
+ if os.path.exists("%s/apache2.conf" % srvroot):
+ os.unlink("%s/apache2.conf" % srvroot)
+ if os.path.exists("%s/click_through_license_access.log" % srvroot):
+ os.unlink("%s/click_through_license_access.log" % srvroot)
+ if os.path.exists("%s/click_through_license_error.log" % srvroot):
+ os.unlink("%s/click_through_license_error.log" % srvroot)
+ if os.path.exists("%s/rewrite.log" % srvroot):
+ os.unlink("%s/rewrite.log" % srvroot)
+ os.rename("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot)
+
+ def setUp(self):
+ super(TestLicense, self).setUp()
+ global fetcher
+ fetcher = LicenseProtectedFileFetcher()
+
+ def tearDown(self):
+ super(TestLicense, self).tearDown()
+ if isinstance(fetcher, LicenseProtectedFileFetcher):
+ fetcher.close()
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.unlink("%s/cookies.txt" % docroot)
+
+ def test_licensefile_directly_samsung(self):
+ search = "Index of /"
+ testfile = fetcher.get(host + samsung_license_path)
+ self.assertThat(testfile, Contains(search))
+
+ def test_licensefile_directly_ste(self):
+ search = "Index of /"
+ testfile = fetcher.get(host + ste_license_path)
+ self.assertThat(testfile, Contains(search))
+
+ def test_licensefile_directly_linaro(self):
+ search = "Index of /"
+ testfile = fetcher.get(host + linaro_license_path)
+ self.assertThat(testfile, Contains(search))
+
+ def test_redirect_to_license_samsung(self):
+ search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY"
+ testfile = fetcher.get_or_return_license(host + samsung_test_file)
+ self.assertThat(testfile[0], Contains(search))
+
+ def test_redirect_to_license_ste(self):
+ search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY"
+ testfile = fetcher.get_or_return_license(host + ste_test_file)
+ self.assertThat(testfile[0], Contains(search))
+
+ def test_redirect_to_license_linaro(self):
+ search = "Linaro license."
+ testfile = fetcher.get_or_return_license(host + linaro_test_file)
+ self.assertThat(testfile[0], Contains(search))
+
+ def test_decline_license_samsung(self):
+ search = "License has not been accepted"
+ testfile = fetcher.get(host + samsung_test_file, accept_license=False)
+ self.assertThat(testfile, Contains(search))
+
+ def test_decline_license_ste(self):
+ search = "License has not been accepted"
+ testfile = fetcher.get(host + ste_test_file, accept_license=False)
+ self.assertThat(testfile, Contains(search))
+
+ def test_decline_license_linaro(self):
+ search = "License has not been accepted"
+ testfile = fetcher.get(host + linaro_test_file, accept_license=False)
+ self.assertThat(testfile, Contains(search))
+
+ def test_non_protected_dirs(self):
+ search = "This is always available."
+ testfile = fetcher.get(host + not_protected_test_file)
+ self.assertThat(testfile, Contains(search))
+
+ def test_never_available_dirs(self):
+ search = "Forbidden"
+ testfile = fetcher.get(host + never_available)
+ self.assertThat(testfile, Contains(search))
+
+ def test_accept_license_samsung_file(self):
+ search = "This is protected with click-through Samsung license."
+ testfile = fetcher.get(host + samsung_test_file)
+ fetcher.close()
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.rename("%s/cookies.txt" % docroot,
+ "%s/cookies.samsung" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_accept_license_samsung_dir(self):
+ search = "Index of /android/~linaro-android/staging-origen"
+ testfile = fetcher.get(host + os.path.dirname(samsung_test_file))
+ self.assertThat(testfile, Contains(search))
+
+ def test_accept_license_ste_file(self):
+ search = "This is protected with click-through ST-E license."
+ testfile = fetcher.get(host + ste_test_file)
+ fetcher.close()
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_accept_license_ste_dir(self):
+ search = "Index of /android/~linaro-android/staging-snowball"
+ testfile = fetcher.get(host + os.path.dirname(ste_test_file))
+ self.assertThat(testfile, Contains(search))
+
+ def test_license_accepted_samsung(self):
+ search = "This is protected with click-through Samsung license."
+ os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot)
+ testfile = fetcher.get(host + samsung_test_file)
+ self.assertThat(testfile, Contains(search))
+
+ def test_license_accepted_ste(self):
+ search = "This is protected with click-through ST-E license."
+ os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot)
+ testfile = fetcher.get(host + ste_test_file)
+ self.assertThat(testfile, Contains(search))
+
+ def test_internal_host_samsung(self):
+ search = "This is protected with click-through Samsung license."
+ subprocess.Popen(['sed', '-i', '/## Let internal hosts through '
+ 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot],
+ stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait()
+ testfile = fetcher.get(host + samsung_test_file, ignore_license=True)
+ shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_internal_host_ste(self):
+ search = "This is protected with click-through ST-E license."
+ subprocess.Popen(['sed', '-i', '/## Let internal hosts through '
+ 'always./ a %s' % local_rewrite, '%s/.htaccess' % docroot],
+ stdout=open('/dev/null', 'w'), stderr=subprocess.STDOUT).wait()
+ testfile = fetcher.get(host + ste_test_file, ignore_license=True)
+ shutil.copy("%s/dothtaccess" % docroot, "%s/.htaccess" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_ste_open_file(self):
+ search = "This is always available."
+ testfile = fetcher.get(host + ste_open_test_file)
+ self.assertThat(testfile, Contains(search))
+
+ def test_per_file_accept_license_samsung_file(self):
+ search = "This is protected with click-through Samsung license."
+ testfile = fetcher.get(host + per_file_samsung_test_file)
+ fetcher.close()
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.rename("%s/cookies.txt" % docroot,
+ "%s/cookies.samsung" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_per_file_accept_license_ste_file(self):
+ search = "This is protected with click-through ST-E license."
+ testfile = fetcher.get(host + per_file_ste_test_file)
+ fetcher.close()
+ if os.path.exists("%s/cookies.txt" % docroot):
+ os.rename("%s/cookies.txt" % docroot, "%s/cookies.ste" % docroot)
+ self.assertThat(testfile, Contains(search))
+
+ def test_per_file_license_accepted_samsung(self):
+ search = "This is protected with click-through Samsung license."
+ os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot)
+ testfile = fetcher.get(host + per_file_samsung_test_file, ignore_license=True)
+ self.assertThat(testfile, Contains(search))
+
+ def test_per_file_license_accepted_ste(self):
+ search = "This is protected with click-through ST-E license."
+ os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot)
+ testfile = fetcher.get(host + per_file_ste_test_file, ignore_license=True)
+ self.assertThat(testfile, Contains(search))
+
+ def test_per_file_non_protected_dirs(self):
+ search = "MANIFEST"
+ testfile = fetcher.get(host + per_file_not_protected_test_file)
+ self.assertThat(testfile, Contains(search))
+
+ def test_dir_containing_only_dirs(self):
+ search = "Index of /android/~linaro-android"
+ testfile = fetcher.get(host + dirs_only_dir)
+ self.assertThat(testfile, Contains(search))
+
+ def test_not_found_file(self):
+ search = "Not Found"
+ testfile = fetcher.get(host + not_found_test_file)
+ self.assertThat(testfile, Contains(search))
diff --git a/tests/test_php_unit.py b/tests/test_php_unit.py
new file mode 100644
index 0000000..731d749
--- /dev/null
+++ b/tests/test_php_unit.py
@@ -0,0 +1,36 @@
+import os
+import tempfile
+import subprocess
+import xml.etree.ElementTree
+
+from testtools import TestCase
+from testtools.matchers import Equals
+from testtools.matchers import AllMatch
+
class PhpUnitTest(TestCase):
    '''Tests for executing the PHP Unit tests.

    setUp runs phpunit against tests/LicenseHelperTest with a JUnit XML
    log; the test then asserts the suite reported no failures or errors.
    '''

    def setUp(self):
        super(PhpUnitTest, self).setUp()
        # mkstemp() returns an *open* OS-level descriptor; close it so the
        # descriptor is not leaked (phpunit rewrites the file by path).
        fd, self.xml_path = tempfile.mkstemp()
        os.close(fd)
        devnull = open(os.devnull, 'w')
        try:
            status = subprocess.Popen(['phpunit', '--log-junit',
                                       self.xml_path,
                                       'tests/LicenseHelperTest'],
                                      stdout=devnull,
                                      stderr=subprocess.STDOUT).wait()
        finally:
            devnull.close()
        if status:
            # The original raised CommandNotFoundException, which is not
            # defined anywhere and would itself crash with a NameError.
            # NOTE(review): a non-zero status can also mean the PHP tests
            # failed, not only that phpunit is missing.
            raise RuntimeError("phpunit command not found. Please "
                               "install phpunit package and rerun tests.")
        self.xml_data = xml.etree.ElementTree.parse(self.xml_path)

    def tearDown(self):
        super(PhpUnitTest, self).tearDown()
        if os.path.exists(self.xml_path):
            os.unlink(self.xml_path)

    def test_run_php_unit_tests(self):
        """The phpunit run must report zero failures and zero errors."""
        suite = self.xml_data.getroot()[0]
        self.assertThat(
            [suite.attrib['failures'], suite.attrib['errors']],
            AllMatch(Equals("0")))
diff --git a/tests/test_publish_to_snapshots.py b/tests/test_publish_to_snapshots.py
new file mode 100644
index 0000000..870d404
--- /dev/null
+++ b/tests/test_publish_to_snapshots.py
@@ -0,0 +1,464 @@
+#!/usr/bin/env python
+
+import os
+import sys
+import shutil
+import tempfile
+import argparse
+from StringIO import StringIO
+from testtools import TestCase
+from scripts.publish_to_snapshots import SnapshotsPublisher
+
class TestSnapshotsPublisher(TestCase):
    '''Tests for publishing files to the snapshots.l.o www area.

    Each test builds a throwaway uploads/ tree, runs the publisher's
    validate/move steps, and inspects its stdout/stderr. Fixture
    directories are created in setUp and removed in tearDown.
    '''

    uploads_path = "uploads/"
    target_path = "www/"
    orig_dir = os.getcwd()

    def setUp(self):
        super(TestSnapshotsPublisher, self).setUp()
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-t", "--job-type", dest="job_type")
        self.parser.add_argument("-j", "--job-name", dest="job_name")
        self.parser.add_argument("-n", "--build-num", dest="build_num",
                                 type=int)
        self.parser.add_argument("-m", "--manifest", dest="manifest",
                                 action='store_true')
        if not os.path.isdir(self.uploads_path):
            os.mkdir(self.uploads_path)
        if not os.path.isdir(self.target_path):
            os.mkdir(self.target_path)

    def tearDown(self):
        os.chdir(self.orig_dir)
        if os.path.isdir(self.uploads_path):
            shutil.rmtree(self.uploads_path)
        if os.path.isdir(self.target_path):
            shutil.rmtree(self.target_path)
        super(TestSnapshotsPublisher, self).tearDown()

    # ------------------------------------------------------------------
    # Helpers (shared by the argument-validation and move tests)
    # ------------------------------------------------------------------

    def _assert_args_rejected(self, argv, expected_error, description):
        """Parsing/validating argv must exit with code 2 and print
        expected_error on stderr."""
        orig_stderr = sys.stderr
        stderr = sys.stderr = StringIO()
        self.publisher = SnapshotsPublisher()
        try:
            param = self.parser.parse_args(argv)
            self.publisher.validate_args(param)
        except SystemExit as err:
            self.assertEqual(err.code, 2, description)
        finally:
            sys.stderr = orig_stderr
        stderr.seek(0)
        self.assertIn(expected_error, stderr.read())

    def _make_build_dir(self, *parts):
        """Create uploads_path/<parts...> holding one dummy file; return
        the created directory path."""
        build_path = os.path.join(self.uploads_path, *parts)
        os.makedirs(build_path)
        tempfile.mkstemp(dir=build_path)
        return build_path

    def _publish(self, param):
        """Run validate_paths + move_artifacts for param.

        Returns (captured stdout text, absolute target directory path).
        """
        orig_stdout = sys.stdout
        stdout = sys.stdout = StringIO()
        try:
            uploads_dir_path, target_dir_path = self.publisher.validate_paths(
                param, self.uploads_path, self.target_path)
            uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path)
            target_dir_path = os.path.join(self.orig_dir, target_dir_path)
            self.publisher.move_artifacts(param, uploads_dir_path,
                                          target_dir_path)
        finally:
            sys.stdout = orig_stdout
        stdout.seek(0)
        return stdout.read(), target_dir_path

    def _assert_moved(self, job_type, job_name, build_subdir):
        """Common body of the successful-move tests: stage a build under
        build_subdir, publish it, and check the success message."""
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', job_type, '-j', job_name,
                                        '-n', '1'])
        self.publisher.validate_args(param)
        self._make_build_dir(*build_subdir)
        output, _ = self._publish(param)
        self.assertIn("Moved the files from", output)

    # ------------------------------------------------------------------
    # Argument validation
    # ------------------------------------------------------------------

    def test_validate_args_valid_job_values(self):
        """Every supported job type passes validate_args without exiting."""
        self.publisher = SnapshotsPublisher()
        for job_type in ('android', 'kernel-hwpack', 'prebuilt',
                         'ubuntu-hwpacks', 'ubuntu-images',
                         'ubuntu-restricted', 'ubuntu-sysroots', 'binaries'):
            param = self.parser.parse_args(['-t', job_type, '-j',
                                            'dummy_job_name', '-n', '1'])
            self.publisher.validate_args(param)

    def test_validate_args_invalid_job_type(self):
        self._assert_args_rejected(
            ['-t', 'invalid_job_type', '-j', 'dummy_job_name', '-n', '1'],
            "Invalid job type", "Expected result")

    def test_validate_args_run_invalid_argument(self):
        self._assert_args_rejected(
            ['-a'], "unrecognized arguments: -a\n", "Invalid argument passed")

    def test_validate_args_run_invalid_value(self):
        self._assert_args_rejected(
            ['-n', "N"], "argument -n/--build-num: invalid int value: 'N'",
            "Invalid value passed")

    def test_validate_args_run_none_values(self):
        self._assert_args_rejected(
            ['-t', None, '-j', None, '-n', 0],
            "You must specify job-type, job-name and build-num",
            "None values are not acceptable")

    # ------------------------------------------------------------------
    # Path validation
    # ------------------------------------------------------------------

    def test_validate_paths_invalid_uploads_path(self):
        """A missing uploads/build directory is reported on stdout."""
        orig_stdout = sys.stdout
        stdout = sys.stdout = StringIO()
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', 'android', '-j',
                                        'dummy_job_name', '-n', '1'])
        self.publisher.validate_args(param)
        try:
            self.publisher.validate_paths(param, "./dummy_uploads_path",
                                          self.target_path)
        finally:
            sys.stdout = orig_stdout
        stdout.seek(0)
        self.assertIn("Missing build path", stdout.read())

    def test_validate_paths_invalid_target_path(self):
        """A missing target directory is reported on stdout."""
        orig_stdout = sys.stdout
        stdout = sys.stdout = StringIO()
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', 'android', '-j',
                                        'dummy_job_name', '-n', '1'])
        self.publisher.validate_args(param)
        os.makedirs(os.path.join(self.uploads_path, param.job_type,
                                 param.job_name, str(param.build_num)))
        # NOTE(review): this shadows the class-level target_path, so this
        # test's tearDown checks the dummy path instead of www/.
        self.target_path = "./dummy_target_path"
        try:
            self.publisher.validate_paths(param, self.uploads_path,
                                          self.target_path)
        finally:
            sys.stdout = orig_stdout
        stdout.seek(0)
        self.assertIn("Missing target path", stdout.read())

    # ------------------------------------------------------------------
    # Successful moves, one per job type
    # ------------------------------------------------------------------

    def test_move_artifacts_kernel_successful_move(self):
        self._assert_moved('kernel-hwpack', 'dummy_job_name',
                           ('kernel-hwpack', 'dummy_job_name', '1'))

    def test_move_artifacts_android_successful_move(self):
        # NOTE: this test was defined twice in the original file with an
        # identical body; the shadowed duplicate has been removed.
        self._assert_moved('android', 'dummy_job_name',
                           ('android', 'dummy_job_name', '1'))

    def test_move_artifacts_prebuilt_successful_move(self):
        self._assert_moved('prebuilt', 'dummy_job_name',
                           ('dummy_job_name', '1', 'oneiric'))

    def test_move_artifacts_ubuntu_hwpacks_successful_move(self):
        self._assert_moved('ubuntu-hwpacks', 'precise-armhf-lt-panda',
                           ('precise-armhf-lt-panda', '1'))

    def test_move_artifacts_ubuntu_images_successful_move(self):
        self._assert_moved('ubuntu-images', 'precise-armhf-ubuntu-desktop',
                           ('precise-armhf-ubuntu-desktop', '1'))

    def test_move_artifacts_ubuntu_restricted_successful_move(self):
        job_name = 'precise-armhf-integrated-big.little-fastmodels'
        self._assert_moved('ubuntu-restricted', job_name, (job_name, '1'))

    def test_move_artifacts_ubuntu_sysroots_successful_move(self):
        self._assert_moved('ubuntu-sysroots',
                           'precise-armhf-ubuntu-desktop-dev',
                           ('precise-armhf-ubuntu-desktop-dev', '1'))

    def test_move_artifacts_binaries_successful_move(self):
        """Binaries jobs also require a TIMESTAMP file in the build dir."""
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', 'binaries', '-j',
                                        'snowball-binary-update', '-n', '1'])
        self.publisher.validate_args(param)
        build_path = self._make_build_dir(param.job_name,
                                          str(param.build_num))
        ts_file = open(os.path.join(build_path, 'TIMESTAMP'), "w")
        try:
            ts_file.write('20120416')
        finally:
            ts_file.close()
        output, _ = self._publish(param)
        self.assertIn("Moved the files from", output)

    # ------------------------------------------------------------------
    # Post-move side effects
    # ------------------------------------------------------------------

    def test_create_symlink(self):
        """After a move, the publisher reports linking the latest build."""
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', 'android', '-j',
                                        'dummy_job_name', '-n', '1'])
        self.publisher.validate_args(param)
        self._make_build_dir(param.job_type, param.job_name,
                             str(param.build_num))
        output, target_dir_path = self._publish(param)
        self.assertIn("The latest build is now linked to " + target_dir_path,
                      output)

    def test_create_manifest_file_option(self):
        """-m writes a MANIFEST in the target listing every moved file."""
        self.publisher = SnapshotsPublisher()
        param = self.parser.parse_args(['-t', 'android', '-j',
                                        'dummy_job_name', '-n', '1', '-m'])
        self.publisher.validate_args(param)
        self._make_build_dir(param.job_type, param.job_name,
                             str(param.build_num))
        orig_stdout = sys.stdout
        stdout = sys.stdout = StringIO()
        expected_lines = []
        try:
            uploads_dir_path, target_dir_path = self.publisher.validate_paths(
                param, self.uploads_path, self.target_path)
            uploads_dir_path = os.path.join(self.orig_dir, uploads_dir_path)
            target_dir_path = os.path.join(self.orig_dir, target_dir_path)
            # Record the relative path of every uploaded file before the
            # move; the generated MANIFEST must list exactly these.
            os.chdir(uploads_dir_path)
            for path, subdirs, files in os.walk("."):
                for name in files:
                    expected_lines.append(
                        os.path.join(path, name).split("./")[1] + "\n")
            os.chdir(self.orig_dir)
            self.publisher.move_artifacts(param, uploads_dir_path,
                                          target_dir_path)
        finally:
            # The original swallowed all exceptions here (except: pass),
            # masking failures and making later assertions die on
            # NameError; errors now propagate as real test failures.
            sys.stdout = orig_stdout
        manifest_file = os.path.join(target_dir_path, "MANIFEST")
        manifest = open(manifest_file, "r").read()
        stdout.seek(0)
        res_output = stdout.read()
        self.assertIn("Moved the files from", res_output)
        self.assertIn("Manifest file " + manifest_file + " generated",
                      res_output)
        self.assertEqual("".join(expected_lines), manifest)