diff options
author | Guilherme Salgado <salgado@canonical.com> | 2011-01-11 15:26:54 -0600 |
---|---|---|
committer | Guilherme Salgado <salgado@canonical.com> | 2011-01-11 15:26:54 -0600 |
commit | 6759a060b6c741c8b0f830db83a11b64d9ddefb8 (patch) | |
tree | 6ff43cf5f57c631b0d687150c6f6a786ace03d6e /linaro_media_create | |
parent | 0e4ecd19250300e1f5ee3e733f85e9d2f6285749 (diff) |
Rename the media_create python package to linaro_media_create, to match the name of the deb package (python-linaro-media-create)
Diffstat (limited to 'linaro_media_create')
-rw-r--r-- | linaro_media_create/__init__.py | 207 | ||||
-rw-r--r-- | linaro_media_create/boot_cmd.py | 0 | ||||
-rwxr-xr-x | linaro_media_create/check_device.py | 114 | ||||
-rw-r--r-- | linaro_media_create/cmd_runner.py | 58 | ||||
-rw-r--r-- | linaro_media_create/ensure_command.py | 13 | ||||
-rw-r--r-- | linaro_media_create/hwpack.py | 118 | ||||
-rw-r--r-- | linaro_media_create/partitions.py | 299 | ||||
-rw-r--r-- | linaro_media_create/populate_boot.py | 167 | ||||
-rw-r--r-- | linaro_media_create/remove_binary_dir.py | 11 | ||||
-rw-r--r-- | linaro_media_create/rootfs.py | 113 | ||||
-rw-r--r-- | linaro_media_create/tests/__init__.py | 8 | ||||
-rwxr-xr-x | linaro_media_create/tests/fixtures.py | 123 | ||||
-rw-r--r-- | linaro_media_create/tests/test_media_create.py | 1047 | ||||
-rw-r--r-- | linaro_media_create/unpack_binary_tarball.py | 9 |
14 files changed, 2287 insertions, 0 deletions
diff --git a/linaro_media_create/__init__.py b/linaro_media_create/__init__.py new file mode 100644 index 0000000..e22e9f8 --- /dev/null +++ b/linaro_media_create/__init__.py @@ -0,0 +1,207 @@ +import argparse +import uuid + +ROOTFS_UUID = str(uuid.uuid4()) +KNOWN_BOARDS = ['beagle', 'igep', 'mx51evk', 'panda', 'ux500', 'vexpress'] + + +class Live256MegsAction(argparse.Action): + """A custom argparse.Action for the --live-256m option. + + It is a store_true action for the given dest plus a store_true action for + 'is_live'. + """ + + def __init__(self, option_strings, dest, default=None, required=False, + help=None, metavar=None): + super(Live256MegsAction, self).__init__( + option_strings=option_strings, dest=dest, nargs=0, + default=False, required=required, help=help) + + def __call__(self, parser, namespace, values, option_string=None): + setattr(namespace, self.dest, True) + setattr(namespace, 'is_live', True) + + +def get_args_parser(): + """Get the ArgumentParser for the arguments given on the command line.""" + parser = argparse.ArgumentParser() + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + '--mmc', dest='device', help='The storage device to use.') + group.add_argument( + '--image_file', dest='device', + help='File where we should write the QEMU image.') + parser.add_argument( + '--dev', required=True, dest='board', choices=KNOWN_BOARDS, + help='Generate an SD card or image for the given board.') + parser.add_argument( + '--rootfs', default='ext3', choices=['ext2', 'ext3', 'ext4', 'btrfs'], + help='Type of filesystem to use for the rootfs') + parser.add_argument( + '--rfs_label', default='rootfs', + help='Label to use for the root filesystem.') + parser.add_argument( + '--boot_label', default='boot', + help='Label to use for the boot filesystem.') + parser.add_argument( + '--swap_file', type=int, + help='Create a swap file of the given size (in MBs).') + group = parser.add_mutually_exclusive_group() + group.add_argument( + '--live', dest='is_live', action='store_true', + help=('Create boot command for casper/live images; if this is not ' + 'provided a UUID for the rootfs is generated and used as the ' + 'root= option')) + group.add_argument( + '--live-256m', dest='is_lowmem', action=Live256MegsAction, + help=('Create boot command for casper/live images; adds ' + 'only-ubiquity option to allow use of live installer on ' + 'boards with 256M memory - like beagle.')) + parser.add_argument( + '--console', action='append', dest='consoles', + help=('Add a console to kernel boot parameter; this parameter can be ' + 'defined multiple times.')) + parser.add_argument( + '--hwpack', action='append', dest='hwpacks', required=True, + help=('A hardware pack that should be installed in the rootfs; this ' + 'parameter can be defined multiple times.')) + parser.add_argument( + '--hwpack-force-yes', action='store_true', + help='Pass --force-yes to linaro-hwpack-install') + parser.add_argument( + '--image_size', default='2G', + help=('The image size, specified in mega/giga bytes (e.g. 3000M or ' + '3G); use with --image_file only')) + parser.add_argument( + '--binary', default='binary-tar.tar.gz', required=True, + help=('The tarball containing the rootfs used to create the bootable ' + 'system.')) + parser.add_argument( + '--no-rootfs', dest='should_format_rootfs', action='store_false', + help='Do not deploy the root filesystem.') + parser.add_argument( + '--no-bootfs', dest='should_format_bootfs', action='store_false', + help='Do not deploy the boot filesystem.') + parser.add_argument( + '--no-part', dest='should_create_partitions', action='store_false', + help='Reuse existing partitions on the given media.') + return parser + + +def get_board_config(board, is_live, is_lowmem, consoles): + """Return a dict containing the configs to create an image for a board. + + :param args: An argparse.ArgumentParser object containing the arguments + passed to linaro-media-create. + """ + mmc_part_offset = 0 + mmc_option = '0:1' + boot_args_options = 'rootwait ro' + uboot_flavor = None + fat_size = 32 + serial_opts = '' + if consoles is not None: + for console in consoles: + serial_opts += ' console=%s' % console + + # XXX: I think this is not needed as we have board-specific + # serial options for when is_live is true. + if is_live: + serial_opts += ' serialtty=ttyS2' + + if board in ('beagle', 'igep'): + if board == 'beagle': + uboot_flavor = 'omap3_beagle' + serial_opts += ' console=tty0 console=ttyS2,115200n8' + live_serial_opts = 'serialtty=ttyS2' + kernel_addr = '0x80000000' + initrd_addr = '0x81600000' + load_addr = '0x80008000' + sub_arch = 'linaro-omap' + boot_script = 'boot.scr' + boot_args_options += ( + ' earlyprintk fixrtc nocompcache vram=12M omapfb.debug=y' + ' omapfb.mode=dvi:1280x720MR-16@60') + + elif board == 'panda': + uboot_flavor = 'omap4_panda' + serial_opts += ' console = tty0 console = ttyO2,115200n8' + live_serial_opts = 'serialtty = ttyO2' + kernel_addr = '0x80200000' + initrd_addr = '0x81600000' + load_addr = '0x80008000' + sub_arch = 'omap4' + boot_script = 'boot.scr' + boot_args_options += ( + ' earlyprintk fixrtc nocompcache vram = 32M omapfb.debug = y' + ' omapfb.vram = 0:8M mem = 463M ip = none') + + elif board == 'ux500': + serial_opts += ' console = tty0 console = ttyAMA2,115200n8' + live_serial_opts = 'serialtty = ttyAMA2' + kernel_addr = '0x00100000' + initrd_addr = '0x08000000' + load_addr = '0x00008000' + sub_arch = 'ux500' + boot_script = 'flash.scr' + boot_args_options += ( + ' earlyprintk rootdelay = 1 fixrtc nocompcache' + ' mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M' + ' mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M' + ' hwmem = 48M@302M mem = 152M@360M') + mmc_option = '1:1' + + elif board == 'mx51evk': + serial_opts += ' console = tty0 console = ttymxc0,115200n8' + live_serial_opts = 'serialtty = ttymxc0' + kernel_addr = '0x90000000' + initrd_addr = '0x90800000' + load_addr = '0x90008000' + sub_arch = 'linaro-mx51' + boot_script = 'boot.scr' + mmc_part_offset = 1 + mmc_option = '0:2' + + elif board == 'vexpress': + uboot_flavor = 'ca9x4_ct_vxp' + serial_opts += ' console = tty0 console = ttyAMA0,38400n8' + live_serial_opts = 'serialtty = ttyAMA0' + kernel_addr = '0x60008000' + initrd_addr = '0x81000000' + load_addr = kernel_addr + sub_arch = 'linaro-vexpress' + boot_script = None + # ARM Boot Monitor is used to load u-boot, uImage etc. into flash and + # only allows for FAT16 + fat_size = 16 + + else: + raise ValueError("Unkown board: %s" % board) + + lowmem_opt = '' + boot_snippet = 'root=UUID=%s' % ROOTFS_UUID + if is_live: + serial_opts += ' %s' % live_serial_opts + boot_snippet = 'boot=casper' + if is_lowmem: + lowmem_opt = 'only-ubiquity' + + boot_cmd = ( + "setenv bootcmd 'fatload mmc %(mmc_option)s %(kernel_addr)s " + "uImage; fatload mmc %(mmc_option)s %(initrd_addr)s uInitrd; " + "bootm %(kernel_addr)s %(initrd_addr)s'\n" + "setenv bootargs '%(serial_opts)s %(lowmem_opt)s " + "%(boot_snippet)s %(boot_args_options)s'\n" + "boot" % vars()) + + # Instead of constructing a dict here, we could create a separate class + # for the config of every board, with the varying bits stored as class + # variables. At this point I don't see much advantage in doing that, + # though. + return dict( + kernel_addr=kernel_addr, initrd_addr=initrd_addr, load_addr=load_addr, + sub_arch=sub_arch, boot_script=boot_script, fat_size=fat_size, + boot_args_options=boot_args_options, serial_opts=serial_opts, + uboot_flavor=uboot_flavor, mmc_part_offset=mmc_part_offset, + mmc_option=mmc_option, boot_cmd=boot_cmd) diff --git a/linaro_media_create/boot_cmd.py b/linaro_media_create/boot_cmd.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/linaro_media_create/boot_cmd.py diff --git a/linaro_media_create/check_device.py b/linaro_media_create/check_device.py new file mode 100755 index 0000000..6130ff5 --- /dev/null +++ b/linaro_media_create/check_device.py @@ -0,0 +1,114 @@ +import glob +import sys + +import dbus + +from linaro_media_create import partitions + + +def _get_system_bus_and_udisks_iface(): + """Return the system bus and the UDisks interface. + + :return: System bus and UDisks inteface tuple. + """ + bus = dbus.SystemBus() + udisks = dbus.Interface( + bus.get_object("org.freedesktop.UDisks", "/org/freedesktop/UDisks"), + 'org.freedesktop.UDisks') + + return (bus, udisks) + + +def _get_dbus_property(prop, device, path): + """ Return a named property for a specific device. + + :param prop: Named property. + :param device: Device object. + :param path: Device path. + :return: Device property. + """ + return device.Get( + path, prop, dbus_interface='org.freedesktop.DBus.Properties') + + +def _does_device_exist(path): + """Checks if the provided path is an existing device. + + :param path: Disk device path. + :return: True if the device exist, else False. + """ + bus, udisks = _get_system_bus_and_udisks_iface() + try: + udisks.get_dbus_method('FindDeviceByDeviceFile')(path) + except dbus.exceptions.DBusException: + # TODO: Check that this exception isn't hiding other errors. + return False + + return True + + +def _print_devices(): + """Print disk devices found on the system.""" + bus, udisks = _get_system_bus_and_udisks_iface() + print '%-16s %-16s %s' % ('Device', 'Mount point', 'Size') + devices = udisks.get_dbus_method('EnumerateDevices')() + for path in devices: + device = bus.get_object("org.freedesktop.UDisks", path) + device_file = _get_dbus_property('DeviceFile', device, path) + + mount_paths = _get_dbus_property('device-mount-paths', device, path) + mount_point = ''.join(b for b in mount_paths) + if mount_point == '': + mount_point = 'none' + + if _get_dbus_property('DeviceIsPartition', device, path): + part_size = _get_dbus_property('partition-size', device, path) + print '%-16s %-16s %dMB' % ( + device_file, mount_point, part_size / 1024**2) + else: + device_size = _get_dbus_property('device-size', device, path) + print '%-16s %-16s %dMB' % ( + device_file, mount_point, device_size / 1024**2) + + +def _select_device(device): + """Ask the user to confirm the selected device. + + :param device: Device path. + :return: True if the user confirms the selection, else False. + """ + resp = raw_input('Are you 100%% sure, on selecting [%s] (y/n)? ' % device) + if resp.lower() != 'y': + return False + return True + + +def _ensure_device_partitions_not_mounted(device): + """Ensure all partitions of the given device are not mounted.""" + # Use '%s?*' as we only want the device files representing + # partitions and not the one representing the device itself. + for part in glob.glob('%s?*' % device): + partitions.ensure_partition_is_not_mounted(part) + + +def confirm_device_selection_and_ensure_it_is_ready(device): + """Confirm this is the device to use and ensure it's ready. + + If the device exists, the user is asked to confirm that this is the + device to use. Upon confirmation we ensure all partitions of the device + are umounted. + + :param device: The path to the device. + :return: True if the device exist and is selected, else False. + """ + if _does_device_exist(device): + print '\nI see...' + _print_devices() + if _select_device(device): + _ensure_device_partitions_not_mounted(device) + return True + else: + print '\nAre you sure? I do not see [%s].' % device + print 'Here is what I see...' + _print_devices() + return False diff --git a/linaro_media_create/cmd_runner.py b/linaro_media_create/cmd_runner.py new file mode 100644 index 0000000..22c603c --- /dev/null +++ b/linaro_media_create/cmd_runner.py @@ -0,0 +1,58 @@ +import os +import subprocess + + +def run(args, as_root=False, stdin=None, stdout=None, stderr=None): + """Run the given command as a sub process. + + Return a Popen instance. + + Callsites must wait() or communicate() with the returned Popen instance. + + :param command: A list or tuple containing the command to run and the + arguments that should be passed to it. + :param as_root: Should the given command be run as root (with sudo)? + :param stdin: Same as in subprocess.Popen(). + :param stdout: Same as in subprocess.Popen(). + :param stderr: Same as in subprocess.Popen(). + """ + assert isinstance(args, (list, tuple)), ( + "The command to run must be a list or tuple, found: %s" % type(args)) + # TODO: We might want to always use 'sudo -E' here to avoid problems like + # https://launchpad.net/bugs/673570 + if as_root: + args = args[:] + args.insert(0, 'sudo') + return Popen(args, stdin=stdin, stdout=stdout, stderr=stderr) + + +class Popen(subprocess.Popen): + """A version of Popen which raises an error on non-zero returncode. + + Once the subprocess completes we check its returncode and raise + SubcommandNonZeroReturnValue if it's non-zero. + """ + + def __init__(self, args, env=None, **kwargs): + self._my_args = args + if env is None: + env = os.environ.copy() + env['LC_ALL'] = 'C' + super(Popen, self).__init__(args, env=env, **kwargs) + + def wait(self): + returncode = super(Popen, self).wait() + if returncode != 0: + raise SubcommandNonZeroReturnValue(self._my_args, returncode) + return returncode + + +class SubcommandNonZeroReturnValue(Exception): + + def __init__(self, command, return_value): + self.command = command + self.retval = return_value + + def __str__(self): + return 'Sub process "%s" returned a non-zero value: %d' % ( + self.command, self.retval) diff --git a/linaro_media_create/ensure_command.py b/linaro_media_create/ensure_command.py new file mode 100644 index 0000000..ad8755e --- /dev/null +++ b/linaro_media_create/ensure_command.py @@ -0,0 +1,13 @@ +import os +import sys + + +def apt_get_install(command, package): + print ("Installing required command %s from package %s" + % (command, package)) + os.system('sudo apt-get install %s' % package) + + +def ensure_command(command, package): + if os.system('which %s 2>/dev/null 1>/dev/null' % command) != 0: + apt_get_install(command, package) diff --git a/linaro_media_create/hwpack.py b/linaro_media_create/hwpack.py new file mode 100644 index 0000000..54a7ff5 --- /dev/null +++ b/linaro_media_create/hwpack.py @@ -0,0 +1,118 @@ +import os +import platform + +from linaro_media_create import cmd_runner +from linaro_media_create.ensure_command import ensure_command + + +# It'd be nice if we could use atexit here, but all the things we need to undo +# have to happen right after install_hwpacks completes and the atexit +# functions would only be called after l-m-c.py exits. +local_atexit = [] + +def install_hwpacks(chroot_dir, tmp_dir, hwpack_force_yes, *hwpack_files): + """Install the given hwpacks onto the given chroot.""" + + chroot_etc = os.path.join(chroot_dir, 'etc') + temporarily_overwrite_file_on_dir('/etc/resolv.conf', chroot_etc, tmp_dir) + temporarily_overwrite_file_on_dir('/etc/hosts', chroot_etc, tmp_dir) + + if not platform.machine().startswith('arm'): + ensure_command('qemu-arm-static', 'qemu-arm-static') + ensure_command('qemu-img', 'qemu-kvm') + copy_file('/usr/bin/qemu-arm-static', + os.path.join(chroot_dir, 'usr', 'bin')) + + # FIXME: This is an ugly hack to make sure we use the l-h-i script from + # the current development tree when possible. + here = os.path.dirname(__file__) + linaro_hwpack_install_path = os.path.join( + here, '..', 'linaro-hwpack-install') + if not os.path.exists(linaro_hwpack_install_path): + linaro_hwpack_install_path = '/usr/bin/linaro-hwpack-install' + copy_file(linaro_hwpack_install_path, + os.path.join(chroot_dir, 'usr', 'bin')) + + mount_chroot_proc(chroot_dir) + + for hwpack_file in hwpack_files: + install_hwpack(chroot_dir, hwpack_file, hwpack_force_yes) + + run_local_atexit_funcs() + + +def install_hwpack(chroot_dir, hwpack_file, hwpack_force_yes): + """Install an hwpack on the given chroot. + + Copy the hwpack file to the chroot and run linaro-hwpack-install passing + that hwpack file to it. If hwpack_force_yes is True, also pass + --force-yes to linaro-hwpack-install. + """ + hwpack_basename = os.path.basename(hwpack_file) + copy_file(hwpack_file, chroot_dir) + print "-" * 60 + print "Installing (apt-get) %s in target rootfs." % hwpack_basename + args = ['chroot', chroot_dir, 'linaro-hwpack-install'] + if hwpack_force_yes: + args.append('--force-yes') + args.append('/%s' % hwpack_basename) + cmd_runner.run(args, as_root=True).wait() + print "-" * 60 + + +def mount_chroot_proc(chroot_dir): + """Mount a /proc filesystem on the given chroot. + + Also register a function in local_atexit to unmount that /proc filesystem. + """ + chroot_proc = os.path.join(chroot_dir, 'proc') + proc = cmd_runner.run( + ['mount', 'proc', chroot_proc, '-t', 'proc'], as_root=True) + proc.wait() + def umount_chroot_proc(): + cmd_runner.run(['umount', '-v', chroot_proc], as_root=True).wait() + local_atexit.append(umount_chroot_proc) + + +def copy_file(filepath, directory): + """Copy the given file to the given directory. + + The copying of the file is done in a subprocess and run using sudo. + + We also register a function in local_atexit to remove the file from the + given directory. + """ + cmd_runner.run(['cp', filepath, directory], as_root=True).wait() + + def undo(): + new_path = os.path.join(directory, os.path.basename(filepath)) + cmd_runner.run(['rm', '-f', new_path], as_root=True).wait() + local_atexit.append(undo) + + +def temporarily_overwrite_file_on_dir(filepath, directory, tmp_dir): + """Temporarily replace a file on the given directory. + + We'll move the existing file on the given directory to a temp dir, then + copy over the given file to that directory and register a function in + local_atexit to move the orig file back to the given directory. + """ + basename = os.path.basename(filepath) + path_to_orig = os.path.join(tmp_dir, basename) + # Move the existing file from the given directory to the temp dir. + cmd_runner.run( + ['mv', '-f', os.path.join(directory, basename), path_to_orig], + as_root=True).wait() + # Now copy the given file onto the given directory. + cmd_runner.run(['cp', filepath, directory], as_root=True).wait() + + def undo(): + cmd_runner.run( + ['mv', '-f', path_to_orig, directory], as_root=True).wait() + local_atexit.append(undo) + + +def run_local_atexit_funcs(): + # Run the funcs in LIFO order, just like atexit does. + while len(local_atexit) > 0: + local_atexit.pop()() diff --git a/linaro_media_create/partitions.py b/linaro_media_create/partitions.py new file mode 100644 index 0000000..c68feb4 --- /dev/null +++ b/linaro_media_create/partitions.py @@ -0,0 +1,299 @@ +import atexit +import subprocess +import time + +import dbus +from parted import ( + Device, + Disk, + ) + +from linaro_media_create import cmd_runner + + +HEADS = 255 +SECTORS = 63 +SECTOR_SIZE = 512 # bytes +CYLINDER_SIZE = HEADS * SECTORS * SECTOR_SIZE +DBUS_PROPERTIES = 'org.freedesktop.DBus.Properties' +UDISKS = "org.freedesktop.UDisks" + + +# I wonder if it'd make sense to convert this into a small shim which calls +# the appropriate function for the given type of device? I think it's still +# small enough that there's not much benefit in doing that, but if it grows we +# might want to do it. +def setup_partitions(board, media, fat_size, image_size, + bootfs_label, rootfs_label, rootfs_type, rootfs_uuid, + should_create_partitions, should_format_bootfs, + should_format_rootfs): + """Make sure the given device is partitioned to boot the given board. + + :param board: The board's name, as a string. + :param media: The Media we should partition. + :param fat_size: The FAT size (16 or 32) for the boot partition. + :param image_size: The size of the image file, in case we're setting up a + QEMU image. + :param should_create_partitions: Whether or not we should erase existing + partitions and create new ones. + """ + cylinders = None + if not media.is_block_device: + image_size_in_bytes = convert_size_to_bytes(image_size) + cylinders = image_size_in_bytes / CYLINDER_SIZE + image_size_in_bytes = cylinders * CYLINDER_SIZE + proc = cmd_runner.run( + ['qemu-img', 'create', '-f', 'raw', media.path, image_size], + stdout=open('/dev/null', 'w')) + proc.wait() + + if should_create_partitions: + create_partitions(board, media, fat_size, HEADS, SECTORS, cylinders) + + if media.is_block_device: + bootfs, rootfs = get_boot_and_root_partitions_for_media(media) + else: + bootfs, rootfs = get_boot_and_root_loopback_devices(media.path) + + if should_format_bootfs: + print "\nFormating boot partition\n" + # It looks like KDE somehow automounts the partitions after you + # repartition a disk so we need to unmount them here to create the + # filesystem. + ensure_partition_is_not_mounted(bootfs) + proc = cmd_runner.run( + ['mkfs.vfat', '-F', str(fat_size), bootfs, '-n', bootfs_label], + as_root=True) + proc.wait() + + if should_format_rootfs: + print "\nFormating root partition\n" + mkfs = 'mkfs.%s' % rootfs_type + ensure_partition_is_not_mounted(rootfs) + proc = cmd_runner.run( + [mkfs, '-U', rootfs_uuid, rootfs, '-L', rootfs_label], + as_root=True) + proc.wait() + + return bootfs, rootfs + + +def ensure_partition_is_not_mounted(partition): + """Ensure the given partition is not mounted, umounting if necessary.""" + if is_partition_mounted(partition): + cmd_runner.run(['umount', partition], as_root=True).wait() + + +def is_partition_mounted(partition): + """Is the given partition mounted?""" + device_path = _get_udisks_device_path(partition) + device = dbus.SystemBus().get_object(UDISKS, device_path) + is_partition = device.Get( + device_path, 'DeviceIsPartition', dbus_interface=DBUS_PROPERTIES) + return device.Get( + device_path, 'DeviceIsMounted', dbus_interface=DBUS_PROPERTIES) + + +def get_boot_and_root_loopback_devices(image_file): + """Return the boot and root loopback devices for the given image file. + + Register the loopback devices as well. + """ + vfat_size, vfat_offset, linux_size, linux_offset = ( + calculate_partition_size_and_offset(image_file)) + boot_device = register_loopback(image_file, vfat_offset, vfat_size) + root_device = register_loopback(image_file, linux_offset, linux_size) + return boot_device, root_device + + +def register_loopback(image_file, offset, size): + """Register a loopback device with an atexit handler to de-register it.""" + def undo(device): + cmd_runner.run(['losetup', '-d', device], as_root=True).wait() + + proc = cmd_runner.run( + ['losetup', '-f', '--show', image_file, '--offset', + str(offset), '--sizelimit', str(size)], + stdout=subprocess.PIPE, as_root=True) + device, _ = proc.communicate() + device = device.strip() + atexit.register(undo, device) + return device + + +def calculate_partition_size_and_offset(image_file): + """Return the size and offset of the boot and root partitions. + + Both the size and offset are in sectors. + + :param image_file: A string containing the path to the image_file. + :return: A 4-tuple containing the offset and size of the boot partition + followed by the offset and size of the root partition. + """ + # Here we can use parted.Device to read the partitions because we're + # reading from a regular file rather than a block device. If it was a + # block device we'd need root rights. + disk = Disk(Device(image_file)) + vfat_partition = None + for partition in disk.partitions: + if 'boot' in partition.getFlagsAsString(): + geometry = partition.geometry + vfat_offset = geometry.start * 512 + vfat_size = geometry.length * 512 + vfat_partition = partition + + assert vfat_partition is not None, ( + "Couldn't find boot partition on %s" % image_file) + linux_partition = vfat_partition.nextPartition() + geometry = linux_partition.geometry + linux_offset = geometry.start * 512 + linux_size = geometry.length * 512 + return vfat_size, vfat_offset, linux_size, linux_offset + + +def get_boot_and_root_partitions_for_media(media): + """Return the device files for the boot and root partitions of media. + + If the given media has 2 partitions, the first is boot and the second is + root. If there are 3 partitions, the second is boot and third is root. + + If there are any other number of partitions, ValueError is raised. + + This function must only be used for block devices. + """ + assert media.is_block_device, ( + "This function must only be used for block devices") + + partition_count = _get_partition_count(media) + + if partition_count == 2: + partition_offset = 0 + elif partition_count == 3: + partition_offset = 1 + else: + raise ValueError( + "Unexpected number of partitions on %s: %d" % ( + media.path, partition_count)) + + boot_partition = "%s%d" % (media.path, 1 + partition_offset) + root_partition = "%s%d" % (media.path, 2 + partition_offset) + return boot_partition, root_partition + + +def _get_partition_count(media): + """Return the number of partitions on the given media.""" + # We could do the same easily using python-parted but it requires root + # rights to read block devices, so we use UDisks here. + device_path = _get_udisks_device_path(media.path) + device = dbus.SystemBus().get_object(UDISKS, device_path) + return device.Get( + device_path, 'PartitionTableCount', dbus_interface=DBUS_PROPERTIES) + + +def _get_udisks_device_path(device): + """Return the UDisks path for the given device.""" + bus = dbus.SystemBus() + udisks = dbus.Interface( + bus.get_object(UDISKS, "/org/freedesktop/UDisks"), UDISKS) + return udisks.get_dbus_method('FindDeviceByDeviceFile')(device) + + +def convert_size_to_bytes(size): + """Convert a size string in Kbytes, Mbytes or Gbytes to bytes.""" + unit = size[-1].upper() + real_size = int(size[:-1]) + if unit == 'K': + real_size = real_size * 1024 + elif unit == 'M': + real_size = real_size * 1024 * 1024 + elif unit == 'G': + real_size = real_size * 1024 * 1024 * 1024 + else: + raise ValueError("Unknown size format: %s. Use K[bytes], M[bytes] " + "or G[bytes]" % size) + + # Round the size of the raw disk image up to a multiple of 256K so it is + # an exact number of SD card erase blocks in length. Otherwise Linux + # under qemu cannot access the last part of the card and is likely to + # complain that the last partition on the disk has been truncated. This + # doesn't appear to work in all cases, though, as can be seen on + # https://bugs.launchpad.net/linux-linaro/+bug/673335. + if real_size % (1024 * 256): + cylinders = real_size / CYLINDER_SIZE + real_size = cylinders * CYLINDER_SIZE + real_size = ((((real_size - 1) / (1024 * 256)) + 1) * (1024 * 256)) + + return real_size + + +def run_sfdisk_commands(commands, heads, sectors, cylinders, device, + as_root=True, stderr=None): + """Run the given commands under sfdisk. + + :param commands: A string of sfdisk commands; each on a separate line. + :return: A 2-tuple containing the subprocess' stdout and stderr. + """ + args = ['sfdisk', + '-D', + '-H', str(heads), + '-S', str(sectors)] + if cylinders is not None: + args.extend(['-C', str(cylinders)]) + args.append(device) + proc = cmd_runner.run( + args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=stderr, + as_root=as_root) + return proc.communicate("%s\n" % commands) + + +def create_partitions(board, media, fat_size, heads, sectors, cylinders=None): + """Partition the given media according to the board requirements. + + :param board: A string with the board type (e.g. beagle, panda, etc) + :param media: A setup_partitions.Media object to partition. + :param fat_size: The type of FATs used in the boot partition (16 or 32). + :param heads: Number of heads to use in the disk geometry of + partitions. + :param sectors: Number of sectors to use in the disk geometry of + partitions. + :param cylinders: The number of cylinders to pass to sfdisk's -C argument. + If None the -C argument is not passed. + """ + if media.is_block_device: + # Overwrite any existing partition tables with a fresh one. + proc = cmd_runner.run( + ['parted', '-s', media.path, 'mklabel', 'msdos'], as_root=True) + proc.wait() + + if fat_size == 32: + partition_type = '0x0C' + else: + partition_type = '0x0E' + + if board == 'mx51evk': + # Create a one cylinder partition for fixed-offset bootloader data at + # the beginning of the image (size is one cylinder, so 8224768 bytes + # with the first sector for MBR). + run_sfdisk_commands(',1,0xDA', heads, sectors, cylinders, media.path) + + # Create a VFAT or FAT16 partition of 9 cylinders (74027520 bytes, ~70 + # MiB), followed by a Linux-type partition containing the rest of the free + # space. + sfdisk_cmd = ',9,%s,*\n,,,-' % partition_type + run_sfdisk_commands(sfdisk_cmd, heads, sectors, cylinders, media.path) + + # Sync and sleep to wait for the partition to settle. + cmd_runner.run(['sync']).wait() + # Sleeping just 1 second seems to be enough here, but if we start getting + # errors because the disk is not partitioned then we should revisit this. + # XXX: This sleep can probably die now; need to do more tests before doing + # so, though. + time.sleep(1) + + +class Media(object): + """A representation of the media where Linaro will be installed.""" + + def __init__(self, path): + self.path = path + self.is_block_device = path.startswith('/dev/') diff --git a/linaro_media_create/populate_boot.py b/linaro_media_create/populate_boot.py new file mode 100644 index 0000000..8aef8cc --- /dev/null +++ b/linaro_media_create/populate_boot.py @@ -0,0 +1,167 @@ +import errno +import glob +import os + +from linaro_media_create import cmd_runner + + +def _run_mkimage(img_type, load_addr, entry_point, name, img_data, img, + stdout=None, as_root=True): + cmd = ['mkimage', + '-A', 'arm', + '-O', 'linux', + '-T', img_type, + '-C', 'none', + '-a', load_addr, + '-e', load_addr, + '-n', name, + '-d', img_data, + img] + proc = cmd_runner.run(cmd, as_root=as_root, stdout=stdout) + proc.wait() + return proc.returncode + + +def _get_file_matching(regex): + """Return a file whose path matches the given regex. + + If zero or more than one files match, raise a ValueError. + """ + files = glob.glob(regex) + if len(files) == 1: + return files[0] + elif len(files) == 0: + raise ValueError( + "No files found matching '%s'; can't continue" % regex) + else: + # TODO: Could ask the user to chosse which file to use instead of + # raising an exception. + raise ValueError("Too many files matching '%s' found." % regex) + + +def make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk): + img_data = _get_file_matching( + '%s/vmlinuz-*-%s' % (uboot_parts_dir, sub_arch)) + img = '%s/uImage' % boot_disk + return _run_mkimage( + 'kernel', load_addr, load_addr, 'Linux', img_data, img) + + +def make_uInitrd(uboot_parts_dir, sub_arch, boot_disk): + img_data = _get_file_matching( + '%s/initrd.img-*-%s' % (uboot_parts_dir, sub_arch)) + img = '%s/uInitrd' % boot_disk + return _run_mkimage('ramdisk', '0', '0', 'initramfs', img_data, img) + + +def make_boot_script(boot_script_data, tmp_dir, boot_script): + # Need to save the boot script data into a file that will be passed to + # mkimage. + data_file = '%s/boot.cmd' % tmp_dir + with open(data_file, 'w') as fd: + fd.write(boot_script_data) + return _run_mkimage( + 'script', '0', '0', 'boot script', data_file, boot_script) + + +def install_mx51evk_boot_loader(imx_file, boot_device_or_file): + proc = cmd_runner.run([ + "dd", + "if=%s" % imx_file, + "of=%s" % boot_device_or_file, + "bs=1024", + "seek=1", + "conv=notrunc"], as_root=True) + proc.wait() + + +def install_omap_boot_loader(mlo_file, boot_disk): + cmd_runner.run(["cp", "-v", mlo_file, boot_disk], as_root=True).wait() + # XXX: Is this really needed? + cmd_runner.run(["sync"]).wait() + + +def make_boot_ini(boot_script, boot_disk): + proc = cmd_runner.run( + ["cp", "-v", boot_script, "%s/boot.ini" % boot_disk], as_root=True) + proc.wait() + + +def populate_boot(board, board_config, chroot_dir, boot_partition, boot_disk, + boot_device_or_file, tmp_dir, is_live): + + parts_dir = 'boot' + if is_live: + parts_dir = 'casper' + uboot_parts_dir = os.path.join(chroot_dir, parts_dir) + + try: + os.makedirs(boot_disk) + except OSError, exc: + if exc.errno == errno.EEXIST: + pass + else: + raise + cmd_runner.run(['mount', boot_partition, boot_disk], as_root=True).wait() + + uboot_flavor = board_config.get('uboot_flavor') + if uboot_flavor is not None: + uboot_bin = os.path.join( + chroot_dir, 'usr', 'lib', 'u-boot', uboot_flavor, 'u-boot.bin') + cmd_runner.run( + ['cp', '-v', uboot_bin, boot_disk], as_root=True).wait() + + boot_script = "%(boot_disk)s/%(boot_script_name)s" % ( + dict(boot_disk=boot_disk, + boot_script_name=board_config['boot_script'])) + + load_addr = board_config['load_addr'] + sub_arch = board_config['sub_arch'] + boot_cmd = board_config['boot_cmd'] + + # TODO: Once linaro-media-create is fully ported to python, we should + # split this into several board-specific functions that are defined + # somewhere else and just called here. + if board in ["beagle", "panda"]: + xloader_dir = 'x-loader-omap' + if board == "panda": + xloader_dir = 'x-loader-omap4' + mlo_file = os.path.join( + chroot_dir, 'usr', 'lib', xloader_dir, 'MLO') + install_omap_boot_loader(mlo_file, boot_disk) + make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk) + make_uInitrd(uboot_parts_dir, sub_arch, boot_disk) + make_boot_script(boot_cmd, tmp_dir, boot_script) + make_boot_ini(boot_script, boot_disk) + + elif board == "igep": + make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk) + make_uInitrd(uboot_parts_dir, sub_arch, boot_disk) + make_boot_script(boot_cmd, tmp_dir, boot_script) + make_boot_ini(boot_script, boot_disk) + + elif board == "ux500": + make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk) + make_uInitrd(uboot_parts_dir, sub_arch, boot_disk) + make_boot_script(boot_cmd, tmp_dir, boot_script) + + elif board == "vexpress": + make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk) + make_uInitrd(uboot_parts_dir, sub_arch, boot_disk) + + elif board == "mx51evk": + install_mx51evk_boot_loader( + "binary/usr/lib/u-boot/mx51evk/u-boot.imx", boot_device_or_file) + make_uImage(load_addr, uboot_parts_dir, sub_arch, boot_disk) + make_uInitrd(uboot_parts_dir, sub_arch, boot_disk) + make_boot_script(boot_cmd, tmp_dir, boot_script) + + else: + raise AssertionError( + "Internal error; missing support for board: %s" % board) + + cmd_runner.run(['sync']).wait() + try: + cmd_runner.run(['umount', boot_disk], as_root=True).wait() + except cmd_runner.SubcommandNonZeroReturnValue: + pass diff --git a/linaro_media_create/remove_binary_dir.py b/linaro_media_create/remove_binary_dir.py new file mode 100644 index 0000000..c2b1438 --- /dev/null +++ b/linaro_media_create/remove_binary_dir.py @@ -0,0 +1,11 @@ +import os.path + +from linaro_media_create import cmd_runner + + +def remove_dir(directory, as_root=True): + if os.path.exists(directory): + proc = cmd_runner.run(['rm', '-rf', directory], as_root=as_root) + proc.wait() + return proc.returncode + return 0 diff --git a/linaro_media_create/rootfs.py b/linaro_media_create/rootfs.py new file mode 100644 index 0000000..3c8a721 --- /dev/null +++ b/linaro_media_create/rootfs.py @@ -0,0 +1,113 @@ +import glob +import os +import tempfile + +from linaro_media_create import cmd_runner + + +def populate_rootfs(content_dir, root_disk, partition, rootfs_type, + rootfs_uuid, should_create_swap, swap_size, + partition_offset): + """Populate the rootfs and make the necessary tweaks to make it usable. + + This consists of: + 1. Create a directory on the path specified by root_disk + 2. Mount the given partition onto the created directory. + 3. Move the contents of content_dir to that directory. + 4. If should_create_swap, then create it with the given size. + 5. Add fstab entries for the / filesystem and swap (if created). + 6. Create a /etc/flash-kernel.conf containing the target's boot device. + 7. Unmount the partition we mounted on step 2. + """ + print "\nPopulating rootfs partition" + print "Be patient, this may take a few minutes\n" + # Create a directory to mount the rootfs partition. + os.makedirs(root_disk) + + cmd_runner.run(['mount', partition, root_disk], as_root=True).wait() + + move_contents(content_dir, root_disk) + + fstab_additions = ["UUID=%s / %s errors=remount-ro 0 1 " % ( + rootfs_uuid, rootfs_type)] + if should_create_swap: + print "\nCreating SWAP File\n" + if has_space_left_for_swap(root_disk, swap_size): + proc = cmd_runner.run([ + 'dd', + 'if=/dev/zero', + 'of=%s/SWAP.swap' % root_disk, + 'bs=1M', + 'count=%s' % swap_size], as_root=True) + proc.wait() + proc = cmd_runner.run( + ['mkswap', '%s/SWAP.swap' % root_disk], as_root=True) + proc.wait() + fstab_additions.append("/SWAP.swap none swap sw 0 0") + else: + print ("Swap file is bigger than space left on partition; " + "continuing without swap.") + + append_to_fstab(root_disk, fstab_additions) + + print "\nCreating /etc/flash-kernel.conf\n" + create_flash_kernel_config(root_disk, 1 + partition_offset) + + cmd_runner.run(['sync']).wait() + # The old code used to ignore failures here, but I don't think that's + # desirable so I'm using cmd_runner.run()'s standard behaviour, which will + # fail on a non-zero return value. + cmd_runner.run(['umount', root_disk], as_root=True).wait() + + +def create_flash_kernel_config(root_disk, boot_partition_number): + """Create a flash-kernel.conf file under root_disk/etc. + + Uses the given partition number to figure out the boot partition. + """ + target_boot_dev = '/dev/mmcblk0p%s' % boot_partition_number + flash_kernel = os.path.join(root_disk, 'etc', 'flash-kernel.conf') + write_data_to_protected_file( + flash_kernel, "UBOOT_PART=%s" % target_boot_dev) + + +def move_contents(from_, root_disk): + """Move everything under from_ to the given root disk. + + Uses sudo for moving. + """ + assert os.path.isdir(from_), "%s is not a directory" % from_ + files = glob.glob(os.path.join(from_, '*')) + mv_cmd = ['mv'] + mv_cmd.extend(files) + mv_cmd.append(root_disk) + cmd_runner.run(mv_cmd, as_root=True).wait() + + +def has_space_left_for_swap(root_disk, swap_size_in_mega_bytes): + """Is there enough space for a swap file in the given root disk?""" + statvfs = os.statvfs(root_disk) + free_space = statvfs.f_bavail * statvfs.f_bsize + swap_size_in_bytes = int(swap_size_in_mega_bytes) * 1024**2 + if free_space >= swap_size_in_bytes: + return True + return False + + +def append_to_fstab(root_disk, fstab_additions): + fstab = os.path.join(root_disk, 'etc', 'fstab') + data = open(fstab).read() + '\n' + '\n'.join(fstab_additions) + write_data_to_protected_file(fstab, data) + + +def write_data_to_protected_file(path, data): + """Write data to the file on the given path. + + This is meant to be used when the given file is only writable by root, and + we overcome that by writing the data to a tempfile and then moving the + tempfile on top of the given one using sudo. + """ + _, tmpfile = tempfile.mkstemp() + with open(tmpfile, 'w') as fd: + fd.write(data) + cmd_runner.run(['mv', '-f', tmpfile, path], as_root=True).wait() diff --git a/linaro_media_create/tests/__init__.py b/linaro_media_create/tests/__init__.py new file mode 100644 index 0000000..49b2b6d --- /dev/null +++ b/linaro_media_create/tests/__init__.py @@ -0,0 +1,8 @@ +import unittest + +def test_suite(): + module_names = ['linaro_media_create.tests.test_media_create', + ] + loader = unittest.TestLoader() + suite = loader.loadTestsFromNames(module_names) + return suite diff --git a/linaro_media_create/tests/fixtures.py b/linaro_media_create/tests/fixtures.py new file mode 100755 index 0000000..261917f --- /dev/null +++ b/linaro_media_create/tests/fixtures.py @@ -0,0 +1,123 @@ +import os +import shutil +import subprocess +import tempfile + +from linaro_media_create import partitions +from linaro_media_create import cmd_runner + + +class CreateTempDirFixture(object): + + def __init__(self): + self.tempdir = None + + def setUp(self): + self.tempdir = tempfile.mkdtemp() + + def tearDown(self): + if os.path.exists(self.tempdir): + shutil.rmtree(self.tempdir) + + def get_temp_dir(self): + return self.tempdir + + +class CreateTarballFixture(object): + + def __init__(self, dir): + self.dir = dir + self.tarball = os.path.join(self.dir, 'tarball.tar.gz') + + def setUp(self): + # Create gzipped tar archive. + args = ['tar', '-czf', self.tarball, self.dir] + proc = subprocess.Popen(args) + proc.wait() + + def tearDown(self): + if os.path.exists(self.tarball): + os.remove(self.tarball) + + def get_tarball(self): + return self.tarball + + +class MockSomethingFixture(object): + """A fixture which mocks something on the given object. + + Replaces attr_name on obj with the given mock, undoing that upon + tearDown(). + """ + + def __init__(self, obj, attr_name, mock): + self.obj = obj + self.attr_name = attr_name + self.mock = mock + self.orig_attr = getattr(obj, attr_name) + + def setUp(self): + setattr(self.obj, self.attr_name, self.mock) + + def tearDown(self): + setattr(self.obj, self.attr_name, self.orig_attr) + + +class MockCmdRunnerPopen(object): + """A mock for cmd_runner.Popen() which stores the args given to it.""" + calls = None + def __call__(self, cmd, *args, **kwargs): + if self.calls is None: + self.calls = [] + if isinstance(cmd, basestring): + all_args = [cmd] + else: + all_args = cmd + all_args.extend(args) + self.calls.append(all_args) + self.returncode = 0 + return self + + def communicate(self, input=None): + self.wait() + return '', '' + + def wait(self): + return self.returncode + + +class MockCmdRunnerPopenFixture(MockSomethingFixture): + """A test fixture which mocks cmd_runner.do_run with the given mock. + + If no mock is given, a MockCmdRunnerPopen instance is used. + """ + + def __init__(self, mock=None): + if mock is None: + mock = MockCmdRunnerPopen() + super(MockCmdRunnerPopenFixture, self).__init__( + cmd_runner, 'Popen', mock) + + +class MockCallableWithPositionalArgs(object): + """A callable mock which just stores the positional args given to it. + + Every time an instance of this is "called", it will append a tuple + containing the positional arguments given to it to self.calls. + """ + calls = None + return_value = None + def __call__(self, *args): + if self.calls is None: + self.calls = [] + self.calls.append(args) + return self.return_value + + +class MockRunSfdiskCommandsFixture(MockSomethingFixture): + + def __init__(self): + mock = MockCallableWithPositionalArgs() + mock.return_value = ('', '') + super(MockRunSfdiskCommandsFixture, self).__init__( + partitions, 'run_sfdisk_commands', mock) diff --git a/linaro_media_create/tests/test_media_create.py b/linaro_media_create/tests/test_media_create.py new file mode 100644 index 0000000..f02f80b --- /dev/null +++ b/linaro_media_create/tests/test_media_create.py @@ -0,0 +1,1047 @@ +from contextlib import contextmanager +import atexit +import glob +import os +import random +import string +import subprocess +import sys +import time + +from testtools import TestCase +from testtools.matchers import Mismatch + +from hwpack.testing import TestCaseWithFixtures + +from linaro_media_create import ( + check_device, + cmd_runner, + ensure_command, + get_board_config, + populate_boot, + partitions, + rootfs, + ROOTFS_UUID, + ) +from linaro_media_create.hwpack import ( + copy_file, + install_hwpack, + install_hwpacks, + mount_chroot_proc, + run_local_atexit_funcs, + temporarily_overwrite_file_on_dir, + ) +from linaro_media_create.partitions import ( + calculate_partition_size_and_offset, + convert_size_to_bytes, + create_partitions, + ensure_partition_is_not_mounted, + get_boot_and_root_loopback_devices, + get_boot_and_root_partitions_for_media, + Media, + run_sfdisk_commands, + setup_partitions, + ) +from linaro_media_create.populate_boot import ( + make_boot_script, + make_uImage, + make_uInitrd, + _get_file_matching, + _run_mkimage, + ) +from linaro_media_create.remove_binary_dir import remove_dir +from linaro_media_create.rootfs import ( + create_flash_kernel_config, + has_space_left_for_swap, + move_contents, + populate_rootfs, + write_data_to_protected_file, + ) +from linaro_media_create.unpack_binary_tarball import unpack_binary_tarball + +from linaro_media_create.tests.fixtures import ( + CreateTempDirFixture, + CreateTarballFixture, + MockCmdRunnerPopenFixture, + MockSomethingFixture, + MockRunSfdiskCommandsFixture, + ) + + +class TestEnsureCommand(TestCase): + + apt_get_called = False + + def test_command_already_present(self): + with self.mock_apt_get_install(): + ensure_command.ensure_command('apt-get', 'apt') + self.assertFalse(self.apt_get_called) + + def test_command_not_present(self): + with self.mock_apt_get_install(): + ensure_command.ensure_command('apt-get-two-o', 'apt-2') + self.assertTrue(self.apt_get_called) + + @contextmanager + def mock_apt_get_install(self): + def mock_apt_get_install(cmd, pkg): + self.apt_get_called = True + orig_func = ensure_command.apt_get_install + ensure_command.apt_get_install = mock_apt_get_install + yield + ensure_command.apt_get_install = orig_func + + +class IsEqualToDict(object): + """A testtools matcher to compare dicts. + + When there are differences, only the differing keys/values are shown. + """ + + def __init__(self, expected): + self.expected = expected + + def match(self, actual): + actual_keys = set(actual.keys()) + expected_keys = set(self.expected.keys()) + instersection = actual_keys.intersection(expected_keys) + expected_only_keys = expected_keys.difference(actual_keys) + actual_only_keys = actual_keys.difference(expected_keys) + keys_with_differing_values = [] + for key in instersection: + if actual[key] != self.expected[key]: + keys_with_differing_values.append(key) + + if (len(expected_only_keys) == 0 and len(actual_only_keys) == 0 + and len(keys_with_differing_values) == 0): + return None + + expected_diffs = [] + for key in keys_with_differing_values + list(expected_only_keys): + expected_diffs.append("%s: %r" % (key, self.expected[key])) + expected_diffs = "\n".join(expected_diffs) + + actual_diffs = [] + for key in keys_with_differing_values + list(actual_only_keys): + actual_diffs.append("%s: %r" % (key, actual[key])) + actual_diffs = "\n".join(actual_diffs) + + mismatch_string = "\na = %s\n" % expected_diffs + mismatch_string += "=" * 60 + "\n" + mismatch_string += "b = %s" % actual_diffs + return IsEqualToDictMismatch(self.expected, mismatch_string, actual) + + +class IsEqualToDictMismatch(Mismatch): + + def __init__(self, expected, mismatch_string, other): + self.expected = expected + self._mismatch_string = mismatch_string + self.other = other + + def describe(self): + return self._mismatch_string + + +class TestGetBoardConfig(TestCase): + + expected_beagle_config = { + 'boot_cmd': ( + "setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage; " + "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80000000 " + "0x81600000'\nsetenv bootargs ' console=tty0 " + "console=ttyS2,115200n8 root=UUID=%s rootwait ro earlyprintk " + "fixrtc nocompcache vram=12M omapfb.debug=y " + "omapfb.mode=dvi:1280x720MR-16@60'\nboot" % ROOTFS_UUID), + 'boot_args_options': ( + 'rootwait ro earlyprintk fixrtc nocompcache vram=12M ' + 'omapfb.debug=y omapfb.mode=dvi:1280x720MR-16@60'), + 'boot_script': 'boot.scr', + 'fat_size': 32, + 'initrd_addr': '0x81600000', + 'kernel_addr': '0x80000000', + 'load_addr': '0x80008000', + 'mmc_option': '0:1', + 'mmc_part_offset': 0, + 'serial_opts': ' console=tty0 console=ttyS2,115200n8', + 'sub_arch': 'linaro-omap', + 'uboot_flavor': 'omap3_beagle'} + + expected_panda_config = { + 'boot_cmd': ( + "setenv bootcmd 'fatload mmc 0:1 0x80200000 uImage; fatload mmc " + "0:1 0x81600000 uInitrd; bootm 0x80200000 0x81600000'\nsetenv " + "bootargs ' console = tty0 console = ttyO2,115200n8 " + "root=UUID=%s rootwait ro earlyprintk fixrtc nocompcache " + "vram = 32M omapfb.debug = y omapfb.vram = 0:8M mem = 463M " + "ip = none'\nboot" % ROOTFS_UUID), + 'boot_args_options': ( + 'rootwait ro earlyprintk fixrtc nocompcache vram = 32M ' + 'omapfb.debug = y omapfb.vram = 0:8M mem = 463M ip = none'), + 'boot_script': 'boot.scr', + 'fat_size': 32, + 'initrd_addr': '0x81600000', + 'kernel_addr': '0x80200000', + 'load_addr': '0x80008000', + 'mmc_option': '0:1', + 'mmc_part_offset': 0, + 'serial_opts': ' console = tty0 console = ttyO2,115200n8', + 'sub_arch': 'omap4', + 'uboot_flavor': 'omap4_panda'} + + expected_ux500_config = { + 'boot_args_options': ( + 'rootwait ro earlyprintk rootdelay = 1 fixrtc nocompcache ' + 'mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M ' + 'mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M ' + 'hwmem = 48M@302M mem = 152M@360M'), + 'boot_cmd': ( + "setenv bootcmd 'fatload mmc 1:1 0x00100000 uImage; fatload mmc " + "1:1 0x08000000 uInitrd; bootm 0x00100000 0x08000000'\nsetenv " + "bootargs ' console = tty0 console = ttyAMA2,115200n8 " + "root=UUID=%s rootwait ro earlyprintk rootdelay = 1 fixrtc " + "nocompcache mem = 96M@0 mem_modem = 32M@96M mem = 44M@128M " + "pmem = 22M@172M mem = 30M@194M mem_mali = 32M@224M " + "pmem_hwb = 54M@256M hwmem = 48M@302M mem = 152M@360M'\nboot" + % ROOTFS_UUID), + 'boot_script': 'flash.scr', + 'fat_size': 32, + 'initrd_addr': '0x08000000', + 'kernel_addr': '0x00100000', + 'load_addr': '0x00008000', + 'mmc_option': '1:1', + 'mmc_part_offset': 0, + 'serial_opts': ' console = tty0 console = ttyAMA2,115200n8', + 'sub_arch': 'ux500', + 'uboot_flavor': None} + + expected_vexpress_config = { + 'boot_args_options': 'rootwait ro', + 'boot_cmd': ( + "setenv bootcmd 'fatload mmc 0:1 0x60008000 uImage; fatload mmc " + "0:1 0x81000000 uInitrd; bootm 0x60008000 0x81000000'\nsetenv " + "bootargs ' console = tty0 console = ttyAMA0,38400n8 " + "root=UUID=%s rootwait ro'\nboot" % ROOTFS_UUID), + 'boot_script': None, + 'fat_size': 16, + 'initrd_addr': '0x81000000', + 'kernel_addr': '0x60008000', + 'load_addr': '0x60008000', + 'mmc_option': '0:1', + 'mmc_part_offset': 0, + 'serial_opts': ' console = tty0 console = ttyAMA0,38400n8', + 'sub_arch': 'linaro-vexpress', + 'uboot_flavor': 'ca9x4_ct_vxp'} + + expected_mx51evk_config = { + 'boot_args_options': 'rootwait ro', + 'boot_cmd': ( + "setenv bootcmd 'fatload mmc 0:2 0x90000000 uImage; fatload mmc " + "0:2 0x90800000 uInitrd; bootm 0x90000000 0x90800000'\nsetenv " + "bootargs ' console = tty0 console = ttymxc0,115200n8 " + "root=UUID=%s rootwait ro'\nboot" % ROOTFS_UUID), + 'boot_script': 'boot.scr', + 'fat_size': 32, + 'initrd_addr': '0x90800000', + 'kernel_addr': '0x90000000', + 'load_addr': '0x90008000', + 'mmc_option': '0:2', + 'mmc_part_offset': 1, + 'serial_opts': ' console = tty0 console = ttymxc0,115200n8', + 'sub_arch': 'linaro-mx51', + 'uboot_flavor': None} + + def test_unknown_board(self): + self.assertRaises( + ValueError, get_board_config, 'foobar', is_live=True, + is_lowmem=False, consoles=None) + + def test_vexpress_live(self): + config = get_board_config( + 'vexpress', is_live=True, is_lowmem=False, consoles=None) + expected = self.expected_vexpress_config.copy() + expected['boot_cmd'] = ( + "setenv bootcmd 'fatload mmc 0:1 0x60008000 uImage; fatload mmc " + "0:1 0x81000000 uInitrd; bootm 0x60008000 0x81000000'\nsetenv " + "bootargs ' console = tty0 console = ttyAMA0,38400n8 " + "serialtty = ttyAMA0 boot=casper rootwait ro'\nboot") + expected['serial_opts'] = ( + ' console = tty0 console = ttyAMA0,38400n8 serialtty = ttyAMA0') + self.assertThat(expected, IsEqualToDict(config)) + + def test_vexpress(self): + config = get_board_config( + 'vexpress', is_live=False, is_lowmem=False, consoles=None) + self.assertThat(self.expected_vexpress_config, IsEqualToDict(config)) + + def test_mx51evk_live(self): + config = get_board_config( + 'mx51evk', is_live=True, is_lowmem=False, consoles=None) + expected = self.expected_mx51evk_config.copy() + expected['boot_cmd'] = ( + "setenv bootcmd 'fatload mmc 0:2 0x90000000 uImage; " + "fatload mmc 0:2 0x90800000 uInitrd; bootm 0x90000000 " + "0x90800000'\nsetenv bootargs ' console = tty0 " + "console = ttymxc0,115200n8 serialtty = ttymxc0 boot=casper " + "rootwait ro'\nboot") + expected['serial_opts'] = ( + ' console = tty0 console = ttymxc0,115200n8 serialtty = ttymxc0') + self.assertThat(expected, IsEqualToDict(config)) + + def test_mx51evk(self): + config = get_board_config( + 'mx51evk', is_live=False, is_lowmem=False, consoles=None) + self.assertThat(self.expected_mx51evk_config, IsEqualToDict(config)) + + def test_ux500_live(self): + config = get_board_config( + 'ux500', is_live=True, is_lowmem=False, consoles=None) + boot_cmd = ( + "setenv bootcmd 'fatload mmc 1:1 0x00100000 uImage; fatload " + "mmc 1:1 0x08000000 uInitrd; bootm 0x00100000 0x08000000'\n" + "setenv bootargs ' console = tty0 console = ttyAMA2,115200n8 " + "serialtty = ttyAMA2 boot=casper rootwait ro earlyprintk " + "rootdelay = 1 fixrtc nocompcache mem = 96M@0 " + "mem_modem = 32M@96M mem = 44M@128M pmem = 22M@172M " + "mem = 30M@194M mem_mali = 32M@224M pmem_hwb = 54M@256M " + "hwmem = 48M@302M mem = 152M@360M'\nboot") + expected = self.expected_ux500_config.copy() + expected['boot_cmd'] = boot_cmd + expected['serial_opts'] = ( + ' console = tty0 console = ttyAMA2,115200n8 serialtty = ttyAMA2') + self.assertThat(expected, IsEqualToDict(config)) + + def test_ux500(self): + config = get_board_config( + 'ux500', is_live=False, is_lowmem=False, consoles=None) + self.assertThat(self.expected_ux500_config, IsEqualToDict(config)) + + def test_panda(self): + config = get_board_config( + 'panda', is_live=False, is_lowmem=False, consoles=None) + self.assertThat(self.expected_panda_config, IsEqualToDict(config)) + + def test_panda_live(self): + config = get_board_config( + 'panda', is_live=True, is_lowmem=False, consoles=None) + boot_cmd = ( + "setenv bootcmd 'fatload mmc 0:1 0x80200000 uImage; " + "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80200000 " + "0x81600000'\nsetenv bootargs ' console = tty0 " + "console = ttyO2,115200n8 serialtty = ttyO2 boot=casper " + "rootwait ro earlyprintk fixrtc nocompcache vram = 32M " + "omapfb.debug = y omapfb.vram = 0:8M mem = 463M ip = none'\nboot") + expected = self.expected_panda_config.copy() + expected['boot_cmd'] = boot_cmd + expected['serial_opts'] = ( + ' console = tty0 console = ttyO2,115200n8 serialtty = ttyO2') + self.assertThat(expected, IsEqualToDict(config)) + + def test_beagle(self): + config = get_board_config( + 'beagle', is_live=False, is_lowmem=False, consoles=None) + self.assertThat(self.expected_beagle_config, IsEqualToDict(config)) + + def test_beagle_live(self): + config = get_board_config( + 'beagle', is_live=True, is_lowmem=False, consoles=None) + boot_cmd = ( + "setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage; " + "fatload mmc 0:1 0x81600000 uInitrd; bootm 0x80000000 " + "0x81600000'\nsetenv bootargs ' console=tty0 " + "console=ttyS2,115200n8 serialtty=ttyS2 boot=casper rootwait ro " + "earlyprintk fixrtc nocompcache vram=12M omapfb.debug=y " + "omapfb.mode=dvi:1280x720MR-16@60'\nboot") + expected = self.expected_beagle_config.copy() + expected['boot_cmd'] = boot_cmd + expected['serial_opts'] = ( + ' console=tty0 console=ttyS2,115200n8 serialtty=ttyS2') + self.assertThat(expected, IsEqualToDict(config)) + + +class TestRemoveBinaryDir(TestCaseWithFixtures): + + def setUp(self): + super(TestRemoveBinaryDir, self).setUp() + self.temp_dir_fixture = CreateTempDirFixture() + self.useFixture(self.temp_dir_fixture) + + def test_remove_dir(self): + rc = remove_dir( + self.temp_dir_fixture.get_temp_dir(), as_root=False) + self.assertEqual(rc, 0) + self.assertFalse( + os.path.exists(self.temp_dir_fixture.get_temp_dir())) + + +class TestUnpackBinaryTarball(TestCaseWithFixtures): + + def setUp(self): + super(TestUnpackBinaryTarball, self).setUp() + + self.tar_dir_fixture = CreateTempDirFixture() + self.useFixture(self.tar_dir_fixture) + + self.tarball_fixture = CreateTarballFixture( + self.tar_dir_fixture.get_temp_dir()) + self.useFixture(self.tarball_fixture) + + def test_unpack_binary_tarball(self): + tmp_dir = self.useFixture(CreateTempDirFixture()).get_temp_dir() + rc = unpack_binary_tarball( + self.tarball_fixture.get_tarball(), tmp_dir, as_root=False) + self.assertEqual(rc, 0) + + +class TestCmdRunner(TestCaseWithFixtures): + + def test_run(self): + fixture = MockCmdRunnerPopenFixture() + self.useFixture(fixture) + proc = cmd_runner.run(['foo', 'bar', 'baz']) + self.assertEqual(0, proc.returncode) + self.assertEqual([['foo', 'bar', 'baz']], fixture.mock.calls) + + def test_run_as_root(self): + fixture = MockCmdRunnerPopenFixture() + self.useFixture(fixture) + cmd_runner.run(['foo', 'bar'], as_root=True) + self.assertEqual([['sudo', 'foo', 'bar']], fixture.mock.calls) + + def test_run_succeeds_on_zero_return_code(self): + proc = cmd_runner.run(['true']) + # Need to wait() here as we're using the real Popen. + proc.wait() + self.assertEqual(0, proc.returncode) + + def test_run_raises_exception_on_non_zero_return_code(self): + def run_and_wait(): + proc = cmd_runner.run(['false']) + proc.wait() + self.assertRaises( + cmd_runner.SubcommandNonZeroReturnValue, run_and_wait) + + def test_run_must_be_given_list_as_args(self): + self.assertRaises(AssertionError, cmd_runner.run, 'true') + + def test_Popen(self): + proc = cmd_runner.Popen('true') + returncode = proc.wait() + self.assertEqual(0, returncode) + + +class TestPopulateBoot(TestCaseWithFixtures): + + def _mock_get_file_matching(self): + self.useFixture(MockSomethingFixture( + populate_boot, '_get_file_matching', + lambda regex: regex)) + + def _mock_Popen(self): + fixture = MockCmdRunnerPopenFixture() + self.useFixture(fixture) + return fixture + + def test_make_uImage(self): + self._mock_get_file_matching() + fixture = self._mock_Popen() + make_uImage('load_addr', 'parts_dir', 'sub_arch', 'boot_disk') + expected = [ + 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'kernel', + '-C', 'none', '-a', 'load_addr', '-e', 'load_addr', '-n', 'Linux', + '-d', 'parts_dir/vmlinuz-*-sub_arch', 'boot_disk/uImage'] + self.assertEqual([expected], fixture.mock.calls) + + def test_make_uInitrd(self): + self._mock_get_file_matching() + fixture = self._mock_Popen() + make_uInitrd('parts_dir', 'sub_arch', 'boot_disk') + expected = [ + 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'ramdisk', + '-C', 'none', '-a', '0', '-e', '0', '-n', 'initramfs', + '-d', 'parts_dir/initrd.img-*-sub_arch', 'boot_disk/uInitrd'] + self.assertEqual([expected], fixture.mock.calls) + + def test_make_boot_script(self): + self._mock_get_file_matching() + fixture = self._mock_Popen() + tempdir = self.useFixture(CreateTempDirFixture()).tempdir + make_boot_script('boot script data', tempdir, 'boot_script') + expected = [ + 'sudo', 'mkimage', '-A', 'arm', '-O', 'linux', '-T', 'script', + '-C', 'none', '-a', '0', '-e', '0', '-n', 'boot script', + '-d', '%s/boot.cmd' % tempdir, 'boot_script'] + self.assertEqual([expected], fixture.mock.calls) + + def test_get_file_matching(self): + prefix = ''.join( + random.choice(string.ascii_lowercase) for x in range(5)) + file1 = self.createTempFileAsFixture(prefix) + directory = os.path.dirname(file1) + self.assertEqual( + file1, _get_file_matching('%s/%s*' % (directory, prefix))) + + def test_get_file_matching_too_many_files_found(self): + prefix = ''.join( + random.choice(string.ascii_lowercase) for x in range(5)) + file1 = self.createTempFileAsFixture(prefix) + file2 = self.createTempFileAsFixture(prefix) + directory = os.path.dirname(file1) + self.assertRaises( + ValueError, _get_file_matching, '%s/%s*' % (directory, prefix)) + + def test_get_file_matching_no_files_found(self): + self.assertRaises( + ValueError, _get_file_matching, '/foo/bar/baz/*non-existent') + + def test_run_mkimage(self): + # Create a fake boot script. + filename = self.createTempFileAsFixture() + f = open(filename, 'w') + f.write("setenv bootcmd 'fatload mmc 0:1 0x80000000 uImage;\nboot") + f.close() + + img = self.createTempFileAsFixture() + # Use that fake boot script to create a boot loader using mkimage. + # Send stdout to /dev/null as mkimage will print to stdout and we + # don't want that. + retval = _run_mkimage( + 'script', '0', '0', 'boot script', filename, img, + stdout=open('/dev/null', 'w'), as_root=False) + + self.assertEqual(0, retval) + + +class TestCreatePartitions(TestCaseWithFixtures): + + media = Media('/dev/xdz') + + def setUp(self): + super(TestCreatePartitions, self).setUp() + # Stub time.sleep() as create_partitions() use that. + self.orig_sleep = time.sleep + time.sleep = lambda s: None + + def tearDown(self): + super(TestCreatePartitions, self).tearDown() + time.sleep = self.orig_sleep + + def test_create_partitions_for_mx51evk(self): + # For this board we create a one cylinder partition at the beginning. + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture()) + + create_partitions('mx51evk', self.media, 32, 255, 63, '') + + self.assertEqual( + [['sudo', 'parted', '-s', self.media.path, 'mklabel', 'msdos'], + ['sync']], + popen_fixture.mock.calls) + self.assertEqual( + [(',1,0xDA', 255, 63, '', self.media.path), + (',9,0x0C,*\n,,,-', 255, 63, '', self.media.path)], + sfdisk_fixture.mock.calls) + + def test_create_partitions_for_beagle(self): + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture()) + + create_partitions('beagle', self.media, 32, 255, 63, '') + + self.assertEqual( + [['sudo', 'parted', '-s', self.media.path, 'mklabel', 'msdos'], + ['sync']], + popen_fixture.mock.calls) + self.assertEqual( + [(',9,0x0C,*\n,,,-', 255, 63, '', self.media.path)], + sfdisk_fixture.mock.calls) + + def test_create_partitions_with_img_file(self): + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + sfdisk_fixture = self.useFixture(MockRunSfdiskCommandsFixture()) + + tempfile = self.createTempFileAsFixture() + create_partitions('beagle', Media(tempfile), 32, 255, 63, '') + + # Unlike the test for partitioning of a regular block device, in this + # case parted was not called as there's no existing partition table + # for us to overwrite on the image file. + self.assertEqual([['sync']], popen_fixture.mock.calls) + + self.assertEqual( + [(',9,0x0C,*\n,,,-', 255, 63, '', tempfile)], + sfdisk_fixture.mock.calls) + + def test_run_sfdisk_commands(self): + tempfile = self.createTempFileAsFixture() + proc = cmd_runner.run( + ['qemu-img', 'create', '-f', 'raw', tempfile, '10M'], + stdout=subprocess.PIPE) + proc.communicate() + stdout, stderr = run_sfdisk_commands( + ',1,0xDA', 5, 63, '', tempfile, as_root=False, + stderr=subprocess.PIPE) + self.assertIn('Successfully wrote the new partition table', stdout) + + def test_run_sfdisk_commands_raises_on_non_zero_returncode(self): + tempfile = self.createTempFileAsFixture() + self.assertRaises( + cmd_runner.SubcommandNonZeroReturnValue, + run_sfdisk_commands, + ',1,0xDA', 5, 63, '', tempfile, as_root=False, + stderr=subprocess.PIPE) + + +class TestPartitionSetup(TestCaseWithFixtures): + + def setUp(self): + super(TestPartitionSetup, self).setUp() + # Stub time.sleep() as create_partitions() use that. + self.orig_sleep = time.sleep + time.sleep = lambda s: None + + def tearDown(self): + super(TestPartitionSetup, self).tearDown() + time.sleep = self.orig_sleep + + def test_convert_size_in_kbytes_to_bytes(self): + self.assertEqual(512 * 1024, convert_size_to_bytes('512K')) + + def test_convert_size_in_mbytes_to_bytes(self): + self.assertEqual(100 * 1024**2, convert_size_to_bytes('100M')) + + def test_convert_size_in_gbytes_to_bytes(self): + self.assertEqual(12 * 1024**3, convert_size_to_bytes('12G')) + + def test_convert_size_in_kbytes_to_bytes_rounds_to_256k_multiple(self): + # See comment in convert_size_to_bytes as to why we need to do this. + self.assertEqual( + 3891 * (1024 * 256), convert_size_to_bytes('1000537K')) + + def test_calculate_partition_size_and_offset(self): + tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-') + vfat_size, vfat_offset, linux_size, linux_offset = ( + calculate_partition_size_and_offset(tempfile)) + self.assertEqual( + [129024L, 32256L, 10321920L, 161280L], + [vfat_size, vfat_offset, linux_size, linux_offset]) + + def test_get_boot_and_root_partitions_for_media_with_2_partitions(self): + self.useFixture(MockSomethingFixture( + partitions, '_get_partition_count', lambda media: 2)) + tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-') + media = Media(tempfile) + # Pretend the image file is a block device, or else + # get_boot_and_root_partitions_for_media will choke. + media.is_block_device = True + self.assertEqual( + ("%s%d" % (tempfile, 1), "%s%d" % (tempfile, 2)), + get_boot_and_root_partitions_for_media(media)) + + def test_get_boot_and_root_partitions_for_media_with_3_partitions(self): + self.useFixture(MockSomethingFixture( + partitions, '_get_partition_count', lambda media: 3)) + tempfile = self._create_qemu_img_with_partitions( + ',1,0xDA\n,1,0x0C,*\n,,,-') + media = Media(tempfile) + # Pretend the image file is a block device, or else + # get_boot_and_root_partitions_for_media will choke. + media.is_block_device = True + self.assertEqual( + ("%s%d" % (tempfile, 2), "%s%d" % (tempfile, 3)), + get_boot_and_root_partitions_for_media(media)) + + def _create_qemu_img_with_partitions(self, sfdisk_commands): + tempfile = self.createTempFileAsFixture() + proc = cmd_runner.run( + ['qemu-img', 'create', '-f', 'raw', tempfile, '10M'], + stdout=subprocess.PIPE) + proc.communicate() + stdout, stderr = run_sfdisk_commands( + sfdisk_commands, 5, 63, '', tempfile, as_root=False, + # Throw away stderr as sfdisk complains a lot when operating on a + # qemu image. + stderr=subprocess.PIPE) + self.assertIn('Successfully wrote the new partition table', stdout) + return tempfile + + def test_ensure_partition_is_not_mounted_for_mounted_partition(self): + self.useFixture(MockSomethingFixture( + partitions, 'is_partition_mounted', lambda part: True)) + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + ensure_partition_is_not_mounted('/dev/whatever') + self.assertEqual( + [['sudo', 'umount', '/dev/whatever']], popen_fixture.mock.calls) + + def test_ensure_partition_is_not_mounted_for_umounted_partition(self): + self.useFixture(MockSomethingFixture( + partitions, 'is_partition_mounted', lambda part: False)) + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + ensure_partition_is_not_mounted('/dev/whatever') + self.assertEqual(None, popen_fixture.mock.calls) + + def test_get_boot_and_root_loopback_devices(self): + tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-') + atexit_fixture = self.useFixture(MockSomethingFixture( + atexit, 'register', AtExitRegister())) + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + # We can't test the return value of get_boot_and_root_loopback_devices + # because it'd require running losetup as root, so we just make sure + # it calls losetup correctly. + get_boot_and_root_loopback_devices(tempfile) + self.assertEqual( + [['sudo', 'losetup', '-f', '--show', tempfile, '--offset', + '32256', '--sizelimit', '129024'], + ['sudo', 'losetup', '-f', '--show', tempfile, '--offset', + '161280', '--sizelimit', '10321920']], + popen_fixture.mock.calls) + + # get_boot_and_root_loopback_devices will also setup two exit handlers + # to de-register the loopback devices set up above. + self.assertEqual(2, len(atexit_fixture.mock.funcs)) + popen_fixture.mock.calls = [] + atexit_fixture.mock.run_funcs() + # We did not really run losetup above (as it requires root) so here we + # don't have a device to pass to 'losetup -d', but when a device is + # setup it is passed to the atexit handler. + self.assertEquals( + [['sudo', 'losetup', '-d', ''], ['sudo', 'losetup', '-d', '']], + popen_fixture.mock.calls) + + def test_setup_partitions_for_image_file(self): + # In practice we could pass an empty image file to setup_partitions, + # but here we mock Popen() and thanks to that the image is not setup + # (via qemu-img) inside setup_partitions. That's why we pass an + # already setup image file. + tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-') + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + self.useFixture(MockSomethingFixture( + sys, 'stdout', open('/dev/null', 'w'))) + self.useFixture(MockSomethingFixture( + partitions, 'is_partition_mounted', lambda part: False)) + self.useFixture(MockSomethingFixture( + partitions, 'get_boot_and_root_loopback_devices', + lambda image: ('/dev/loop99', '/dev/loop98'))) + uuid = '2e82008e-1af3-4699-8521-3bf5bac1e67a' + bootfs, rootfs = setup_partitions( + 'beagle', Media(tempfile), 32, '2G', 'boot', 'root', 'ext3', + uuid, True, True, True) + self.assertEqual( + # This is the call that would create the image file. + [['qemu-img', 'create', '-f', 'raw', tempfile, '2G'], + # This call would partition the image file. + ['sudo', 'sfdisk', '-D', '-H', '255', '-S', '63', '-C', '261', + tempfile], + # Make sure changes are written to disk. + ['sync'], + ['sudo', 'mkfs.vfat', '-F', '32', bootfs, '-n', 'boot'], + ['sudo', 'mkfs.ext3', '-U', uuid, rootfs, '-L', 'root']], + popen_fixture.mock.calls) + + def test_setup_partitions_for_block_device(self): + self.useFixture(MockSomethingFixture( + sys, 'stdout', open('/dev/null', 'w'))) + self.useFixture(MockSomethingFixture( + partitions, '_get_partition_count', lambda media: 2)) + # Pretend the partitions are mounted. + self.useFixture(MockSomethingFixture( + partitions, 'is_partition_mounted', lambda part: True)) + tempfile = self._create_qemu_img_with_partitions(',1,0x0C,*\n,,,-') + media = Media(tempfile) + # Pretend our tempfile is a block device. + media.is_block_device = True + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + uuid = '2e82008e-1af3-4699-8521-3bf5bac1e67a' + bootfs, rootfs = setup_partitions( + 'beagle', media, 32, '2G', 'boot', 'root', 'ext3', uuid, True, + True, True) + self.assertEqual( + [['sudo', 'parted', '-s', tempfile, 'mklabel', 'msdos'], + ['sudo', 'sfdisk', '-D', '-H', '255', '-S', '63', tempfile], + ['sync'], + # Since the partitions are mounted, setup_partitions will umount + # them before running mkfs. + ['sudo', 'umount', bootfs], + ['sudo', 'mkfs.vfat', '-F', '32', bootfs, '-n', 'boot'], + ['sudo', 'umount', rootfs], + ['sudo', 'mkfs.ext3', '-U', uuid, rootfs, '-L', 'root']], + popen_fixture.mock.calls) + + +class TestPopulateRootFS(TestCaseWithFixtures): + + lines_added_to_fstab = None + create_flash_kernel_config_called = False + + def test_populate_rootfs(self): + def fake_append_to_fstab(disk, additions): + self.lines_added_to_fstab = additions + + def fake_create_flash_kernel_config(disk, partition_offset): + self.create_flash_kernel_config_called = True + + # Mock stdout, cmd_runner.Popen(), append_to_fstab and + # create_flash_kernel_config. + self.useFixture(MockSomethingFixture( + sys, 'stdout', open('/dev/null', 'w'))) + self.useFixture(MockSomethingFixture( + rootfs, 'append_to_fstab', fake_append_to_fstab)) + self.useFixture(MockSomethingFixture( + rootfs, 'create_flash_kernel_config', + fake_create_flash_kernel_config)) + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + # Store a dummy rootdisk and contents_dir in a tempdir. + tempdir = self.useFixture(CreateTempDirFixture()).tempdir + root_disk = os.path.join(tempdir, 'rootdisk') + contents_dir = os.path.join(tempdir, 'contents') + contents_bin = os.path.join(contents_dir, 'bin') + contents_etc = os.path.join(contents_dir, 'etc') + os.makedirs(contents_bin) + os.makedirs(contents_etc) + + populate_rootfs( + contents_dir, root_disk, partition='/dev/rootfs', + rootfs_type='ext3', rootfs_uuid='uuid', should_create_swap=True, + swap_size=100, partition_offset=0) + + self.assertEqual( + ['UUID=uuid / ext3 errors=remount-ro 0 1 ', + '/SWAP.swap none swap sw 0 0'], + self.lines_added_to_fstab) + self.assertEqual(True, self.create_flash_kernel_config_called) + swap_file = os.path.join(root_disk, 'SWAP.swap') + expected = [ + ['sudo', 'mount', '/dev/rootfs', root_disk], + ['sudo', 'mv', contents_bin, contents_etc, root_disk], + ['sudo', 'dd', 'if=/dev/zero', 'of=%s' % swap_file, 'bs=1M', + 'count=100'], + ['sudo', 'mkswap', swap_file], + ['sync'], + ['sudo', 'umount', root_disk]] + self.assertEqual(expected, popen_fixture.mock.calls) + + def test_create_flash_kernel_config(self): + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + tempdir = self.useFixture(CreateTempDirFixture()).tempdir + + create_flash_kernel_config(tempdir, boot_partition_number=1) + + calls = fixture.mock.calls + self.assertEqual(1, len(calls), calls) + call = calls[0] + # The call writes to a tmpfile and then moves it to the, + # /etc/flash-kernel.conf, so the tmpfile is the next to last in the + # list of arguments stored. + tmpfile = call[-2] + self.assertEqual( + ['sudo', 'mv', '-f', tmpfile, + '%s/etc/flash-kernel.conf' % tempdir], + call) + self.assertEqual('UBOOT_PART=/dev/mmcblk0p1', open(tmpfile).read()) + + def test_move_contents(self): + tempdir = self.useFixture(CreateTempDirFixture()).tempdir + popen_fixture = self.useFixture(MockCmdRunnerPopenFixture()) + file1 = self.createTempFileAsFixture(dir=tempdir) + + move_contents(tempdir, '/tmp/') + + self.assertEqual([['sudo', 'mv', file1, '/tmp/']], + popen_fixture.mock.calls) + + def test_has_space_left_for_swap(self): + statvfs = os.statvfs('/') + space_left = statvfs.f_bavail * statvfs.f_bsize + swap_size_in_megs = space_left / (1024**2) + self.assertTrue( + has_space_left_for_swap('/', swap_size_in_megs)) + + def test_has_no_space_left_for_swap(self): + statvfs = os.statvfs('/') + space_left = statvfs.f_bavail * statvfs.f_bsize + swap_size_in_megs = (space_left / (1024**2)) + 1 + self.assertFalse( + has_space_left_for_swap('/', swap_size_in_megs)) + + def test_write_data_to_protected_file(self): + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + data = 'foo' + path = '/etc/nonexistant' + + write_data_to_protected_file(path, data) + + calls = fixture.mock.calls + self.assertEqual(1, len(calls), calls) + call = calls[0] + # The call moves tmpfile to the given path, so tmpfile is the next to + # last in the list of arguments stored. + tmpfile = call[-2] + self.assertEqual(['sudo', 'mv', '-f', tmpfile, path], call) + self.assertEqual(data, open(tmpfile).read()) + + +class TestCheckDevice(TestCaseWithFixtures): + + def _mock_does_device_exist_true(self): + self.useFixture(MockSomethingFixture( + check_device, '_does_device_exist', lambda device: True)) + + def _mock_does_device_exist_false(self): + self.useFixture(MockSomethingFixture( + check_device, '_does_device_exist', lambda device: False)) + + def _mock_print_devices(self): + self.useFixture(MockSomethingFixture( + check_device, '_print_devices', lambda: None)) + + def _mock_select_device(self): + self.useFixture(MockSomethingFixture( + check_device, '_select_device', lambda device: True)) + + def _mock_deselect_device(self): + self.useFixture(MockSomethingFixture( + check_device, '_select_device', lambda device: False)) + + def _mock_sys_stdout(self): + self.useFixture(MockSomethingFixture( + sys, 'stdout', open(os.devnull, 'w'))) + + def setUp(self): + super(TestCheckDevice, self).setUp() + self._mock_sys_stdout() + self._mock_print_devices() + + def test_ensure_device_partitions_not_mounted(self): + partitions_umounted = [] + def ensure_partition_is_not_mounted_mock(part): + partitions_umounted.append(part) + self.useFixture(MockSomethingFixture( + partitions, 'ensure_partition_is_not_mounted', + ensure_partition_is_not_mounted_mock)) + self.useFixture(MockSomethingFixture( + glob, 'glob', lambda pattern: ['/dev/sdz1', '/dev/sdz2'])) + check_device._ensure_device_partitions_not_mounted('/dev/sdz') + self.assertEquals(['/dev/sdz1', '/dev/sdz2'], partitions_umounted) + + def test_check_device_and_select(self): + self._mock_does_device_exist_true() + self._mock_select_device() + self.assertTrue( + check_device.confirm_device_selection_and_ensure_it_is_ready( + None)) + + def test_check_device_and_deselect(self): + self._mock_does_device_exist_true() + self._mock_deselect_device() + self.assertFalse( + check_device.confirm_device_selection_and_ensure_it_is_ready( + None)) + + def test_check_device_not_found(self): + self._mock_does_device_exist_false() + self.assertFalse( + check_device.confirm_device_selection_and_ensure_it_is_ready( + None)) + + +class AtExitRegister(object): + funcs = None + def __call__(self, func, *args, **kwargs): + if self.funcs is None: + self.funcs = [] + self.funcs.append((func, args, kwargs)) + + def run_funcs(self): + for func, args, kwargs in self.funcs: + func(*args, **kwargs) + + + +class TestInstallHWPack(TestCaseWithFixtures): + + def test_temporarily_overwrite_file_on_dir(self): + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + temporarily_overwrite_file_on_dir('/path/to/file', '/dir', '/tmp/dir') + self.assertEquals( + [['sudo', 'mv', '-f', '/dir/file', '/tmp/dir/file'], + ['sudo', 'cp', '/path/to/file', '/dir']], + fixture.mock.calls) + + fixture.mock.calls = [] + run_local_atexit_funcs() + self.assertEquals( + [['sudo', 'mv', '-f', '/tmp/dir/file', '/dir']], + fixture.mock.calls) + + def test_copy_file(self): + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + copy_file('/path/to/file', '/dir') + self.assertEquals( + [['sudo', 'cp', '/path/to/file', '/dir']], + fixture.mock.calls) + + fixture.mock.calls = [] + run_local_atexit_funcs() + self.assertEquals( + [['sudo', 'rm', '-f', '/dir/file']], fixture.mock.calls) + + def test_mount_chroot_proc(self): + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + mount_chroot_proc('chroot') + self.assertEquals( + [['sudo', 'mount', 'proc', 'chroot/proc', '-t', 'proc']], + fixture.mock.calls) + + fixture.mock.calls = [] + run_local_atexit_funcs() + self.assertEquals( + [['sudo', 'umount', '-v', 'chroot/proc']], fixture.mock.calls) + + def test_install_hwpack(self): + self.useFixture(MockSomethingFixture( + sys, 'stdout', open('/dev/null', 'w'))) + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + force_yes = False + install_hwpack('chroot', 'hwpack.tgz', force_yes) + self.assertEquals( + [['sudo', 'cp', 'hwpack.tgz', 'chroot'], + ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install', + '/hwpack.tgz']], + fixture.mock.calls) + + fixture.mock.calls = [] + run_local_atexit_funcs() + self.assertEquals( + [['sudo', 'rm', '-f', 'chroot/hwpack.tgz']], fixture.mock.calls) + + def test_install_hwpacks(self): + self.useFixture(MockSomethingFixture( + sys, 'stdout', open('/dev/null', 'w'))) + fixture = self.useFixture(MockCmdRunnerPopenFixture()) + force_yes = True + install_hwpacks( + 'chroot', '/tmp/dir', force_yes, 'hwpack1.tgz', 'hwpack2.tgz') + self.assertEquals( + [['sudo', 'mv', '-f', 'chroot/etc/resolv.conf', + '/tmp/dir/resolv.conf'], + ['sudo', 'cp', '/etc/resolv.conf', 'chroot/etc'], + ['sudo', 'mv', '-f', 'chroot/etc/hosts', '/tmp/dir/hosts'], + ['sudo', 'cp', '/etc/hosts', 'chroot/etc'], + ['sudo', 'cp', '/usr/bin/qemu-arm-static', 'chroot/usr/bin'], + ['sudo', 'cp', 'linaro_media_create/../linaro-hwpack-install', + 'chroot/usr/bin'], + ['sudo', 'mount', 'proc', 'chroot/proc', '-t', 'proc'], + ['sudo', 'cp', 'hwpack1.tgz', 'chroot'], + ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install', + '--force-yes', '/hwpack1.tgz'], + ['sudo', 'cp', 'hwpack2.tgz', 'chroot'], + ['sudo', 'chroot', 'chroot', 'linaro-hwpack-install', + '--force-yes', '/hwpack2.tgz'], + ['sudo', 'rm', '-f', 'chroot/hwpack2.tgz'], + ['sudo', 'rm', '-f', 'chroot/hwpack1.tgz'], + ['sudo', 'umount', '-v', 'chroot/proc'], + ['sudo', 'rm', '-f', 'chroot/usr/bin/linaro-hwpack-install'], + ['sudo', 'rm', '-f', 'chroot/usr/bin/qemu-arm-static'], + ['sudo', 'mv', '-f', '/tmp/dir/hosts', 'chroot/etc'], + ['sudo', 'mv', '-f', '/tmp/dir/resolv.conf', 'chroot/etc']], + fixture.mock.calls) diff --git a/linaro_media_create/unpack_binary_tarball.py b/linaro_media_create/unpack_binary_tarball.py new file mode 100644 index 0000000..8eeea29 --- /dev/null +++ b/linaro_media_create/unpack_binary_tarball.py @@ -0,0 +1,9 @@ +from linaro_media_create import cmd_runner + + +def unpack_binary_tarball(tarball, unpack_dir, as_root=True): + proc = cmd_runner.run( + ['tar', '--numeric-owner', '-C', unpack_dir, '-xf', tarball], + as_root=as_root) + proc.wait() + return proc.returncode |