aboutsummaryrefslogtreecommitdiff
path: root/linaro_media_create
diff options
context:
space:
mode:
authorGuilherme Salgado <salgado@canonical.com>2011-01-11 15:26:54 -0600
committerGuilherme Salgado <salgado@canonical.com>2011-01-11 15:26:54 -0600
commit6759a060b6c741c8b0f830db83a11b64d9ddefb8 (patch)
tree6ff43cf5f57c631b0d687150c6f6a786ace03d6e /linaro_media_create
parent0e4ecd19250300e1f5ee3e733f85e9d2f6285749 (diff)
Rename the media_create python package to linaro_media_create, to match the name of the deb package (python-linaro-media-create)
Diffstat (limited to 'linaro_media_create')
-rw-r--r--linaro_media_create/__init__.py207
-rw-r--r--linaro_media_create/boot_cmd.py0
-rwxr-xr-xlinaro_media_create/check_device.py114
-rw-r--r--linaro_media_create/cmd_runner.py58
-rw-r--r--linaro_media_create/ensure_command.py13
-rw-r--r--linaro_media_create/hwpack.py118
-rw-r--r--linaro_media_create/partitions.py299
-rw-r--r--linaro_media_create/populate_boot.py167
-rw-r--r--linaro_media_create/remove_binary_dir.py11
-rw-r--r--linaro_media_create/rootfs.py113
-rw-r--r--linaro_media_create/tests/__init__.py8
-rwxr-xr-xlinaro_media_create/tests/fixtures.py123
-rw-r--r--linaro_media_create/tests/test_media_create.py1047
-rw-r--r--linaro_media_create/unpack_binary_tarball.py9
14 files changed, 2287 insertions, 0 deletions
diff --git a/linaro_media_create/__init__.py b/linaro_media_create/__init__.py
new file mode 100644
index 0000000..e22e9f8
--- /dev/null
+++ b/linaro_media_create/__init__.py
@@ -0,0 +1,207 @@
+import argparse
+import uuid
+
+ROOTFS_UUID = str(uuid.uuid4())
+KNOWN_BOARDS = ['beagle', 'igep', 'mx51evk', 'panda', 'ux500', 'vexpress']
+
+
+class Live256MegsAction(argparse.Action):
+ """A custom argparse.Action for the --live-256m option.
+
+ It is a store_true action for the given dest plus a store_true action for
+ 'is_live'.
+ """
+
+ def __init__(self, option_strings, dest, default=None, required=False,
+ help=None, metavar=None):
+ super(Live256MegsAction, self).__init__(
+ option_strings=option_strings, dest=dest, nargs=0,
+ default=False, required=required, help=help)
+
+ def __call__(self, parser, namespace, values, option_string=None):
+ setattr(namespace, self.dest, True)
+ setattr(namespace, 'is_live', True)
+
+
+def get_args_parser():
+ """Get the ArgumentParser for the arguments given on the command line."""
+ parser = argparse.ArgumentParser()
+ group = parser.add_mutually_exclusive_group(required=True)
+ group.add_argument(
+ '--mmc', dest='device', help='The storage device to use.')
+ group.add_argument(
+ '--image_file', dest='device',
+ help='File where we should write the QEMU image.')
+ parser.add_argument(
+ '--dev', required=True, dest='board', choices=KNOWN_BOARDS,
+ help='Generate an SD card or image for the given board.')
+ parser.add_argument(
+ '--rootfs', default='ext3', choices=['ext2', 'ext3', 'ext4', 'btrfs'],
+ help='Type of filesystem to use for the rootfs')
+ parser.add_argument(
+ '--rfs_label', default='rootfs',
+ help='Label to use for the root filesystem.')
+ parser.add_argument(
+ '--boot_label', default='boot',
+ help='Label to use for the boot filesystem.')
+ parser.add_argument(
+ '--swap_file', type=int,
+ help='Create a swap file of the given size (in MBs).')
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument(
+ '--live', dest='is_live', action='store_true',
+ help=('Create boot command for casper/live images; if this is not '
+ 'provided a UUID for the rootfs is generated and used as the '
+ 'root= option'))
+ group.add_argument(
+ '--live-256m', dest='is_lowmem', action=Live256MegsAction,
+ help=('Create boot command for casper/live images; adds '
+ 'only-ubiquity option to allow use of live installer on '
+ 'boards with 256M memory - like beagle.'))
+ parser.add_argument(
+ '--console', action='append', dest='consoles',
+ help=('Add a console to kernel boot parameter; this parameter can be '
+ 'defined multiple times.'))
+ parser.add_argument(
+ '--hwpack', action='append', dest='hwpacks', required=True,
+ help=('A hardware pack that should be installed in the rootfs; this '
+ 'parameter can be defined multiple times.'))
+ parser.add_argument(
+ '--hwpack-force-yes', action='store_true',
+ help='Pass --force-yes to linaro-hwpack-install')
+ parser.add_argument(
+ '--image_size', default='2G',
+ help=('The image size, specified in mega/giga bytes (e.g. 3000M or '
+ '3G); use with --image_file only'))
+ parser.add_argument(
+ '--binary', default='binary-tar.tar.gz', required=True,
+ help=('The tarball containing the rootfs used to create the bootable '
+ 'system.'))
+ parser.add_argument(
+ '--no-rootfs', dest='should_format_rootfs', action='store_false',
+ help='Do not deploy the root filesystem.')
+ parser.add_argument(
+ '--no-bootfs', dest='should_format_bootfs', action='store_false',
+ help='Do not deploy the boot filesystem.')
+ parser.add_argument(
+ '--no-part', dest='should_create_partitions', action='store_false',
+ help='Reuse existing partitions on the given media.')
+ return parser
+
+
+def get_board_config(board, is_live, is_lowmem, consoles):
+ """Return a dict containing the configs to create an image for a board.
+
+ :param args: An argparse.ArgumentParser object containing the arguments
+ passed to linaro-media-create.
+ """
+ mmc_part_offset = 0
+ mmc_option = '0:1'
+ boot_args_options = 'rootwait ro'
+ uboot_flavor = None
+ fat_size = 32
+ serial_opts = ''
+ if consoles is not None:
+ for console in consoles:
+ serial_opts += ' console=%s' % console
+
+ # XXX: I think this is not needed as we have board-specific
+ # serial options for when is_live is true.
+ if is_live:
+ serial_opts += ' serialtty=ttyS2'
+
+ if board in ('beagle', 'igep'):
+ if board == 'beagle':
+ uboot_flavor = 'omap3_beagle'
+ serial_opts += ' console=tty0 console=ttyS2,115200n8'
+ live_serial_opts = 'serialtty=ttyS2'
+ kernel_addr = '0x80000000'
+ initrd_addr = '0x81600000'
+ load_addr = '0x80008000'
+ sub_arch = 'linaro-omap'
+ boot_script = 'boot.scr'
+ boot_args_options += (
+ ' earlyprintk fixrtc nocompcache vram=12M omapfb.debug=y'
+ ' omapfb.mode=dvi:1280x720MR-16@60')
+
+ elif board == 'panda':
+ uboot_flavor = 'omap4_panda'
+ serial_opts += ' console = tty0 console = ttyO2,115200n8'
+ live_serial_opts = 'serialtty = ttyO2'
+ kernel_addr = '0x80200000'
+ initrd_addr = '0x81600000'
+ load_addr = '0x80008000'
+ sub_arch = 'omap4'
+ boot_script = 'boot.scr'
+ boot_args_options += (
+ ' earlyprintk fixrtc nocompcache vram = 32M omapfb.debug = y'
+ ' omapfb.vram = 0:8M mem = 463M ip = none')
+
+ elif board == 'ux500':
+ serial_opts += ' console = tty0 console = ttyAMA2,115200n8'
+ live_serial_opts = 'serialtty = ttyAMA2'
+ kernel_addr = '0x00100000'
+ initrd_addr = '0x08000000'
+ load_addr = '0x00008000'
+ sub_arch = 'ux500'
+ boot_script = 'flash.scr'
+ boot_args_options += (
+ ' earlyprintk rootdelay = 1 fixrtc nocompcache'
+ ' mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M'
+ ' mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M'
+ ' hwmem = 48M@302M mem = 152M@360M')
+ mmc_option = '1:1'
+
+ elif board == 'mx51evk':
+ serial_opts += ' console = tty0 console = ttymxc0,115200n8'
+ live_serial_opts = 'serialtty = ttymxc0'
+ kernel_addr = '0x90000000'
+ initrd_addr = '0x90800000'
+ load_addr = '0x90008000'
+ sub_arch = 'linaro-mx51'
+ boot_script = 'boot.scr'
+ mmc_part_offset = 1
+ mmc_option = '0:2'
+
+ elif board == 'vexpress':
+ uboot_flavor = 'ca9x4_ct_vxp'
+ serial_opts += ' console = tty0 console = ttyAMA0,38400n8'
+ live_serial_opts = 'serialtty = ttyAMA0'
+ kernel_addr = '0x60008000'
+ initrd_addr = '0x81000000'
+ load_addr = kernel_addr
+ sub_arch = 'linaro-vexpress'
+ boot_script = None
+ # ARM Boot Monitor is used to load u-boot, uImage etc. into flash and
+ # only allows for FAT16
+ fat_size = 16
+
+ else:
+ raise ValueError("Unkown board: %s" % board)
+
+ lowmem_opt = ''
+ boot_snippet = 'root=UUID=%s' % ROOTFS_UUID
+ if is_live:
+ serial_opts += ' %s' % live_serial_opts
+ boot_snippet = 'boot=casper'
+ if is_lowmem:
+ lowmem_opt = 'only-ubiquity'
+
+ boot_cmd = (
+ "setenv bootcmd 'fatload mmc %(mmc_option)s %(kernel_addr)s "
+ "uImage; fatload mmc %(mmc_option)s %(initrd_addr)s uInitrd; "
+ "bootm %(kernel_addr)s %(initrd_addr)s'\n"
+ "setenv bootargs '%(serial_opts)s %(lowmem_opt)s "
+ "%(boot_snippet)s %(boot_args_options)s'\n"
+ "boot" % vars())
+
+ # Instead of constructing a dict here, we could create a separate class
+ # for the config of every board, with the varying bits stored as class
+ # variables. At this point I don't see much advantage in doing that,
+ # though.
+ return dict(
+ kernel_addr=kernel_addr, initrd_addr=initrd_addr, load_addr=load_addr,
+ sub_arch=sub_arch, boot_script=boot_script, fat_size=fat_size,
+ boot_args_options=boot_args_options, serial_opts=serial_opts,
+ uboot_flavor=uboot_flavor, mmc_part_offset=mmc_part_offset,
+ mmc_option=mmc_option, boot_cmd=boot_cmd)
diff --git a/linaro_media_create/boot_cmd.py b/linaro_media_create/boot_cmd.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/linaro_media_create/boot_cmd.py
diff --git a/linaro_media_create/check_device.py b/linaro_media_create/check_device.py
new file mode 100755
index 0000000..6130ff5
--- /dev/null
+++ b/linaro_media_create/check_device.py
@@ -0,0 +1,114 @@
+import glob
+import sys
+
+import dbus
+
+from linaro_media_create import partitions
+
+
+def _get_system_bus_and_udisks_iface():
+ """Return the system bus and the UDisks interface.
+
+ :return: System bus and UDisks inteface tuple.
+ """
+ bus = dbus.SystemBus()
+ udisks = dbus.Interface(
+ bus.get_object("org.freedesktop.UDisks", "/org/freedesktop/UDisks"),
+ 'org.freedesktop.UDisks')
+
+ return (bus, udisks)
+
+
+def _get_dbus_property(prop, device, path):
+ """ Return a named property for a specific device.
+
+ :param prop: Named property.
+ :param device: Device object.
+ :param path: Device path.
+ :return: Device property.
+ """
+ return device.Get(
+ path, prop, dbus_interface='org.freedesktop.DBus.Properties')
+
+
+def _does_device_exist(path):
+ """Checks if the provided path is an existing device.
+
+ :param path: Disk device path.
+ :return: True if the device exist, else False.
+ """
+ bus, udisks = _get_system_bus_and_udisks_iface()
+ try:
+ udisks.get_dbus_method('FindDeviceByDeviceFile')(path)
+ except dbus.exceptions.DBusException:
+ # TODO: Check that this exception isn't hiding other errors.
+ return False
+
+ return True
+
+
+def _print_devices():
+ """Print disk devices found on the system."""
+ bus, udisks = _get_system_bus_and_udisks_iface()
+ print '%-16s %-16s %s' % ('Device', 'Mount point', 'Size')
+ devices = udisks.get_dbus_method('EnumerateDevices')()
+ for path in devices:
+ device = bus.get_object("org.freedesktop.UDisks", path)
+ device_file = _get_dbus_property('DeviceFile', device, path)
+
+ mount_paths = _get_dbus_property('device-mount-paths', device, path)
+ mount_point = ''.join(b for b in mount_paths)
+ if mount_point == '':
+ mount_point = 'none'
+
+ if _get_dbus_property('DeviceIsPartition', device, path):
+ part_size = _get_dbus_property('partition-size', device, path)
+ print '%-16s %-16s %dMB' % (
+ device_file, mount_point, part_size / 1024**2)
+ else:
+ device_size = _get_dbus_property('device-size', device, path)
+ print '%-16s %-16s %dMB' % (
+ device_file, mount_point, device_size / 1024**2)
+
+
+def _select_device(device):
+ """Ask the user to confirm the selected device.
+
+ :param device: Device path.
+ :return: True if the user confirms the selection, else False.
+ """
+ resp = raw_input('Are you 100%% sure, on selecting [%s] (y/n)? ' % device)
+ if resp.lower() != 'y':
+ return False
+ return True
+
+
+def _ensure_device_partitions_not_mounted(device):
+ """Ensure all partitions of the given device are not mounted."""
+ # Use '%s?*' as we only want the device files representing
+ # partitions and not the one representing the device itself.
+ for part in glob.glob('%s?*' % device):
+ partitions.ensure_partition_is_not_mounted(part)
+
+
+def confirm_device_selection_and_ensure_it_is_ready(device):
+ """Confirm this is the device to use and ensure it's ready.
+
+ If the device exists, the user is asked to confirm that this is the
+ device to use. Upon confirmation we ensure all partitions of the device
+ are umounted.
+
+ :param device: The path to the device.
+ :return: True if the device exist and is selected, else False.
+ """
+ if _does_device_exist(device):
+ print '\nI see...'
+ _print_devices()
+ if _select_device(device):
+ _ensure_device_partitions_not_mounted(device)
+ return True
+ else:
+ print '\nAre you sure? I do not see [%s].' % device
+ print 'Here is what I see...'
+ _print_devices()
+ return False
diff --git a/linaro_media_create/cmd_runner.py b/linaro_media_create/cmd_runner.py
new file mode 100644
index 0000000..22c603c
--- /dev/null
+++ b/linaro_media_create/cmd_runner.py
@@ -0,0 +1,58 @@
+import os
+import subprocess
+
+
+def run(args, as_root=False, stdin=None, stdout=None, stderr=None):
+ """Run the given command as a sub process.
+
+ Return a Popen instance.
+
+ Callsites must wait() or communicate() with the returned Popen instance.
+
+ :param command: A list or tuple containing the command to run and the
+ arguments that should be passed to it.
+ :param as_root: Should the given command be run as root (with sudo)?
+ :param stdin: Same as in subprocess.Popen().
+ :param stdout: Same as in subprocess.Popen().
+ :param stderr: Same as in subprocess.Popen().
+ """
+ assert isinstance(args, (list, tuple)), (
+ "The command to run must be a list or tuple, found: %s" % type(args))
+ # TODO: We might want to always use 'sudo -E' here to avoid problems like
+ # https://launchpad.net/bugs/673570
+ if as_root:
+ args = args[:]
+ args.insert(0, 'sudo')
+ return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr)
+
+
+class Popen(subprocess.Popen):
+ """A version of Popen which raises an error on non-zero returncode.
+
+ Once the subprocess completes we check its returncode and raise
+ SubcommandNonZeroReturnValue if it's non-zero.
+ """
+
+ def __init__(self, args, env=None, **kwargs):
+ self._my_args = args
+ if env is None:
+ env = os.environ.copy()
+ env['LC_ALL'] = 'C'
+ super(Popen, self).__init__(args, env=env, **kwargs)
+
+ def wait(self):
+ returncode = super(Popen, self).wait()
+ if returncode != 0:
+ raise SubcommandNonZeroReturnValue(self._my_args, returncode)
+ return returncode
+
+
+class SubcommandNonZeroReturnValue(Exception):
+
+ def __init__(self, command, return_value):
+ self.command = command
+ self.retval = return_value
+
+ def __str__(self):
+ return 'Sub process "%s" returned a non-zero value: %d' % (
+ self.command, self.retval)
diff --git a/linaro_media_create/ensure_command.py b/linaro_media_create/ensure_command.py
new file mode 100644
index 0000000..ad8755e
--- /dev/null
+++ b/linaro_media_create/ensure_command.py
@@ -0,0 +1,13 @@
+import os
+import sys
+
+
+def apt_get_install(command, package):
+ print ("Installing required command %s from package %s"
+ % (command, package))
+ os.system('sudo apt-get install %s' % package)
+
+
+def ensure_command(command, package):
+ if os.system('which %s 2>/dev/null 1>/dev/null' % command) != 0:
+ apt_get_install(command, package)
diff --git a/linaro_media_create/hwpack.py b/linaro_media_create/hwpack.py
new file mode 100644
index 0000000..54a7ff5
--- /dev/null
+++ b/linaro_media_create/hwpack.py
@@ -0,0 +1,118 @@
+import os
+import platform
+
+from linaro_media_create import cmd_runner
+from linaro_media_create.ensure_command import ensure_command
+
+
+# It'd be nice if we could use atexit here, but all the things we need to undo
+# have to happen right after install_hwpacks completes and the atexit
+# functions would only be called after l-m-c.py exits.
+local_atexit = []
+
+def install_hwpacks(chroot_dir, tmp_dir, hwpack_force_yes, *hwpack_files):
+ """Install the given hwpacks onto the given chroot."""
+
+ chroot_etc = os.path.join(chroot_dir, 'etc')
+ temporarily_overwrite_file_on_dir('/etc/resolv.conf', chroot_etc, tmp_dir)
+ temporarily_overwrite_file_on_dir('/etc/hosts', chroot_etc, tmp_dir)
+
+ if not platform.machine().startswith('arm'):
+ ensure_command('qemu-arm-static', 'qemu-arm-static')
+ ensure_command('qemu-img', 'qemu-kvm')
+ copy_file('/usr/bin/qemu-arm-static',
+ os.path.join(chroot_dir, 'usr', 'bin'))
+
+ # FIXME: This is an ugly hack to make sure we use the l-h-i script from
+ # the current development tree when possible.
+ here = os.path.dirname(__file__)
+ linaro_hwpack_install_path = os.path.join(
+ here, '..', 'linaro-hwpack-install')
+ if not os.path.exists(linaro_hwpack_install_path):
+ linaro_hwpack_install_path = '/usr/bin/linaro-hwpack-install'
+ copy_file(linaro_hwpack_install_path,
+ os.path.join(chroot_dir, 'usr', 'bin'))
+
+ mount_chroot_proc(chroot_dir)
+
+ for hwpack_file in hwpack_files:
+ install_hwpack(chroot_dir, hwpack_file, hwpack_force_yes)
+
+ run_local_atexit_funcs()
+
+
+def install_hwpack(chroot_dir, hwpack_file, hwpack_force_yes):
+ """Install an hwpack on the given chroot.
+
+ Copy the hwpack file to the chroot and run linaro-hwpack-install passing
+ that hwpack file to it. If hwpack_force_yes is True, also pass
+ --force-yes to linaro-hwpack-install.
+ """
+ hwpack_basename = os.path.basename(hwpack_file)
+ copy_file(hwpack_file, chroot_dir)
+ print "-" * 60
+ print "Installing (apt-get) %s in target rootfs." % hwpack_basename
+ args = ['chroot', chroot_dir, 'linaro-hwpack-install']
+ if hwpack_force_yes:
+ args.append('--force-yes')
+ args.append('/%s' % hwpack_basename)
+ cmd_runner.run(args, as_root=True).wait()
+ print "-" * 60
+
+
+def mount_chroot_proc(chroot_dir):
+ """Mount a /proc filesystem on the given chroot.
+
+ Also register a function in local_atexit to unmount that /proc filesystem.
+ """
+ chroot_proc = os.path.join(chroot_dir, 'proc')
+ proc = cmd_runner.run(
+ ['mount', 'proc', chroot_proc, '-t', 'proc'], as_root=True)
+ proc.wait()
+ def umount_chroot_proc():
+ cmd_runner.run(['umount', '-v', chroot_proc], as_root=True).wait()
+ local_atexit.append(umount_chroot_proc)
+
+
+def copy_file(filepath, directory):
+ """Copy the given file to the given directory.
+
+ The copying of the file is done in a subprocess and run using sudo.
+
+ We also register a function in local_atexit to remove the file from the
+ given directory.
+ """
+ cmd_runner.run(['cp', filepath, directory], as_root=True).wait()
+
+ def undo():
+ new_path = os.path.join(directory, os.path.basename(filepath))
+ cmd_runner.run(['rm', '-f', new_path], as_root=True).wait()
+ local_atexit.append(undo)
+
+
+def temporarily_overwrite_file_on_dir(filepath, directory, tmp_dir):
+ """Temporarily replace a file on the given directory.
+
+ We'll move the existing file on the given directory to a temp dir, then
+ copy over the given file to that directory and register a function in
+ local_atexit to move the orig file back to the given directory.
+ """
+ basename = os.path.basename(filepath)
+ path_to_orig = os.path.join(tmp_dir, basename)
+ # Move the existing file from the given directory to the temp dir.
+ cmd_runner.run(
+ ['mv', '-f', os.path.join(directory, basename), path_to_orig],
+ as_root=True).wait()
+ # Now copy the given file onto the given directory.
+ cmd_runner.run(['cp', filepath, directory], as_root=True).wait()
+
+ def undo():
+ cmd_runner.run(
+ ['mv', '-f', path_to_orig, directory], as_root=True).wait()
+ local_atexit.append(undo)
+
+
+def run_local_atexit_funcs():
+ # Run the funcs in LIFO order, just like atexit does.
+ while len(local_atexit) > 0:
+ local_atexit.pop()()
diff --git a/linaro_media_create/partitions.py b/linaro_media_create/partitions.py
new file mode 100644
index 0000000..c68feb4
--- /dev/null
+++ b/linaro_media_create/partitions.py
@@ -0,0 +1,299 @@
+import atexit
+import subprocess
+import time
+
+import dbus
+from parted import (
+ Device,
+ Disk,
+ )
+
+from linaro_media_create import cmd_runner
+
+
+HEADS = 255
+SECTORS = 63
+SECTOR_SIZE = 512 # bytes
+CYLINDER_SIZE = HEADS * SECTORS * SECTOR_SIZE
+DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties'
+UDISKS = "org.freedesktop.UDisks"
+
+
+# I wonder if it'd make sense to convert this into a small shim which calls
+# the appropriate function for the given type of device? I think it's still
+# small enough that there's not much benefit in doing that, but if it grows we
+# might want to do it.
+def setup_partitions(board, media, fat_size, image_size,
+ bootfs_label, rootfs_label, rootfs_type, rootfs_uuid,
+ should_create_partitions, should_format_bootfs,
+ should_format_rootfs):
+ """Make sure the given device is partitioned to boot the given board.
+
+ :param board: The board's name, as a string.
+ :param media: The Media we should partition.
+ :param fat_size: The FAT size (16 or 32) for the boot partition.
+ :param image_size: The size of the image file, in case we're setting up a
+ QEMU image.
+ :param should_create_partitions: Whether or not we should erase existing
+ partitions and create new ones.
+ """
+ cylinders = None
+ if not media.is_block_device:
+ image_size_in_bytes = convert_size_to_bytes(image_size)
+ cylinders = image_size_in_bytes / CYLINDER_SIZE
+ image_size_in_bytes = cylinders * CYLINDER_SIZE
+ proc = cmd_runner.run(
+ ['qemu-img', 'create', '-f', 'raw', media.path, image_size],
+ stdout=open('/dev/null', 'w'))
+ proc.wait()
+
+ if should_create_partitions:
+ create_partitions(board, media, fat_size, HEADS, SECTORS, cylinders)
+
+ if media.is_block_device:
+ bootfs, rootfs = get_boot_and_root_partitions_for_media(media)
+ else:
+ bootfs, rootfs = get_boot_and_root_loopback_devices(media.path)
+
+ if should_format_bootfs:
+ print "\nFormating boot partition\n"
+ # It looks like KDE somehow automounts the partitions after you
+ # repartition a disk so we need to unmount them here to create the
+ # filesystem.
+ ensure_partition_is_not_mounted(bootfs)
+ proc = cmd_runner.run(
+ ['mkfs.vfat', '-F', str(fat_size), bootfs, '-n', bootfs_label],
+ as_root=True)
+ proc.wait()
+
+ if should_format_rootfs:
+ print "\nFormating root partition\n"
+ mkfs = 'mkfs.%s' % rootfs_type
+ ensure_partition_is_not_mounted(rootfs)
+ proc = cmd_runner.run(
+ [mkfs, '-U', rootfs_uuid, rootfs, '-L', rootfs_label],
+ as_root=True)
+ proc.wait()
+
+ return bootfs, rootfs
+
+
+def ensure_partition_is_not_mounted(partition):
+ """Ensure the given partition is not mounted, umounting if necessary."""
+ if is_partition_mounted(partition):
+ cmd_runner.run(['umount', partition], as_root=True).wait()
+
+
+def is_partition_mounted(partition):
+ """Is the given partition mounted?"""
+ device_path = _get_udisks_device_path(partition)
+ device = dbus.SystemBus().get_object(UDISKS, device_path)
+ is_partition = device.Get(
+ device_path, 'DeviceIsPartition', dbus_interface=DBUS_PROPERTIES)
+ return device.Get(
+ device_path, 'DeviceIsMounted', dbus_interface=DBUS_PROPERTIES)
+
+
+def get_boot_and_root_loopback_devices(image_file):
+ """Return the boot and root loopback devices for the given image file.
+
+ Register the loopback devices as well.
+ """
+ vfat_size, vfat_offset, linux_size, linux_offset = (
+ calculate_partition_size_and_offset(image_file))
+ boot_device = register_loopback(image_file, vfat_offset, vfat_size)
+ root_device = register_loopback(image_file, linux_offset, linux_size)
+ return boot_device, root_device
+
+
+def register_loopback(image_file, offset, size):
+ """Register a loopback device with an atexit handler to de-register it."""
+ def undo(device):
+ cmd_runner.run(['losetup', '-d', device], as_root=True).wait()
+
+ proc = cmd_runner.run(
+ ['losetup', '-f', '--show', image_file, '--offset',
+ str(offset), '--sizelimit', str(size)],
+ stdout=subprocess.PIPE, as_root=True)
+ device, _ = proc.communicate()
+ device = device.strip()
+ atexit.register(undo, device)
+ return device
+
+
+def calculate_partition_size_and_offset(image_file):
+ """Return the size and offset of the boot and root partitions.
+
+ Both the size and offset are in sectors.
+
+ :param image_file: A string containing the path to the image_file.
+ :return: A 4-tuple containing the offset and size of the boot partition
+ followed by the offset and size of the root partition.
+ """
+ # Here we can use parted.Device to read the partitions because we're
+ # reading from a regular file rather than a block device. If it was a
+ # block device we'd need root rights.
+ disk = Disk(Device(image_file))
+ vfat_partition = None
+ for partition in disk.partitions:
+ if 'boot' in partition.getFlagsAsString():
+ geometry = partition.geometry
+ vfat_offset = geometry.start * 512
+ vfat_size = geometry.length * 512
+ vfat_partition = partition
+
+ assert vfat_partition is not None, (
+ "Couldn't find boot partition on %s" % image_file)
+ linux_partition = vfat_partition.nextPartition()
+ geometry = linux_partition.geometry
+ linux_offset = geometry.start * 512
+ linux_size = geometry.length * 512
+ return vfat_size, vfat_offset, linux_size, linux_offset
+
+
+def get_boot_and_root_partitions_for_media(media):
+ """Return the device files for the boot and root partitions of media.
+
+ If the given media has 2 partitions, the first is boot and the second is
+ root. If there are 3 partitions, the second is boot and third is root.
+
+ If there are any other number of partitions, ValueError is raised.
+
+ This function must only be used for block devices.
+ """
+ assert media.is_block_device, (
+ "This function must only be used for block devices")
+
+ partition_count = _get_partition_count(media)
+
+ if partition_count == 2:
+ partition_offset = 0
+ elif partition_count == 3:
+ partition_offset = 1
+ else:
+ raise ValueError(
+ "Unexpected number of partitions on %s: %d" % (
+ media.path, partition_count))
+
+ boot_partition = "%s%d" % (media.path, 1 + partition_offset)
+ root_partition = "%s%d" % (media.path, 2 + partition_offset)
+ return boot_partition, root_partition
+
+
+def _get_partition_count(media):
+ """Return the number of partitions on the given media."""
+ # We could do the same easily using python-parted but it requires root
+ # rights to read block devices, so we use UDisks here.
+ device_path = _get_udisks_device_path(media.path)
+ device = dbus.SystemBus().get_object(UDISKS, device_path)
+ return device.Get(
+ device_path, 'PartitionTableCount', dbus_interface=DBUS_PROPERTIES)
+
+
+def _get_udisks_device_path(device):
+ """Return the UDisks path for the given device."""
+ bus = dbus.SystemBus()
+ udisks = dbus.Interface(
+ bus.get_object(UDISKS, "/org/freedesktop/UDisks"), UDISKS)
+ return udisks.get_dbus_method('FindDeviceByDeviceFile')(device)
+
+
+def convert_size_to_bytes(size):
+ """Convert a size string in Kbytes, Mbytes or Gbytes to bytes."""
+ unit = size[-1].upper()
+ real_size = int(size[:-1])
+ if unit == 'K':
+ real_size = real_size * 1024
+ elif unit == 'M':
+ real_size = real_size * 1024 * 1024
+ elif unit == 'G':
+ real_size = real_size * 1024 * 1024 * 1024
+ else:
+ raise ValueError("Unknown size format: %s. Use K[bytes], M[bytes] "
+ "or G[bytes]" % size)
+
+ # Round the size of the raw disk image up to a multiple of 256K so it is
+ # an exact number of SD card erase blocks in length. Otherwise Linux
+ # under qemu cannot access the last part of the card and is likely to
+ # complain that the last partition on the disk has been truncated. This
+ # doesn't appear to work in all cases, though, as can be seen on
+ # https://bugs.launchpad.net/linux-linaro/+bug/673335.
+ if real_size % (1024 * 256):
+ cylinders = real_size / CYLINDER_SIZE
+ real_size = cylinders * CYLINDER_SIZE
+ real_size = ((((real_size - 1) / (1024 * 256)) + 1) * (1024 * 256))
+
+ return real_size
+
+
+def run_sfdisk_commands(commands, heads, sectors, cylinders, device,
+ as_root=True, stderr=None):
+ """Run the given commands under sfdisk.
+
+ :param commands: A string of sfdisk commands; each on a separate line.
+ :return: A 2-tuple containing the subprocess' stdout and stderr.
+ """
+ args = ['sfdisk',
+ '-D',
+ '-H', str(heads),
+ '-S', str(sectors)]
+ if cylinders is not None:
+ args.extend(['-C', str(cylinders)])
+ args.append(device)
+ proc = cmd_runner.run(
+ args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=stderr,
+ as_root=as_root)
+ return proc.communicate("%s\n" % commands)
+
+
+def create_partitions(board, media, fat_size, heads, sectors, cylinders=None):
+ """Partition the given media according to the board requirements.
+
+ :param board: A string with the board type (e.g. beagle, panda, etc)
+ :param media: A setup_partitions.Media object to partition.
+ :param fat_size: The type of FATs used in the boot partition (16 or 32).
+ :param heads: Number of heads to use in the disk geometry of
+ partitions.
+ :param sectors: Number of sectors to use in the disk geometry of
+ partitions.
+ :param cylinders: The number of cylinders to pass to sfdisk's -C argument.
+ If None the -C argument is not passed.
+ """
+ if media.is_block_device:
+ # Overwrite any existing partition tables with a fresh one.
+ proc = cmd_runner.run(
+ ['parted', '-s', media.path, 'mklabel', 'msdos'], as_root=True)
+ proc.wait()
+
+ if fat_size == 32:
+ partition_type = '0x0C'
+ else:
+ partition_type = '0x0E'
+
+ if board == 'mx51evk':
+ # Create a one cylinder partition for fixed-offset bootloader data at
+ # the beginning of the image (size is one cylinder, so 8224768 bytes
+ # with the first sector for MBR).
+ run_sfdisk_commands(',1,0xDA', heads, sectors, cylinders, media.path)
+
+ # Create a VFAT or FAT16 partition of 9 cylinders (74027520 bytes, ~70
+ # MiB), followed by a Linux-type partition containing the rest of the free
+ # space.
+ sfdisk_cmd = ',9,%s,*\n,,,-' % partition_type
+ run_sfdisk_commands(sfdisk_cmd, heads, sectors, cylinders, media.path)
+
+ # Sync and sleep to wait for the partition to settle.
+ cmd_runner.run(['sync']).wait()
+ # Sleeping just 1 second seems to be enough here, but if we start getting
+ # errors because the disk is not partitioned then we should revisit this.
+ # XXX: This sleep can probably die now; need to do more tests before doing
+ # so, though.
+ time.sleep(1)
+
+
+class Media(object):
+ """A representation of the media where Linaro will be installed."""
+
+ def __init__(self, path):
+ self.path = path
+ self.is_block_device = path.startswith('/dev/')
diff --git a/linaro_media_create/populate_boot.py b/linaro_media_create/populate_boot.py
new file mode 100644
index 0000000..8aef8cc
--- /dev/null
+++ b/linaro_media_create/populate_boot.py
@@ -0,0 +1,167 @@
+import errno
+import glob
+import os
+
+from linaro_media_create import cmd_runner
+
+
+def _run_mkimage(img_type, load_addr, entry_point, name, img_data, img,
+ stdout=None, as_root=True):
+ cmd = ['mkimage',
+ '-A', 'arm',
+ '-O', 'linux',
+ '-T', img_type,
+ '-C', 'none',
+ '-a', load_addr,
+ '-e', load_addr,
+ '-n', name,
+ '-d', img_data,
+ img]
+ proc = cmd_runner.run(cmd, as_root=as_root, stdout=stdout)
+ proc.wait()
+ return proc.returncode
+
+
+def _get_file_matching(regex):
+ """Return a file whose path matches the given regex.
+
+ If zero or more than one files match, raise a ValueError.
+ """
+ files = glob.glob(regex)
+ if len(files) == 1:
+ return files[0]
+ elif len(files) == 0:
+ raise ValueError(
+ "No files found matching '%s'; can't continue" % regex)
+ else:
+ # TODO: Could ask the user to chosse which file to use instead of
+ # raising an exception.
+ raise ValueError("Too many files matching '%s' found." % regex)
+
+
+def make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk):
+ img_data = _get_file_matching(
+ '%s/vmlinuz-*-%s' % (uboot_parts_dir, sub_arch))
+ img = '%s/uImage' % boot_disk
+ return _run_mkimage(
+ 'kernel', load_addr, load_addr, 'Linux', img_data, img)
+
+
+def make_uInitrd(uboot_parts_dir, sub_arch, boot_disk):
+ img_data = _get_file_matching(
+ '%s/initrd.img-*-%s' % (uboot_parts_dir, sub_arch))
+ img = '%s/uInitrd' % boot_disk
+ return _run_mkimage('ramdisk', '0', '0', 'initramfs', img_data, img)
+
+
+def make_boot_script(boot_script_data, tmp_dir, boot_script):
+ # Need to save the boot script data into a file that will be passed to
+ # mkimage.
+ data_file = '%s/boot.cmd' % tmp_dir
+ with open(data_file, 'w') as fd:
+ fd.write(boot_script_data)
+ return _run_mkimage(
+ 'script', '0', '0', 'boot script', data_file, boot_script)
+
+
+def install_mx51evk_boot_loader(imx_file, boot_device_or_file):
+ proc = cmd_runner.run([
+ "dd",
+ "if=%s" % imx_file,
+ "of=%s" % boot_device_or_file,
+ "bs=1024",
+ "seek=1",
+ "conv=notrunc"], as_root=True)
+ proc.wait()
+
+
+def install_omap_boot_loader(mlo_file, boot_disk):
+ cmd_runner.run(["cp", "-v", mlo_file, boot_disk], as_root=True).wait()
+ # XXX: Is this really needed?
+ cmd_runner.run(["sync"]).wait()
+
+
+def make_boot_ini(boot_script, boot_disk):
+ proc = cmd_runner.run(
+ ["cp", "-v", boot_script, "%s/boot.ini" % boot_disk], as_root=True)
+ proc.wait()
+
+
+def populate_boot(board, board_config, chroot_dir, boot_partition, boot_disk,
+ boot_device_or_file, tmp_dir, is_live):
+
+ parts_dir = 'boot'
+ if is_live:
+ parts_dir = 'casper'
+ uboot_parts_dir = os.path.join(chroot_dir, parts_dir)
+
+ try:
+ os.makedirs(boot_disk)
+ except OSError, exc:
+ if exc.errno == errno.EEXIST:
+ pass
+ else:
+ raise
+ cmd_runner.run(['mount', boot_partition, boot_disk], as_root=True).wait()
+
+ uboot_flavor = board_config.get('uboot_flavor')
+ if uboot_flavor is not None:
+ uboot_bin = os.path.join(
+ chroot_dir, 'usr', 'lib', 'u-boot', uboot_flavor, 'u-boot.bin')
+ cmd_runner.run(
+ ['cp', '-v', uboot_bin, boot_disk], as_root=True).wait()
+
+ boot_script = "%(boot_disk)s/%(boot_script_name)s" % (
+ dict(boot_disk=boot_disk,
+ boot_script_name=board_config['boot_script']))
+
+ load_addr = board_config['load_addr']
+ sub_arch = board_config['sub_arch']
+ boot_cmd = board_config['boot_cmd']
+
+ # TODO: Once linaro-media-create is fully ported to python, we should
+ # split this into several board-specific functions that are defined
+ # somewhere else and just called here.
+ if board in ["beagle", "panda"]:
+ xloader_dir = 'x-loader-omap'
+ if board == "panda":
+ xloader_dir = 'x-loader-omap4'
+ mlo_file = os.path.join(
+ chroot_dir, 'usr', 'lib', xloader_dir, 'MLO')
+ install_omap_boot_loader(mlo_file, boot_disk)
+ make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk)
+ make_uInitrd(uboot_parts_dir, sub_arch, boot_disk)
+ make_boot_script(boot_cmd, tmp_dir, boot_script)
+ make_boot_ini(boot_script, boot_disk)
+
+ elif board == "igep":
+ make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk)
+ make_uInitrd(uboot_parts_dir, sub_arch, boot_disk)
+ make_boot_script(boot_cmd, tmp_dir, boot_script)
+ make_boot_ini(boot_script, boot_disk)
+
+ elif board == "ux500":
+ make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk)
+ make_uInitrd(uboot_parts_dir, sub_arch, boot_disk)
+ make_boot_script(boot_cmd, tmp_dir, boot_script)
+
+ elif board == "vexpress":
+ make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk)
+ make_uInitrd(uboot_parts_dir, sub_arch, boot_disk)
+
+ elif board == "mx51evk":
+ install_mx51evk_boot_loader(
+ "binary/usr/lib/u-boot/mx51evk/u-boot.imx", boot_device_or_file)
+ make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk)
+ make_uInitrd(uboot_parts_dir, sub_arch, boot_disk)
+ make_boot_script(boot_cmd, tmp_dir, boot_script)
+
+ else:
+ raise AssertionError(
+ "Internal error; missing support for board: %s" % board)
+
+ cmd_runner.run(['sync']).wait()
+ try:
+ cmd_runner.run(['umount', boot_disk], as_root=True).wait()
+ except cmd_runner.SubcommandNonZeroReturnValue:
+ pass
diff --git a/linaro_media_create/remove_binary_dir.py b/linaro_media_create/remove_binary_dir.py
new file mode 100644
index 0000000..c2b1438
--- /dev/null
+++ b/linaro_media_create/remove_binary_dir.py
@@ -0,0 +1,11 @@
+import os.path
+
+from linaro_media_create import cmd_runner
+
+
+def remove_dir(directory, as_root=True):
+ if os.path.exists(directory):
+ proc = cmd_runner.run(['rm', '-rf', directory], as_root=as_root)
+ proc.wait()
+ return proc.returncode
+ return 0
diff --git a/linaro_media_create/rootfs.py b/linaro_media_create/rootfs.py
new file mode 100644
index 0000000..3c8a721
--- /dev/null
+++ b/linaro_media_create/rootfs.py
@@ -0,0 +1,113 @@
+import glob
+import os
+import tempfile
+
+from linaro_media_create import cmd_runner
+
+
+def populate_rootfs(content_dir, root_disk, partition, rootfs_type,
+ rootfs_uuid, should_create_swap, swap_size,
+ partition_offset):
+ """Populate the rootfs and make the necessary tweaks to make it usable.
+
+ This consists of:
+ 1. Create a directory on the path specified by root_disk
+ 2. Mount the given partition onto the created directory.
+ 3. Move the contents of content_dir to that directory.
+ 4. If should_create_swap, then create it with the given size.
+ 5. Add fstab entries for the / filesystem and swap (if created).
+ 6. Create a /etc/flash-kernel.conf containing the target's boot device.
+ 7. Unmount the partition we mounted on step 2.
+ """
+ print "\nPopulating rootfs partition"
+ print "Be patient, this may take a few minutes\n"
+ # Create a directory to mount the rootfs partition.
+ os.makedirs(root_disk)
+
+ cmd_runner.run(['mount', partition, root_disk], as_root=True).wait()
+
+ move_contents(content_dir, root_disk)
+
+ fstab_additions = ["UUID=%s / %s errors=remount-ro 0 1 " % (
+ rootfs_uuid, rootfs_type)]
+ if should_create_swap:
+ print "\nCreating SWAP File\n"
+ if has_space_left_for_swap(root_disk, swap_size):
+ proc = cmd_runner.run([
+ 'dd',
+ 'if=/dev/zero',
+ 'of=%s/SWAP.swap' % root_disk,
+ 'bs=1M',
+ 'count=%s' % swap_size], as_root=True)
+ proc.wait()
+ proc = cmd_runner.run(
+ ['mkswap', '%s/SWAP.swap' % root_disk], as_root=True)
+ proc.wait()
+ fstab_additions.append("/SWAP.swap none swap sw 0 0")
+ else:
+ print ("Swap file is bigger than space left on partition; "
+ "continuing without swap.")
+
+ append_to_fstab(root_disk, fstab_additions)
+
+ print "\nCreating /etc/flash-kernel.conf\n"
+ create_flash_kernel_config(root_disk, 1 + partition_offset)
+
+ cmd_runner.run(['sync']).wait()
+ # The old code used to ignore failures here, but I don't think that's
+ # desirable so I'm using cmd_runner.run()'s standard behaviour, which will
+ # fail on a non-zero return value.
+ cmd_runner.run(['umount', root_disk], as_root=True).wait()
+
+
+def create_flash_kernel_config(root_disk, boot_partition_number):
+ """Create a flash-kernel.conf file under root_disk/etc.
+
+ Uses the given partition number to figure out the boot partition.
+ """
+ target_boot_dev = '/dev/mmcblk0p%s' % boot_partition_number
+ flash_kernel = os.path.join(root_disk, 'etc', 'flash-kernel.conf')
+ write_data_to_protected_file(
+ flash_kernel, "UBOOT_PART=%s" % target_boot_dev)
+
+
+def move_contents(from_, root_disk):
+ """Move everything under from_ to the given root disk.
+
+ Uses sudo for moving.
+ """
+ assert os.path.isdir(from_), "%s is not a directory" % from_
+ files = glob.glob(os.path.join(from_, '*'))
+ mv_cmd = ['mv']
+ mv_cmd.extend(files)
+ mv_cmd.append(root_disk)
+ cmd_runner.run(mv_cmd, as_root=True).wait()
+
+
+def has_space_left_for_swap(root_disk, swap_size_in_mega_bytes):
+ """Is there enough space for a swap file in the given root disk?"""
+ statvfs = os.statvfs(root_disk)
+ free_space = statvfs.f_bavail * statvfs.f_bsize
+ swap_size_in_bytes = int(swap_size_in_mega_bytes) * 1024**2
+ if free_space >= swap_size_in_bytes:
+ return True
+ return False
+
+
+def append_to_fstab(root_disk, fstab_additions):
+ fstab = os.path.join(root_disk, 'etc', 'fstab')
+ data = open(fstab).read() + '\n' + '\n'.join(fstab_additions)
+ write_data_to_protected_file(fstab, data)
+
+
+def write_data_to_protected_file(path, data):
+ """Write data to the file on the given path.
+
+ This is meant to be used when the given file is only writable by root, and
+ we overcome that by writing the data to a tempfile and then moving the
+ tempfile on top of the given one using sudo.
+ """
+ _, tmpfile = tempfile.mkstemp()
+ with open(tmpfile, 'w') as fd:
+ fd.write(data)
+ cmd_runner.run(['mv', '-f', tmpfile, path], as_root=True).wait()
diff --git a/linaro_media_create/tests/__init__.py b/linaro_media_create/tests/__init__.py
new file mode 100644
index 0000000..49b2b6d
--- /dev/null
+++ b/linaro_media_create/tests/__init__.py
@@ -0,0 +1,8 @@
+import unittest
+
+def test_suite():
+ module_names = ['linaro_media_create.tests.test_media_create',
+ ]
+ loader = unittest.TestLoader()
+ suite = loader.loadTestsFromNames(module_names)
+ return suite
diff --git a/linaro_media_create/tests/fixtures.py b/linaro_media_create/tests/fixtures.py
new file mode 100755
index 0000000..261917f
--- /dev/null
+++ b/linaro_media_create/tests/fixtures.py
@@ -0,0 +1,123 @@
+import os
+import shutil
+import subprocess
+import tempfile
+
+from linaro_media_create import partitions
+from linaro_media_create import cmd_runner
+
+
+class CreateTempDirFixture(object):
+
+ def __init__(self):
+ self.tempdir = None
+
+ def setUp(self):
+ self.tempdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ if os.path.exists(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def get_temp_dir(self):
+ return self.tempdir
+
+
+class CreateTarballFixture(object):
+
+ def __init__(self, dir):
+ self.dir = dir
+ self.tarball = os.path.join(self.dir, 'tarball.tar.gz')
+
+ def setUp(self):
+ # Create gzipped tar archive.
+ args = ['tar', '-czf', self.tarball, self.dir]
+ proc = subprocess.Popen(args)
+ proc.wait()
+
+ def tearDown(self):
+ if os.path.exists(self.tarball):
+ os.remove(self.tarball)
+
+ def get_tarball(self):
+ return self.tarball
+
+
+class MockSomethingFixture(object):
+ """A fixture which mocks something on the given object.
+
+ Replaces attr_name on obj with the given mock, undoing that upon
+ tearDown().
+ """
+
+ def __init__(self, obj, attr_name, mock):
+ self.obj = obj
+ self.attr_name = attr_name
+ self.mock = mock
+ self.orig_attr = getattr(obj, attr_name)
+
+ def setUp(self):
+ setattr(self.obj, self.attr_name, self.mock)
+
+ def tearDown(self):
+ setattr(self.obj, self.attr_name, self.orig_attr)
+
+
+class MockCmdRunnerPopen(object):
+ """A mock for cmd_runner.Popen() which stores the args given to it."""
+ calls = None
+ def __call__(self, cmd, *args, **kwargs):
+ if self.calls is None:
+ self.calls = []
+ if isinstance(cmd, basestring):
+ all_args = [cmd]
+ else:
+ all_args = cmd
+ all_args.extend(args)
+ self.calls.append(all_args)
+ self.returncode = 0
+ return self
+
+ def communicate(self, input=None):
+ self.wait()
+ return '', ''
+
+ def wait(self):
+ return self.returncode
+
+
+class MockCmdRunnerPopenFixture(MockSomethingFixture):
+ """A test fixture which mocks cmd_runner.do_run with the given mock.
+
+ If no mock is given, a MockCmdRunnerPopen instance is used.
+ """
+
+ def __init__(self, mock=None):
+ if mock is None:
+ mock = MockCmdRunnerPopen()
+ super(MockCmdRunnerPopenFixture, self).__init__(
+ cmd_runner, 'Popen', mock)
+
+
+class MockCallableWithPositionalArgs(object):
+ """A callable mock which just stores the positional args given to it.
+
+ Every time an instance of this is "called", it will append a tuple
+ containing the positional arguments given to it to self.calls.
+ """
+ calls = None
+ return_value = None
+ def __call__(self, *args):
+ if self.calls is None:
+ self.calls = []
+ self.calls.append(args)
+ return self.return_value
+
+
+class MockRunSfdiskCommandsFixture(MockSomethingFixture):
+
+ def __init__(self):
+ mock = MockCallableWithPositionalArgs()
+ mock.return_value = ('', '')
+ super(MockRunSfdiskCommandsFixture, self).__init__(
+ partitions, 'run_sfdisk_commands', mock)
diff --git a/linaro_media_create/tests/test_media_create.py b/linaro_media_create/tests/test_media_create.py
new file mode 100644
index 0000000..f02f80b
--- /dev/null
+++ b/linaro_media_create/tests/test_media_create.py
@@ -0,0 +1,1047 @@
+from contextlib import contextmanager
+import atexit
+import glob
+import os
+import random
+import string
+import subprocess
+import sys
+import time
+
+from testtools import TestCase
+from testtools.matchers import Mismatch
+
+from hwpack.testing import TestCaseWithFixtures
+
+from linaro_media_create import (
+ check_device,
+ cmd_runner,
+ ensure_command,
+ get_board_config,
+ populate_boot,
+ partitions,
+ rootfs,
+ ROOTFS_UUID,
+ )
+from linaro_media_create.hwpack import (
+ copy_file,
+ install_hwpack,
+ install_hwpacks,
+ mount_chroot_proc,
+ run_local_atexit_funcs,
+ temporarily_overwrite_file_on_dir,
+ )
+from linaro_media_create.partitions import (
+ calculate_partition_size_and_offset,
+ convert_size_to_bytes,
+ create_partitions,
+ ensure_partition_is_not_mounted,
+ get_boot_and_root_loopback_devices,
+ get_boot_and_root_partitions_for_media,
+ Media,
+ run_sfdisk_commands,
+ setup_partitions,
+ )
+from linaro_media_create.populate_boot import (
+ make_boot_script,
+ make_uImage,
+ make_uInitrd,
+ _get_file_matching,
+ _run_mkimage,
+ )
+from linaro_media_create.remove_binary_dir import remove_dir
+from linaro_media_create.rootfs import (
+ create_flash_kernel_config,
+ has_space_left_for_swap,
+ move_contents,
+ populate_rootfs,
+ write_data_to_protected_file,
+ )
+from linaro_media_create.unpack_binary_tarball import unpack_binary_tarball
+
+from linaro_media_create.tests.fixtures import (
+ CreateTempDirFixture,
+ CreateTarballFixture,
+ MockCmdRunnerPopenFixture,
+ MockSomethingFixture,
+ MockRunSfdiskCommandsFixture,
+ )
+
+
+class TestEnsureCommand(TestCase):
+
+ apt_get_called = False
+
+ def test_command_already_present(self):
+ with self.mock_apt_get_install():
+ ensure_command.ensure_command('apt-get', 'apt')
+ self.assertFalse(self.apt_get_called)
+
+ def test_command_not_present(self):
+ with self.mock_apt_get_install():
+ ensure_command.ensure_command('apt-get-two-o', 'apt-2')
+ self.assertTrue(self.apt_get_called)
+
+ @contextmanager
+ def mock_apt_get_install(self):
+ def mock_apt_get_install(cmd, pkg):
+ self.apt_get_called = True
+ orig_func = ensure_command.apt_get_install
+ ensure_command.apt_get_install = mock_apt_get_install
+ yield
+ ensure_command.apt_get_install = orig_func
+
+
+class IsEqualToDict(object):
+ """A testtools matcher to compare dicts.
+
+ When there are differences, only the differing keys/values are shown.
+ """
+
+ def __init__(self, expected):
+ self.expected = expected
+
+ def match(self, actual):
+ actual_keys = set(actual.keys())
+ expected_keys = set(self.expected.keys())
+ instersection = actual_keys.intersection(expected_keys)
+ expected_only_keys = expected_keys.difference(actual_keys)
+ actual_only_keys = actual_keys.difference(expected_keys)
+ keys_with_differing_values = []
+ for key in instersection:
+ if actual[key] != self.expected[key]:
+ keys_with_differing_values.append(key)
+
+ if (len(expected_only_keys) == 0 and len(actual_only_keys) == 0
+ and len(keys_with_differing_values) == 0):
+ return None
+
+ expected_diffs = []
+ for key in keys_with_differing_values + list(expected_only_keys):
+ expected_diffs.append("%s: %r" % (key, self.expected[key]))
+ expected_diffs = "\n".join(expected_diffs)
+
+ actual_diffs = []
+ for key in keys_with_differing_values + list(actual_only_keys):
+ actual_diffs.append("%s: %r" % (key, actual[key]))
+ actual_diffs = "\n".join(actual_diffs)
+
+ mismatch_string = "\na = %s\n" % expected_diffs
+ mismatch_string += "=" * 60 + "\n"
+ mismatch_string += "b = %s" % actual_diffs
+ return IsEqualToDictMismatch(self.expected, mismatch_string, actual)
+
+
+class IsEqualToDictMismatch(Mismatch):
+
+ def __init__(self, expected, mismatch_string, other):
+ self.expected = expected
+ self._mismatch_string = mismatch_string
+ self.other = other
+
+ def describe(self):
+ return self._mismatch_string
+
+
+class TestGetBoardConfig(TestCase):
+
+ expected_beagle_config = {
+ 'boot_cmd': (
+ "setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage; "
+ "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80000000 "
+ "0x81600000'\nsetenv bootargs ' console=tty0 "
+ "console=ttyS2,115200n8 root=UUID=%s rootwait ro earlyprintk "
+ "fixrtc nocompcache vram=12M omapfb.debug=y "
+ "omapfb.mode=dvi:1280x720MR-16@60'\nboot" % ROOTFS_UUID),
+ 'boot_args_options': (
+ 'rootwait ro earlyprintk fixrtc nocompcache vram=12M '
+ 'omapfb.debug=y omapfb.mode=dvi:1280x720MR-16@60'),
+ 'boot_script': 'boot.scr',
+ 'fat_size': 32,
+ 'initrd_addr': '0x81600000',
+ 'kernel_addr': '0x80000000',
+ 'load_addr': '0x80008000',
+ 'mmc_option': '0:1',
+ 'mmc_part_offset': 0,
+ 'serial_opts': ' console=tty0 console=ttyS2,115200n8',
+ 'sub_arch': 'linaro-omap',
+ 'uboot_flavor': 'omap3_beagle'}
+
+ expected_panda_config = {
+ 'boot_cmd': (
+ "setenv bootcmd 'fatload mmc 0:1 0x80200000 uImage; fatload mmc "
+ "0:1 0x81600000 uInitrd; bootm 0x80200000 0x81600000'\nsetenv "
+ "bootargs ' console = tty0 console = ttyO2,115200n8 "
+ "root=UUID=%s rootwait ro earlyprintk fixrtc nocompcache "
+ "vram = 32M omapfb.debug = y omapfb.vram = 0:8M mem = 463M "
+ "ip = none'\nboot" % ROOTFS_UUID),
+ 'boot_args_options': (
+ 'rootwait ro earlyprintk fixrtc nocompcache vram = 32M '
+ 'omapfb.debug = y omapfb.vram = 0:8M mem = 463M ip = none'),
+ 'boot_script': 'boot.scr',
+ 'fat_size': 32,
+ 'initrd_addr': '0x81600000',
+ 'kernel_addr': '0x80200000',
+ 'load_addr': '0x80008000',
+ 'mmc_option': '0:1',
+ 'mmc_part_offset': 0,
+ 'serial_opts': ' console = tty0 console = ttyO2,115200n8',
+ 'sub_arch': 'omap4',
+ 'uboot_flavor': 'omap4_panda'}
+
+ expected_ux500_config = {
+ 'boot_args_options': (
+ 'rootwait ro earlyprintk rootdelay = 1 fixrtc nocompcache '
+ 'mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M '
+ 'mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M '
+ 'hwmem = 48M@302M mem = 152M@360M'),
+ 'boot_cmd': (
+ "setenv bootcmd 'fatload mmc 1:1 0x00100000 uImage; fatload mmc "
+ "1:1 0x08000000 uInitrd; bootm 0x00100000 0x08000000'\nsetenv "
+ "bootargs ' console = tty0 console = ttyAMA2,115200n8 "
+ "root=UUID=%s rootwait ro earlyprintk rootdelay = 1 fixrtc "
+ "nocompcache mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M "
+ "pmem = 22M@172M mem = 30M@194M mem_mali = 32M@224M "
+ "pmem_hwb = 54M@256M hwmem = 48M@302M mem = 152M@360M'\nboot"
+ % ROOTFS_UUID),
+ 'boot_script': 'flash.scr',
+ 'fat_size': 32,
+ 'initrd_addr': '0x08000000',
+ 'kernel_addr': '0x00100000',
+ 'load_addr': '0x00008000',
+ 'mmc_option': '1:1',
+ 'mmc_part_offset': 0,
+ 'serial_opts': ' console = tty0 console = ttyAMA2,115200n8',
+ 'sub_arch': 'ux500',
+ 'uboot_flavor': None}
+
+ expected_vexpress_config = {
+ 'boot_args_options': 'rootwait ro',
+ 'boot_cmd': (
+ "setenv bootcmd 'fatload mmc 0:1 0x60008000 uImage; fatload mmc "
+ "0:1 0x81000000 uInitrd; bootm 0x60008000 0x81000000'\nsetenv "
+ "bootargs ' console = tty0 console = ttyAMA0,38400n8 "
+ "root=UUID=%s rootwait ro'\nboot" % ROOTFS_UUID),
+ 'boot_script': None,
+ 'fat_size': 16,
+ 'initrd_addr': '0x81000000',
+ 'kernel_addr': '0x60008000',
+ 'load_addr': '0x60008000',
+ 'mmc_option': '0:1',
+ 'mmc_part_offset': 0,
+ 'serial_opts': ' console = tty0 console = ttyAMA0,38400n8',
+ 'sub_arch': 'linaro-vexpress',
+ 'uboot_flavor': 'ca9x4_ct_vxp'}
+
+ expected_mx51evk_config = {
+ 'boot_args_options': 'rootwait ro',
+ 'boot_cmd': (
+ "setenv bootcmd 'fatload mmc 0:2 0x90000000 uImage; fatload mmc "
+ "0:2 0x90800000 uInitrd; bootm 0x90000000 0x90800000'\nsetenv "
+ "bootargs ' console = tty0 console = ttymxc0,115200n8 "
+ "root=UUID=%s rootwait ro'\nboot" % ROOTFS_UUID),
+ 'boot_script': 'boot.scr',
+ 'fat_size': 32,
+ 'initrd_addr': '0x90800000',
+ 'kernel_addr': '0x90000000',
+ 'load_addr': '0x90008000',
+ 'mmc_option': '0:2',
+ 'mmc_part_offset': 1,
+ 'serial_opts': ' console = tty0 console = ttymxc0,115200n8',
+ 'sub_arch': 'linaro-mx51',
+ 'uboot_flavor': None}
+
+ def test_unknown_board(self):
+ self.assertRaises(
+ ValueError, get_board_config, 'foobar', is_live=True,
+ is_lowmem=False, consoles=None)
+
+ def test_vexpress_live(self):
+ config = get_board_config(
+ 'vexpress', is_live=True, is_lowmem=False, consoles=None)
+ expected = self.expected_vexpress_config.copy()
+ expected['boot_cmd'] = (
+ "setenv bootcmd 'fatload mmc 0:1 0x60008000 uImage; fatload mmc "
+ "0:1 0x81000000 uInitrd; bootm 0x60008000 0x81000000'\nsetenv "
+ "bootargs ' console = tty0 console = ttyAMA0,38400n8 "
+ "serialtty = ttyAMA0 boot=casper rootwait ro'\nboot")
+ expected['serial_opts'] = (
+ ' console = tty0 console = ttyAMA0,38400n8 serialtty = ttyAMA0')
+ self.assertThat(expected, IsEqualToDict(config))
+
+ def test_vexpress(self):
+ config = get_board_config(
+ 'vexpress', is_live=False, is_lowmem=False, consoles=None)
+ self.assertThat(self.expected_vexpress_config, IsEqualToDict(config))
+
+ def test_mx51evk_live(self):
+ config = get_board_config(
+ 'mx51evk', is_live=True, is_lowmem=False, consoles=None)
+ expected = self.expected_mx51evk_config.copy()
+ expected['boot_cmd'] = (
+ "setenv bootcmd 'fatload mmc 0:2 0x90000000 uImage; "
+ "fatload mmc 0:2 0x90800000 uInitrd; bootm 0x90000000 "
+ "0x90800000'\nsetenv bootargs ' console = tty0 "
+ "console = ttymxc0,115200n8 serialtty = ttymxc0 boot=casper "
+ "rootwait ro'\nboot")
+ expected['serial_opts'] = (
+ ' console = tty0 console = ttymxc0,115200n8 serialtty = ttymxc0')
+ self.assertThat(expected, IsEqualToDict(config))
+
+ def test_mx51evk(self):
+ config = get_board_config(
+ 'mx51evk', is_live=False, is_lowmem=False, consoles=None)
+ self.assertThat(self.expected_mx51evk_config, IsEqualToDict(config))
+
+ def test_ux500_live(self):
+ config = get_board_config(
+ 'ux500', is_live=True, is_lowmem=False, consoles=None)
+ boot_cmd = (
+ "setenv bootcmd 'fatload mmc 1:1 0x00100000 uImage; fatload "
+ "mmc 1:1 0x08000000 uInitrd; bootm 0x00100000 0x08000000'\n"
+ "setenv bootargs ' console = tty0 console = ttyAMA2,115200n8 "
+ "serialtty = ttyAMA2 boot=casper rootwait ro earlyprintk "
+ "rootdelay = 1 fixrtc nocompcache mem = 96M@0 "
+ "mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M "
+ "mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M "
+ "hwmem = 48M@302M mem = 152M@360M'\nboot")
+ expected = self.expected_ux500_config.copy()
+ expected['boot_cmd'] = boot_cmd
+ expected['serial_opts'] = (
+ ' console = tty0 console = ttyAMA2,115200n8 serialtty = ttyAMA2')
+ self.assertThat(expected, IsEqualToDict(config))
+
+ def test_ux500(self):
+ config = get_board_config(
+ 'ux500', is_live=False, is_lowmem=False, consoles=None)
+ self.assertThat(self.expected_ux500_config, IsEqualToDict(config))
+
+ def test_panda(self):
+ config = get_board_config(
+ 'panda', is_live=False, is_lowmem=False, consoles=None)
+ self.assertThat(self.expected_panda_config, IsEqualToDict(config))
+
+ def test_panda_live(self):
+ config = get_board_config(
+ 'panda', is_live=True, is_lowmem=False, consoles=None)
+ boot_cmd = (
+ "setenv bootcmd 'fatload mmc 0:1 0x80200000 uImage; "
+ "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80200000 "
+ "0x81600000'\nsetenv bootargs ' console = tty0 "
+ "console = ttyO2,115200n8 serialtty = ttyO2 boot=casper "
+ "rootwait ro earlyprintk fixrtc nocompcache vram = 32M "
+ "omapfb.debug = y omapfb.vram = 0:8M mem = 463M ip = none'\nboot")
+ expected = self.expected_panda_config.copy()
+ expected['boot_cmd'] = boot_cmd
+ expected['serial_opts'] = (
+ ' console = tty0 console = ttyO2,115200n8 serialtty = ttyO2')
+ self.assertThat(expected, IsEqualToDict(config))
+
+ def test_beagle(self):
+ config = get_board_config(
+ 'beagle', is_live=False, is_lowmem=False, consoles=None)
+ self.assertThat(self.expected_beagle_config, IsEqualToDict(config))
+
+ def test_beagle_live(self):
+ config = get_board_config(
+ 'beagle', is_live=True, is_lowmem=False, consoles=None)
+ boot_cmd = (
+ "setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage; "
+ "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80000000 "
+ "0x81600000'\nsetenv bootargs ' console=tty0 "
+ "console=ttyS2,115200n8 serialtty=ttyS2 boot=casper rootwait ro "
+ "earlyprintk fixrtc nocompcache vram=12M omapfb.debug=y "
+ "omapfb.mode=dvi:1280x720MR-16@60'\nboot")
+ expected = self.expected_beagle_config.copy()
+ expected['boot_cmd'] = boot_cmd
+ expected['serial_opts'] = (
+ ' console=tty0 console=ttyS2,115200n8 serialtty=ttyS2')
+ self.assertThat(expected, IsEqualToDict(config))
+
+
+class TestRemoveBinaryDir(TestCaseWithFixtures):
+
+ def setUp(self):
+ super(TestRemoveBinaryDir, self).setUp()
+ self.temp_dir_fixture = CreateTempDirFixture()
+ self.useFixture(self.temp_dir_fixture)
+
+ def test_remove_dir(self):
+ rc = remove_dir(
+ self.temp_dir_fixture.get_temp_dir(), as_root=False)
+ self.assertEqual(rc, 0)
+ self.assertFalse(
+ os.path.exists(self.temp_dir_fixture.get_temp_dir()))
+
+
+class TestUnpackBinaryTarball(TestCaseWithFixtures):
+
+ def setUp(self):
+ super(TestUnpackBinaryTarball, self).setUp()
+
+ self.tar_dir_fixture = CreateTempDirFixture()
+ self.useFixture(self.tar_dir_fixture)
+
+ self.tarball_fixture = CreateTarballFixture(
+ self.tar_dir_fixture.get_temp_dir())
+ self.useFixture(self.tarball_fixture)
+
+ def test_unpack_binary_tarball(self):
+ tmp_dir = self.useFixture(CreateTempDirFixture()).get_temp_dir()
+ rc = unpack_binary_tarball(
+ self.tarball_fixture.get_tarball(), tmp_dir, as_root=False)
+ self.assertEqual(rc, 0)
+
+
+class TestCmdRunner(TestCaseWithFixtures):
+
+ def test_run(self):
+ fixture = MockCmdRunnerPopenFixture()
+ self.useFixture(fixture)
+ proc = cmd_runner.run(['foo', 'bar', 'baz'])
+ self.assertEqual(0, proc.returncode)
+ self.assertEqual([['foo', 'bar', 'baz']], fixture.mock.calls)
+
+ def test_run_as_root(self):
+ fixture = MockCmdRunnerPopenFixture()
+ self.useFixture(fixture)
+ cmd_runner.run(['foo', 'bar'], as_root=True)
+ self.assertEqual([['sudo', 'foo', 'bar']], fixture.mock.calls)
+
+ def test_run_succeeds_on_zero_return_code(self):
+ proc = cmd_runner.run(['true'])
+ # Need to wait() here as we're using the real Popen.
+ proc.wait()
+ self.assertEqual(0, proc.returncode)
+
+ def test_run_raises_exception_on_non_zero_return_code(self):
+ def run_and_wait():
+ proc = cmd_runner.run(['false'])
+ proc.wait()
+ self.assertRaises(
+ cmd_runner.SubcommandNonZeroReturnValue, run_and_wait)
+
+ def test_run_must_be_given_list_as_args(self):
+ self.assertRaises(AssertionError, cmd_runner.run, 'true')
+
+ def test_Popen(self):
+ proc = cmd_runner.Popen('true')
+ returncode = proc.wait()
+ self.assertEqual(0, returncode)
+
+
+class TestPopulateBoot(TestCaseWithFixtures):
+
+ def _mock_get_file_matching(self):
+ self.useFixture(MockSomethingFixture(
+ populate_boot, '_get_file_matching',
+ lambda regex: regex))
+
+ def _mock_Popen(self):
+ fixture = MockCmdRunnerPopenFixture()
+ self.useFixture(fixture)
+ return fixture
+
+ def test_make_uImage(self):
+ self._mock_get_file_matching()
+ fixture = self._mock_Popen()
+ make_uImage('load_addr', 'parts_dir', 'sub_arch', 'boot_disk')
+ expected = [
+ 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'kernel',
+ '-C', 'none', '-a', 'load_addr', '-e', 'load_addr', '-n', 'Linux',
+ '-d', 'parts_dir/vmlinuz-*-sub_arch', 'boot_disk/uImage']
+ self.assertEqual([expected], fixture.mock.calls)
+
+ def test_make_uInitrd(self):
+ self._mock_get_file_matching()
+ fixture = self._mock_Popen()
+ make_uInitrd('parts_dir', 'sub_arch', 'boot_disk')
+ expected = [
+ 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'ramdisk',
+ '-C', 'none', '-a', '0', '-e', '0', '-n', 'initramfs',
+ '-d', 'parts_dir/initrd.img-*-sub_arch', 'boot_disk/uInitrd']
+ self.assertEqual([expected], fixture.mock.calls)
+
+ def test_make_boot_script(self):
+ self._mock_get_file_matching()
+ fixture = self._mock_Popen()
+ tempdir = self.useFixture(CreateTempDirFixture()).tempdir
+ make_boot_script('boot script data', tempdir, 'boot_script')
+ expected = [
+ 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'script',
+ '-C', 'none', '-a', '0', '-e', '0', '-n', 'boot script',
+ '-d', '%s/boot.cmd' % tempdir, 'boot_script']
+ self.assertEqual([expected], fixture.mock.calls)
+
+ def test_get_file_matching(self):
+ prefix = ''.join(
+ random.choice(string.ascii_lowercase) for x in range(5))
+ file1 = self.createTempFileAsFixture(prefix)
+ directory = os.path.dirname(file1)
+ self.assertEqual(
+ file1, _get_file_matching('%s/%s*' % (directory, prefix)))
+
+ def test_get_file_matching_too_many_files_found(self):
+ prefix = ''.join(
+ random.choice(string.ascii_lowercase) for x in range(5))
+ file1 = self.createTempFileAsFixture(prefix)
+ file2 = self.createTempFileAsFixture(prefix)
+ directory = os.path.dirname(file1)
+ self.assertRaises(
+ ValueError, _get_file_matching, '%s/%s*' % (directory, prefix))
+
+ def test_get_file_matching_no_files_found(self):
+ self.assertRaises(
+ ValueError, _get_file_matching, '/foo/bar/baz/*non-existent')
+
+ def test_run_mkimage(self):
+ # Create a fake boot script.
+ filename = self.createTempFileAsFixture()
+ f = open(filename, 'w')
+ f.write("setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage;\nboot")
+ f.close()
+
+ img = self.createTempFileAsFixture()
+ # Use that fake boot script to create a boot loader using mkimage.
+ # Send stdout to /dev/null as mkimage will print to stdout and we
+ # don't want that.
+ retval = _run_mkimage(
+ 'script', '0', '0', 'boot script', filename, img,
+ stdout=open('/dev/null', 'w'), as_root=False)
+
+ self.assertEqual(0, retval)
+
+
+class TestCreatePartitions(TestCaseWithFixtures):
+
+ media = Media('/dev/xdz')
+
+ def setUp(self):
+ super(TestCreatePartitions, self).setUp()
+ # Stub time.sleep() as create_partitions() use that.
+ self.orig_sleep = time.sleep
+ time.sleep = lambda s: None
+
+ def tearDown(self):
+ super(TestCreatePartitions, self).tearDown()
+ time.sleep = self.orig_sleep
+
+ def test_create_partitions_for_mx51evk(self):
+ # For this board we create a one cylinder partition at the beginning.
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture())
+
+ create_partitions('mx51evk', self.media, 32, 255, 63, '')
+
+ self.assertEqual(
+ [['sudo', 'parted', '-s', self.media.path, 'mklabel', 'msdos'],
+ ['sync']],
+ popen_fixture.mock.calls)
+ self.assertEqual(
+ [(',1,0xDA', 255, 63, '', self.media.path),
+ (',9,0x0C,*\n,,,-', 255, 63, '', self.media.path)],
+ sfdisk_fixture.mock.calls)
+
+ def test_create_partitions_for_beagle(self):
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture())
+
+ create_partitions('beagle', self.media, 32, 255, 63, '')
+
+ self.assertEqual(
+ [['sudo', 'parted', '-s', self.media.path, 'mklabel', 'msdos'],
+ ['sync']],
+ popen_fixture.mock.calls)
+ self.assertEqual(
+ [(',9,0x0C,*\n,,,-', 255, 63, '', self.media.path)],
+ sfdisk_fixture.mock.calls)
+
+ def test_create_partitions_with_img_file(self):
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture())
+
+ tempfile = self.createTempFileAsFixture()
+ create_partitions('beagle', Media(tempfile), 32, 255, 63, '')
+
+ # Unlike the test for partitioning of a regular block device, in this
+ # case parted was not called as there's no existing partition table
+ # for us to overwrite on the image file.
+ self.assertEqual([['sync']], popen_fixture.mock.calls)
+
+ self.assertEqual(
+ [(',9,0x0C,*\n,,,-', 255, 63, '', tempfile)],
+ sfdisk_fixture.mock.calls)
+
+ def test_run_sfdisk_commands(self):
+ tempfile = self.createTempFileAsFixture()
+ proc = cmd_runner.run(
+ ['qemu-img', 'create', '-f', 'raw', tempfile, '10M'],
+ stdout=subprocess.PIPE)
+ proc.communicate()
+ stdout, stderr = run_sfdisk_commands(
+ ',1,0xDA', 5, 63, '', tempfile, as_root=False,
+ stderr=subprocess.PIPE)
+ self.assertIn('Successfully wrote the new partition table', stdout)
+
+ def test_run_sfdisk_commands_raises_on_non_zero_returncode(self):
+ tempfile = self.createTempFileAsFixture()
+ self.assertRaises(
+ cmd_runner.SubcommandNonZeroReturnValue,
+ run_sfdisk_commands,
+ ',1,0xDA', 5, 63, '', tempfile, as_root=False,
+ stderr=subprocess.PIPE)
+
+
+class TestPartitionSetup(TestCaseWithFixtures):
+
+ def setUp(self):
+ super(TestPartitionSetup, self).setUp()
+ # Stub time.sleep() as create_partitions() use that.
+ self.orig_sleep = time.sleep
+ time.sleep = lambda s: None
+
+ def tearDown(self):
+ super(TestPartitionSetup, self).tearDown()
+ time.sleep = self.orig_sleep
+
+ def test_convert_size_in_kbytes_to_bytes(self):
+ self.assertEqual(512 * 1024, convert_size_to_bytes('512K'))
+
+ def test_convert_size_in_mbytes_to_bytes(self):
+ self.assertEqual(100 * 1024**2, convert_size_to_bytes('100M'))
+
+ def test_convert_size_in_gbytes_to_bytes(self):
+ self.assertEqual(12 * 1024**3, convert_size_to_bytes('12G'))
+
+ def test_convert_size_in_kbytes_to_bytes_rounds_to_256k_multiple(self):
+ # See comment in convert_size_to_bytes as to why we need to do this.
+ self.assertEqual(
+ 3891 * (1024 * 256), convert_size_to_bytes('1000537K'))
+
+ def test_calculate_partition_size_and_offset(self):
+ tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-')
+ vfat_size, vfat_offset, linux_size, linux_offset = (
+ calculate_partition_size_and_offset(tempfile))
+ self.assertEqual(
+ [129024L, 32256L, 10321920L, 161280L],
+ [vfat_size, vfat_offset, linux_size, linux_offset])
+
+ def test_get_boot_and_root_partitions_for_media_with_2_partitions(self):
+ self.useFixture(MockSomethingFixture(
+ partitions, '_get_partition_count', lambda media: 2))
+ tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-')
+ media = Media(tempfile)
+ # Pretend the image file is a block device, or else
+ # get_boot_and_root_partitions_for_media will choke.
+ media.is_block_device = True
+ self.assertEqual(
+ ("%s%d" % (tempfile, 1), "%s%d" % (tempfile, 2)),
+ get_boot_and_root_partitions_for_media(media))
+
+ def test_get_boot_and_root_partitions_for_media_with_3_partitions(self):
+ self.useFixture(MockSomethingFixture(
+ partitions, '_get_partition_count', lambda media: 3))
+ tempfile = self._create_qemu_img_with_partitions(
+ ',1,0xDA\n,1,0x0C,*\n,,,-')
+ media = Media(tempfile)
+ # Pretend the image file is a block device, or else
+ # get_boot_and_root_partitions_for_media will choke.
+ media.is_block_device = True
+ self.assertEqual(
+ ("%s%d" % (tempfile, 2), "%s%d" % (tempfile, 3)),
+ get_boot_and_root_partitions_for_media(media))
+
+ def _create_qemu_img_with_partitions(self, sfdisk_commands):
+ tempfile = self.createTempFileAsFixture()
+ proc = cmd_runner.run(
+ ['qemu-img', 'create', '-f', 'raw', tempfile, '10M'],
+ stdout=subprocess.PIPE)
+ proc.communicate()
+ stdout, stderr = run_sfdisk_commands(
+ sfdisk_commands, 5, 63, '', tempfile, as_root=False,
+ # Throw away stderr as sfdisk complains a lot when operating on a
+ # qemu image.
+ stderr=subprocess.PIPE)
+ self.assertIn('Successfully wrote the new partition table', stdout)
+ return tempfile
+
+ def test_ensure_partition_is_not_mounted_for_mounted_partition(self):
+ self.useFixture(MockSomethingFixture(
+ partitions, 'is_partition_mounted', lambda part: True))
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ ensure_partition_is_not_mounted('/dev/whatever')
+ self.assertEqual(
+ [['sudo', 'umount', '/dev/whatever']], popen_fixture.mock.calls)
+
+ def test_ensure_partition_is_not_mounted_for_umounted_partition(self):
+ self.useFixture(MockSomethingFixture(
+ partitions, 'is_partition_mounted', lambda part: False))
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ ensure_partition_is_not_mounted('/dev/whatever')
+ self.assertEqual(None, popen_fixture.mock.calls)
+
+ def test_get_boot_and_root_loopback_devices(self):
+ tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-')
+ atexit_fixture = self.useFixture(MockSomethingFixture(
+ atexit, 'register', AtExitRegister()))
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ # We can't test the return value of get_boot_and_root_loopback_devices
+ # because it'd require running losetup as root, so we just make sure
+ # it calls losetup correctly.
+ get_boot_and_root_loopback_devices(tempfile)
+ self.assertEqual(
+ [['sudo', 'losetup', '-f', '--show', tempfile, '--offset',
+ '32256', '--sizelimit', '129024'],
+ ['sudo', 'losetup', '-f', '--show', tempfile, '--offset',
+ '161280', '--sizelimit', '10321920']],
+ popen_fixture.mock.calls)
+
+ # get_boot_and_root_loopback_devices will also setup two exit handlers
+ # to de-register the loopback devices set up above.
+ self.assertEqual(2, len(atexit_fixture.mock.funcs))
+ popen_fixture.mock.calls = []
+ atexit_fixture.mock.run_funcs()
+ # We did not really run losetup above (as it requires root) so here we
+ # don't have a device to pass to 'losetup -d', but when a device is
+ # setup it is passed to the atexit handler.
+ self.assertEquals(
+ [['sudo', 'losetup', '-d', ''], ['sudo', 'losetup', '-d', '']],
+ popen_fixture.mock.calls)
+
+ def test_setup_partitions_for_image_file(self):
+ # In practice we could pass an empty image file to setup_partitions,
+ # but here we mock Popen() and thanks to that the image is not setup
+ # (via qemu-img) inside setup_partitions. That's why we pass an
+ # already setup image file.
+ tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-')
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open('/dev/null', 'w')))
+ self.useFixture(MockSomethingFixture(
+ partitions, 'is_partition_mounted', lambda part: False))
+ self.useFixture(MockSomethingFixture(
+ partitions, 'get_boot_and_root_loopback_devices',
+ lambda image: ('/dev/loop99', '/dev/loop98')))
+ uuid = '2e82008e-1af3-4699-8521-3bf5bac1e67a'
+ bootfs, rootfs = setup_partitions(
+ 'beagle', Media(tempfile), 32, '2G', 'boot', 'root', 'ext3',
+ uuid, True, True, True)
+ self.assertEqual(
+ # This is the call that would create the image file.
+ [['qemu-img', 'create', '-f', 'raw', tempfile, '2G'],
+ # This call would partition the image file.
+ ['sudo', 'sfdisk', '-D', '-H', '255', '-S', '63', '-C', '261',
+ tempfile],
+ # Make sure changes are written to disk.
+ ['sync'],
+ ['sudo', 'mkfs.vfat', '-F', '32', bootfs, '-n', 'boot'],
+ ['sudo', 'mkfs.ext3', '-U', uuid, rootfs, '-L', 'root']],
+ popen_fixture.mock.calls)
+
+ def test_setup_partitions_for_block_device(self):
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open('/dev/null', 'w')))
+ self.useFixture(MockSomethingFixture(
+ partitions, '_get_partition_count', lambda media: 2))
+ # Pretend the partitions are mounted.
+ self.useFixture(MockSomethingFixture(
+ partitions, 'is_partition_mounted', lambda part: True))
+ tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-')
+ media = Media(tempfile)
+ # Pretend our tempfile is a block device.
+ media.is_block_device = True
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ uuid = '2e82008e-1af3-4699-8521-3bf5bac1e67a'
+ bootfs, rootfs = setup_partitions(
+ 'beagle', media, 32, '2G', 'boot', 'root', 'ext3', uuid, True,
+ True, True)
+ self.assertEqual(
+ [['sudo', 'parted', '-s', tempfile, 'mklabel', 'msdos'],
+ ['sudo', 'sfdisk', '-D', '-H', '255', '-S', '63', tempfile],
+ ['sync'],
+ # Since the partitions are mounted, setup_partitions will umount
+ # them before running mkfs.
+ ['sudo', 'umount', bootfs],
+ ['sudo', 'mkfs.vfat', '-F', '32', bootfs, '-n', 'boot'],
+ ['sudo', 'umount', rootfs],
+ ['sudo', 'mkfs.ext3', '-U', uuid, rootfs, '-L', 'root']],
+ popen_fixture.mock.calls)
+
+
+class TestPopulateRootFS(TestCaseWithFixtures):
+
+ lines_added_to_fstab = None
+ create_flash_kernel_config_called = False
+
+ def test_populate_rootfs(self):
+ def fake_append_to_fstab(disk, additions):
+ self.lines_added_to_fstab = additions
+
+ def fake_create_flash_kernel_config(disk, partition_offset):
+ self.create_flash_kernel_config_called = True
+
+ # Mock stdout, cmd_runner.Popen(), append_to_fstab and
+ # create_flash_kernel_config.
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open('/dev/null', 'w')))
+ self.useFixture(MockSomethingFixture(
+ rootfs, 'append_to_fstab', fake_append_to_fstab))
+ self.useFixture(MockSomethingFixture(
+ rootfs, 'create_flash_kernel_config',
+ fake_create_flash_kernel_config))
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ # Store a dummy rootdisk and contents_dir in a tempdir.
+ tempdir = self.useFixture(CreateTempDirFixture()).tempdir
+ root_disk = os.path.join(tempdir, 'rootdisk')
+ contents_dir = os.path.join(tempdir, 'contents')
+ contents_bin = os.path.join(contents_dir, 'bin')
+ contents_etc = os.path.join(contents_dir, 'etc')
+ os.makedirs(contents_bin)
+ os.makedirs(contents_etc)
+
+ populate_rootfs(
+ contents_dir, root_disk, partition='/dev/rootfs',
+ rootfs_type='ext3', rootfs_uuid='uuid', should_create_swap=True,
+ swap_size=100, partition_offset=0)
+
+ self.assertEqual(
+ ['UUID=uuid / ext3 errors=remount-ro 0 1 ',
+ '/SWAP.swap none swap sw 0 0'],
+ self.lines_added_to_fstab)
+ self.assertEqual(True, self.create_flash_kernel_config_called)
+ swap_file = os.path.join(root_disk, 'SWAP.swap')
+ expected = [
+ ['sudo', 'mount', '/dev/rootfs', root_disk],
+ ['sudo', 'mv', contents_bin, contents_etc, root_disk],
+ ['sudo', 'dd', 'if=/dev/zero', 'of=%s' % swap_file, 'bs=1M',
+ 'count=100'],
+ ['sudo', 'mkswap', swap_file],
+ ['sync'],
+ ['sudo', 'umount', root_disk]]
+ self.assertEqual(expected, popen_fixture.mock.calls)
+
+ def test_create_flash_kernel_config(self):
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ tempdir = self.useFixture(CreateTempDirFixture()).tempdir
+
+ create_flash_kernel_config(tempdir, boot_partition_number=1)
+
+ calls = fixture.mock.calls
+ self.assertEqual(1, len(calls), calls)
+ call = calls[0]
+ # The call writes to a tmpfile and then moves it to the,
+ # /etc/flash-kernel.conf, so the tmpfile is the next to last in the
+ # list of arguments stored.
+ tmpfile = call[-2]
+ self.assertEqual(
+ ['sudo', 'mv', '-f', tmpfile,
+ '%s/etc/flash-kernel.conf' % tempdir],
+ call)
+ self.assertEqual('UBOOT_PART=/dev/mmcblk0p1', open(tmpfile).read())
+
+ def test_move_contents(self):
+ tempdir = self.useFixture(CreateTempDirFixture()).tempdir
+ popen_fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ file1 = self.createTempFileAsFixture(dir=tempdir)
+
+ move_contents(tempdir, '/tmp/')
+
+ self.assertEqual([['sudo', 'mv', file1, '/tmp/']],
+ popen_fixture.mock.calls)
+
+ def test_has_space_left_for_swap(self):
+ statvfs = os.statvfs('/')
+ space_left = statvfs.f_bavail * statvfs.f_bsize
+ swap_size_in_megs = space_left / (1024**2)
+ self.assertTrue(
+ has_space_left_for_swap('/', swap_size_in_megs))
+
+ def test_has_no_space_left_for_swap(self):
+ statvfs = os.statvfs('/')
+ space_left = statvfs.f_bavail * statvfs.f_bsize
+ swap_size_in_megs = (space_left / (1024**2)) + 1
+ self.assertFalse(
+ has_space_left_for_swap('/', swap_size_in_megs))
+
+ def test_write_data_to_protected_file(self):
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ data = 'foo'
+ path = '/etc/nonexistant'
+
+ write_data_to_protected_file(path, data)
+
+ calls = fixture.mock.calls
+ self.assertEqual(1, len(calls), calls)
+ call = calls[0]
+ # The call moves tmpfile to the given path, so tmpfile is the next to
+ # last in the list of arguments stored.
+ tmpfile = call[-2]
+ self.assertEqual(['sudo', 'mv', '-f', tmpfile, path], call)
+ self.assertEqual(data, open(tmpfile).read())
+
+
+class TestCheckDevice(TestCaseWithFixtures):
+
+ def _mock_does_device_exist_true(self):
+ self.useFixture(MockSomethingFixture(
+ check_device, '_does_device_exist', lambda device: True))
+
+ def _mock_does_device_exist_false(self):
+ self.useFixture(MockSomethingFixture(
+ check_device, '_does_device_exist', lambda device: False))
+
+ def _mock_print_devices(self):
+ self.useFixture(MockSomethingFixture(
+ check_device, '_print_devices', lambda: None))
+
+ def _mock_select_device(self):
+ self.useFixture(MockSomethingFixture(
+ check_device, '_select_device', lambda device: True))
+
+ def _mock_deselect_device(self):
+ self.useFixture(MockSomethingFixture(
+ check_device, '_select_device', lambda device: False))
+
+ def _mock_sys_stdout(self):
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open(os.devnull, 'w')))
+
+ def setUp(self):
+ super(TestCheckDevice, self).setUp()
+ self._mock_sys_stdout()
+ self._mock_print_devices()
+
+ def test_ensure_device_partitions_not_mounted(self):
+ partitions_umounted = []
+ def ensure_partition_is_not_mounted_mock(part):
+ partitions_umounted.append(part)
+ self.useFixture(MockSomethingFixture(
+ partitions, 'ensure_partition_is_not_mounted',
+ ensure_partition_is_not_mounted_mock))
+ self.useFixture(MockSomethingFixture(
+ glob, 'glob', lambda pattern: ['/dev/sdz1', '/dev/sdz2']))
+ check_device._ensure_device_partitions_not_mounted('/dev/sdz')
+ self.assertEquals(['/dev/sdz1', '/dev/sdz2'], partitions_umounted)
+
+ def test_check_device_and_select(self):
+ self._mock_does_device_exist_true()
+ self._mock_select_device()
+ self.assertTrue(
+ check_device.confirm_device_selection_and_ensure_it_is_ready(
+ None))
+
+ def test_check_device_and_deselect(self):
+ self._mock_does_device_exist_true()
+ self._mock_deselect_device()
+ self.assertFalse(
+ check_device.confirm_device_selection_and_ensure_it_is_ready(
+ None))
+
+ def test_check_device_not_found(self):
+ self._mock_does_device_exist_false()
+ self.assertFalse(
+ check_device.confirm_device_selection_and_ensure_it_is_ready(
+ None))
+
+
+class AtExitRegister(object):
+ funcs = None
+ def __call__(self, func, *args, **kwargs):
+ if self.funcs is None:
+ self.funcs = []
+ self.funcs.append((func, args, kwargs))
+
+ def run_funcs(self):
+ for func, args, kwargs in self.funcs:
+ func(*args, **kwargs)
+
+
+
+class TestInstallHWPack(TestCaseWithFixtures):
+
+ def test_temporarily_overwrite_file_on_dir(self):
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ temporarily_overwrite_file_on_dir('/path/to/file', '/dir', '/tmp/dir')
+ self.assertEquals(
+ [['sudo', 'mv', '-f', '/dir/file', '/tmp/dir/file'],
+ ['sudo', 'cp', '/path/to/file', '/dir']],
+ fixture.mock.calls)
+
+ fixture.mock.calls = []
+ run_local_atexit_funcs()
+ self.assertEquals(
+ [['sudo', 'mv', '-f', '/tmp/dir/file', '/dir']],
+ fixture.mock.calls)
+
+ def test_copy_file(self):
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ copy_file('/path/to/file', '/dir')
+ self.assertEquals(
+ [['sudo', 'cp', '/path/to/file', '/dir']],
+ fixture.mock.calls)
+
+ fixture.mock.calls = []
+ run_local_atexit_funcs()
+ self.assertEquals(
+ [['sudo', 'rm', '-f', '/dir/file']], fixture.mock.calls)
+
+ def test_mount_chroot_proc(self):
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ mount_chroot_proc('chroot')
+ self.assertEquals(
+ [['sudo', 'mount', 'proc', 'chroot/proc', '-t', 'proc']],
+ fixture.mock.calls)
+
+ fixture.mock.calls = []
+ run_local_atexit_funcs()
+ self.assertEquals(
+ [['sudo', 'umount', '-v', 'chroot/proc']], fixture.mock.calls)
+
+ def test_install_hwpack(self):
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open('/dev/null', 'w')))
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ force_yes = False
+ install_hwpack('chroot', 'hwpack.tgz', force_yes)
+ self.assertEquals(
+ [['sudo', 'cp', 'hwpack.tgz', 'chroot'],
+ ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install',
+ '/hwpack.tgz']],
+ fixture.mock.calls)
+
+ fixture.mock.calls = []
+ run_local_atexit_funcs()
+ self.assertEquals(
+ [['sudo', 'rm', '-f', 'chroot/hwpack.tgz']], fixture.mock.calls)
+
+ def test_install_hwpacks(self):
+ self.useFixture(MockSomethingFixture(
+ sys, 'stdout', open('/dev/null', 'w')))
+ fixture = self.useFixture(MockCmdRunnerPopenFixture())
+ force_yes = True
+ install_hwpacks(
+ 'chroot', '/tmp/dir', force_yes, 'hwpack1.tgz', 'hwpack2.tgz')
+ self.assertEquals(
+ [['sudo', 'mv', '-f', 'chroot/etc/resolv.conf',
+ '/tmp/dir/resolv.conf'],
+ ['sudo', 'cp', '/etc/resolv.conf', 'chroot/etc'],
+ ['sudo', 'mv', '-f', 'chroot/etc/hosts', '/tmp/dir/hosts'],
+ ['sudo', 'cp', '/etc/hosts', 'chroot/etc'],
+ ['sudo', 'cp', '/usr/bin/qemu-arm-static', 'chroot/usr/bin'],
+ ['sudo', 'cp', 'linaro_media_create/../linaro-hwpack-install',
+ 'chroot/usr/bin'],
+ ['sudo', 'mount', 'proc', 'chroot/proc', '-t', 'proc'],
+ ['sudo', 'cp', 'hwpack1.tgz', 'chroot'],
+ ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install',
+ '--force-yes', '/hwpack1.tgz'],
+ ['sudo', 'cp', 'hwpack2.tgz', 'chroot'],
+ ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install',
+ '--force-yes', '/hwpack2.tgz'],
+ ['sudo', 'rm', '-f', 'chroot/hwpack2.tgz'],
+ ['sudo', 'rm', '-f', 'chroot/hwpack1.tgz'],
+ ['sudo', 'umount', '-v', 'chroot/proc'],
+ ['sudo', 'rm', '-f', 'chroot/usr/bin/linaro-hwpack-install'],
+ ['sudo', 'rm', '-f', 'chroot/usr/bin/qemu-arm-static'],
+ ['sudo', 'mv', '-f', '/tmp/dir/hosts', 'chroot/etc'],
+ ['sudo', 'mv', '-f', '/tmp/dir/resolv.conf', 'chroot/etc']],
+ fixture.mock.calls)
diff --git a/linaro_media_create/unpack_binary_tarball.py b/linaro_media_create/unpack_binary_tarball.py
new file mode 100644
index 0000000..8eeea29
--- /dev/null
+++ b/linaro_media_create/unpack_binary_tarball.py
@@ -0,0 +1,9 @@
+from linaro_media_create import cmd_runner
+
+
+def unpack_binary_tarball(tarball, unpack_dir, as_root=True):
+ proc = cmd_runner.run(
+ ['tar', '--numeric-owner', '-C', unpack_dir, '-xf', tarball],
+ as_root=as_root)
+ proc.wait()
+ return proc.returncode