diff options
author | Stevan Radakovic <stevan.radakovic@linaro.org> | 2012-05-11 14:30:47 +0200 |
---|---|---|
committer | Stevan Radakovic <stevan.radakovic@linaro.org> | 2012-05-11 14:30:47 +0200 |
commit | 01d5aa7173c16cf91946ab2c7b35f8feb3ff9f60 (patch) | |
tree | 38a8a6456cfc1bbc3189f545c423300582d119d1 | |
parent | 716594af8fddd535a0c36a9daf60c791cd010ac1 (diff) | |
parent | 77c14230b6835ff48e02cfb574bb84ab83ed5b43 (diff) |
Update filefetcher to latest version James implemented, fix bug 996002. Reviewed by danilo, dooferlad.
-rw-r--r-- | .htaccess | 4 | ||||
-rw-r--r-- | testing/filefetcher.py | 129 | ||||
-rw-r--r-- | testing/license_protected_file_downloader.py | 284 | ||||
-rw-r--r-- | testing/test_click_through_license.py | 24 |
4 files changed, 298 insertions, 143 deletions
@@ -13,12 +13,12 @@ RewriteRule (.*\/)(.*) - [E=LP_DOWNLOAD_DIR:%1] ## without port number for use in cookie domain RewriteCond %{SERVER_PORT} !^80$ [OR] RewriteCond %{SERVER_PORT} !^443$ -RewriteCond %{HTTP_HOST} (.*)(\:.*) +RewriteCond %{HTTP_HOST} ^([^:]*)$ RewriteRule .* - [E=CO_DOMAIN:%1] RewriteCond %{SERVER_PORT} !^80$ [OR] RewriteCond %{SERVER_PORT} !^443$ -RewriteCond %{HTTP_HOST} (^.*$) +RewriteCond %{HTTP_HOST} ^([^:]*):(.*)$ RewriteRule .* - [E=CO_DOMAIN:%1] ## Let internal hosts through always. diff --git a/testing/filefetcher.py b/testing/filefetcher.py deleted file mode 100644 index d14e9f0..0000000 --- a/testing/filefetcher.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python - -# Changes required to address EULA for the origen hwpacks - -import argparse -import os -import pycurl -import re -import urlparse - - -class LicenseProtectedFileFetcher: - """Fetch a file from the web that may be protected by a license redirect - - This is designed to run on snapshots.linaro.org. License HTML file are in - the form: - - <vendor>.html has a link to <vendor>-accept.html - - If self.get is pointed at a file that has to go through one of these - licenses, it should be able to automatically accept the license and - download the file. - - Once a license has been accepted, it will be used for all following - downloads. - - If self.close() is called before the object is deleted, cURL will store - the license accept cookie to cookies.txt, so it can be used for later - downloads. 
- - """ - def __init__(self): - """Set up cURL""" - self.curl = pycurl.Curl() - self.curl.setopt(pycurl.FOLLOWLOCATION, 1) - self.curl.setopt(pycurl.WRITEFUNCTION, self._write_body) - self.curl.setopt(pycurl.HEADERFUNCTION, self._write_header) - self.curl.setopt(pycurl.COOKIEFILE, "cookies.txt") - self.curl.setopt(pycurl.COOKIEJAR, "cookies.txt") - - def _get(self, url): - """Clear out header and body storage, fetch URL, filling them in.""" - self.curl.setopt(pycurl.URL, url) - - self.body = "" - self.header = "" - - self.curl.perform() - - def get(self, url, ignore_license=False, accept_license=True): - """Fetch the requested URL, ignoring license at all or - accepting or declining licenses, returns file body. - - Fetches the file at url. If a redirect is encountered, it is - expected to be to a license that has an accept or decline link. - Follow that link, then download original file or nolicense notice. - - """ - self._get(url) - - if ignore_license: - return self.body - - location = self._get_location() - if location: - # Off to the races - we have been redirected. - # Expect to find a link to self.location with -accepted or - # -declined inserted before the .html, - # i.e. ste.html -> ste-accepted.html - - # Get the file from the URL (full path) - file = urlparse.urlparse(location).path - - # Get the file without the rest of the path - file = os.path.split(file)[-1] - - # Look for a link with accepted.html or declined.html - # in the page name. Follow it. - new_file = None - for line in self.body.splitlines(): - if accept_license: - link_search = re.search("""href=.*?["'](.*?-accepted.html)""", - line) - else: - link_search = re.search("""href=.*?["'](.*?-declined.html)""", - line) - if link_search: - # Have found license decline URL! - new_file = link_search.group(1) - - if new_file: - # accept or decline the license... 
- next_url = re.sub(file, new_file, location) - self._get(next_url) - - # The above get *should* take us to the file requested via - # a redirect. If we manually need to follow that redirect, - # do that now. - - if accept_license and self._get_location(): - # If we haven't been redirected to our original file, - # we should be able to just download it now. - self._get(url) - - return self.body - - def _search_header(self, field): - """Search header for the supplied field, return field / None""" - for line in self.header.splitlines(): - search = re.search(field + ":\s+(.*?)$", line) - if search: - return search.group(1) - return None - - def _get_location(self): - """Return content of Location field in header / None""" - return self._search_header("Location") - - def _write_body(self, buf): - """Used by curl as a sink for body content""" - self.body += buf - - def _write_header(self, buf): - """Used by curl as a sink for header content""" - self.header += buf - - def close(self): - """Wrapper to close curl - this will allow curl to write out cookies""" - self.curl.close() diff --git a/testing/license_protected_file_downloader.py b/testing/license_protected_file_downloader.py new file mode 100644 index 0000000..2a3bfff --- /dev/null +++ b/testing/license_protected_file_downloader.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python + +import argparse +import os +import pycurl +import re +import urlparse +import html2text +from BeautifulSoup import BeautifulSoup + +class LicenseProtectedFileFetcher: + """Fetch a file from the web that may be protected by a license redirect + + This is designed to run on snapshots.linaro.org. License HTML file are in + the form: + + <vendor>.html has a link to <vendor>-accept.html + + If self.get is pointed at a file that has to go through one of these + licenses, it should be able to automatically accept the license and + download the file. + + Once a license has been accepted, it will be used for all following + downloads. 
+ + If self.close() is called before the object is deleted, cURL will store + the license accept cookie to cookies.txt, so it can be used for later + downloads. + + """ + def __init__(self, cookie_file="cookies.txt"): + """Set up cURL""" + self.curl = pycurl.Curl() + self.curl.setopt(pycurl.WRITEFUNCTION, self._write_body) + self.curl.setopt(pycurl.HEADERFUNCTION, self._write_header) + self.curl.setopt(pycurl.FOLLOWLOCATION, 1) + self.curl.setopt(pycurl.COOKIEFILE, cookie_file) + self.curl.setopt(pycurl.COOKIEJAR, cookie_file) + self.file_out = None + + def _get(self, url): + """Clear out header and body storage, fetch URL, filling them in.""" + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + if self.file_name: + self.file_out = open(self.file_name, 'w') + else: + self.file_out = None + + self.curl.perform() + self._parse_headers(url) + + if self.file_out: + self.file_out.close() + + def _parse_headers(self, url): + header = {} + for line in self.header.splitlines(): + # Header lines typically are of the form thing: value... + test_line = re.search("^(.*?)\s*:\s*(.*)$", line) + + if test_line: + header[test_line.group(1)] = test_line.group(2) + + # The location attribute is sometimes relative, but we would + # like to have it as always absolute... + if 'Location' in header: + parsed_location = urlparse.urlparse(header["Location"]) + + # If not an absolute location... 
+ if not parsed_location.netloc: + parsed_source_url = urlparse.urlparse(url) + new_location = ["", "", "", "", ""] + + new_location[0] = parsed_source_url.scheme + new_location[1] = parsed_source_url.netloc + new_location[2] = header["Location"] + + # Update location with absolute URL + header["Location"] = urlparse.urlunsplit(new_location) + + self.header_text = self.header + self.header = header + + def get_headers(self, url): + url = url.encode("ascii") + self.curl.setopt(pycurl.URL, url) + + self.body = "" + self.header = "" + + # Setting NOBODY causes CURL to just fetch the header. + self.curl.setopt(pycurl.NOBODY, True) + self.curl.perform() + self.curl.setopt(pycurl.NOBODY, False) + + self._parse_headers(url) + + return self.header + + def get_or_return_license(self, url, file_name=None): + """Get file at the requested URL or, if behind a license, return that. + + If the URL provided does not redirect us to a license, then return the + body of that file. If we are redirected to a license click through + then return (the license as plain text, url to accept the license). + + If the user of this function accepts the license, then they should + call get_protected_file.""" + + self.file_name = file_name + + # Get the license details. If this returns None, the file isn't license + # protected and we can just return the file we started to get in the + # function (self.body). + license_details = self._get_license(url) + + if license_details: + return license_details + + return self.body + + def get(self, url, file_name=None, ignore_license=False, accept_license=True): + """Fetch the requested URL, accepting licenses + + Fetches the file at url. If a redirect is encountered, it is + expected to be to a license that has an accept link. Follow that link, + then download the original file. Returns the fist 1MB of the file + (see _write_body). 
+ + """ + + self.file_name = file_name + if ignore_license: + self._get(url) + return self.body + + license_details = self._get_license(url) + + if license_details: + # Found a license. + if accept_license: + # Accept the license without looking at it and + # start fetching the file we originally wanted. + accept_url = license_details[1] + self.get_protected_file(accept_url, url) + else: + # We want to decline the license and return the notice. + decline_url = license_details[2] + self._get(decline_url) + + else: + # If we got here, there wasn't a license protecting the file + # so we just fetch it. + self._get(url) + + return self.body + + def _get_license(self, url): + """Return (license, accept URL, decline URL) if found, + else return None. + + """ + + self.get_headers(url) + + if "Location" in self.header and self.header["Location"] != url: + # We have been redirected to a new location - the license file + location = self.header["Location"] + + # Fetch the license HTML + self._get(location) + + # Get the file from the URL (full path) + file = urlparse.urlparse(location).path + + # Get the file without the rest of the path + file = os.path.split(file)[-1] + + # Look for a link with accepted.html in the page name. Follow it. + accept_search, decline_search = None, None + for line in self.body.splitlines(): + if not accept_search: + accept_search = re.search( + """href=.*?["'](.*?-accepted.html)""", + line) + if not decline_search: + decline_search = re.search( + """href=.*?["'](.*?-declined.html)""", + line) + + if accept_search and decline_search: + # Have found license accept URL! + new_file = accept_search.group(1) + accept_url = re.sub(file, new_file, location) + + # Found decline URL as well. 
+ new_file_decline = decline_search.group(1) + decline_url = re.sub(file, new_file_decline, location) + + # Parse the HTML using BeautifulSoup + soup = BeautifulSoup(self.body) + + # The license is in a div with the ID license-text, so we + # use this to pull just the license out of the HTML. + html_license = u"" + for chunk in soup.findAll(id="license-text"): + # Output of chunk.prettify is UTF8, but comes back + # as a str, so convert it here. + html_license += chunk.prettify().decode("utf-8") + + text_license = html2text.html2text(html_license) + + return text_license, accept_url, decline_url + + return None + + def get_protected_file(self, accept_url, url): + """Gets the file redirected to by the accept_url""" + + self._get(accept_url) # Accept the license + + if not("Location" in self.header and self.header["Location"] == url): + # If we got here, we don't have the file yet (weren't redirected + # to it). Fetch our target file. This should work now that we have + # the right cookie. + self._get(url) # Download the target file + + return self.body + + def _write_body(self, buf): + """Used by curl as a sink for body content""" + + # If we have a target file to write to, write to it + if self.file_out: + self.file_out.write(buf) + + # Only buffer first 1MB of body. This should be plenty for anything + # we wish to parse internally. + if len(self.body) < 1024*1024*1024: + # XXX Would be nice to stop keeping the file in RAM at all and + # passing large buffers around. Perhaps only keep in RAM if + # file_name == None? (used for getting directory listings + # normally). 
+ self.body += buf + + def _write_header(self, buf): + """Used by curl as a sink for header content""" + self.header += buf + + def register_progress_callback(self, callback): + self.curl.setopt(pycurl.NOPROGRESS, 0) + self.curl.setopt(pycurl.PROGRESSFUNCTION, callback) + + def close(self): + """Wrapper to close curl - this will allow curl to write out cookies""" + self.curl.close() + +def main(): + """Download file specified on command line""" + parser = argparse.ArgumentParser(description="Download a file, accepting " + "any licenses required to do so.") + + parser.add_argument('url', metavar="URL", type=str, nargs=1, + help="URL of file to download.") + + args = parser.parse_args() + + fetcher = LicenseProtectedFileFetcher() + + # Get file name from URL + file_name = os.path.basename(urlparse.urlparse(args.url[0]).path) + if not file_name: + file_name = "downloaded" + fetcher.get(args.url[0], file_name) + + fetcher.close() + +if __name__ == "__main__": + main() diff --git a/testing/test_click_through_license.py b/testing/test_click_through_license.py index ea7cf72..3288f85 100644 --- a/testing/test_click_through_license.py +++ b/testing/test_click_through_license.py @@ -9,7 +9,7 @@ import socket from testtools import TestCase from testtools.matchers import Mismatch -from filefetcher import LicenseProtectedFileFetcher +from license_protected_file_downloader import LicenseProtectedFileFetcher fetcher = LicenseProtectedFileFetcher() cwd = os.getcwd() @@ -145,19 +145,19 @@ class TestLicense(TestCase): self.assertThat(testfile, Contains(search)) def test_redirect_to_license_samsung(self): - search = "LICENSE AGREEMENT" - testfile = fetcher.get(host + samsung_test_file, ignore_license=True) - self.assertThat(testfile, Contains(search)) + search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" + testfile = fetcher.get_or_return_license(host + samsung_test_file) + self.assertThat(testfile[0], Contains(search)) def test_redirect_to_license_ste(self): - search = "LICENSE 
AGREEMENT" - testfile = fetcher.get(host + ste_test_file, ignore_license=True) - self.assertThat(testfile, Contains(search)) + search = "PLEASE READ THE FOLLOWING AGREEMENT CAREFULLY" + testfile = fetcher.get_or_return_license(host + ste_test_file) + self.assertThat(testfile[0], Contains(search)) def test_redirect_to_license_linaro(self): - search = "LICENSE AGREEMENT" - testfile = fetcher.get(host + linaro_test_file, ignore_license=True) - self.assertThat(testfile, Contains(search)) + search = "Linaro license." + testfile = fetcher.get_or_return_license(host + linaro_test_file) + self.assertThat(testfile[0], Contains(search)) def test_decline_license_samsung(self): search = "License has not been accepted" @@ -214,13 +214,13 @@ class TestLicense(TestCase): def test_license_accepted_samsung(self): search = "This is protected with click-through Samsung license." os.rename("%s/cookies.samsung" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + samsung_test_file, ignore_license=True) + testfile = fetcher.get(host + samsung_test_file) self.assertThat(testfile, Contains(search)) def test_license_accepted_ste(self): search = "This is protected with click-through ST-E license." os.rename("%s/cookies.ste" % docroot, "%s/cookies.txt" % docroot) - testfile = fetcher.get(host + ste_test_file, ignore_license=True) + testfile = fetcher.get(host + ste_test_file) self.assertThat(testfile, Contains(search)) def test_internal_host_samsung(self): |