#!/usr/bin/python
#
# DeviceKit-disks test suite
# Run in dk-disks built tree to test local built binaries (needs
# --libexecdir=`pwd`/src --localstatedir=/var), or from anywhere else to test
# system installed binaries.
#
# Usage:
# - Run all tests: 
#   ./testsuite  
# - Run only a particular class of tests:
#   ./testsuite Luks
# - Run only a single test:
#   ./testsuite FS.text_ext3

# Copyright: (C) 2009 Martin Pitt <martin.pitt@ubuntu.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# TODO:
# - Add hotplug stresstest: stop/rebuild array with some partitions in a loop,
#   and check added/removed devices and emitted signals
# - Add LUKS stresstest (was racy in the past)
# - Test LinuxMd* D-BUS interface

import subprocess
import os
import unittest
import sys
import tempfile
import atexit
import time
import shutil
import dbus
import signal
import stat

test_ram_dev1 = '/dev/ram8'
test_ram_dev2 = '/dev/ram9'
test_ram_dev3 = '/dev/ram10'
test_md_dev = '/dev/md125'

DK_D = 'org.freedesktop.DeviceKit.Disks.Device'

# ----------------------------------------------------------------------------

class DKDisksTestCase(unittest.TestCase):
    '''Base class for DeviceKit-disks test cases.
    
    This provides static functions which are useful for all test cases.
    '''

    tool_path = None
    daemon = None
    daemon_log = None
    device = None

    manager_obj = None
    manager_props = None

    @classmethod
    def init(klass):
        '''start daemon and set up test environment'''

        assert os.geteuid() == 0, 'need to be root for running this'

        # ensure that we can use test_ram_dev* for testing
        for r in (test_ram_dev1, test_ram_dev2, test_ram_dev3):
            assert os.path.exists(r), r + ' does not exist -- you might need to load the rd module?'
            assert stat.S_ISBLK(os.stat(r).st_mode), r + ' is not a block device'
            assert r not in open('/proc/mounts').read(), r + ' is already in use'
            subprocess.call(['dd', 'if=/dev/zero', 'of='+r],
                    stderr=subprocess.PIPE)

        assert test_md_dev not in open('/proc/mdstat').read(), test_md_dev + 'is already in use'

        klass.device = test_md_dev
        assert subprocess.call(['mdadm', '--create', test_md_dev, '--force', '-n',
            '1', '-l', 'raid0', test_ram_dev1]) == 0

        # start with a clean slate: zero out device 
        klass.zero_device()

        # run from local build tree if we are in one, otherwise use system instance
        if (os.access ('src/devkit-disks-daemon', os.X_OK)):
            daemon_path = 'src/devkit-disks-daemon'
            klass.tool_path = 'tools/devkit-disks'
            print 'Testing binaries from local build tree'
        else:
            print 'Testing installed system binaries'
            daemon_path = None
            for l in open('/usr/share/dbus-1/system-services/org.freedesktop.DeviceKit.Disks.service'):
                if l.startswith('Exec='):
                    daemon_path = l.split('=', 1)[1].strip()
                    break
            assert daemon_path, 'could not determine daemon path from D-BUS .service file'

            klass.tool_path = 'devkit-disks'

        print 'daemon path:', daemon_path

        # start daemon
        klass.daemon_log = tempfile.TemporaryFile()
        klass.daemon = subprocess.Popen([daemon_path, '--replace'],
            stdout=klass.daemon_log, stderr=subprocess.STDOUT)
        assert klass.daemon.pid, 'daemon failed to start'
        time.sleep(0.5) # give it some time to settle

        atexit.register(klass.cleanup)

        obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks',
            '/org/freedesktop/DeviceKit/Disks')
        klass.manager_iface = dbus.Interface(obj, 'org.freedesktop.DeviceKit.Disks')
        klass.manager_props = dbus.Interface(obj, dbus.PROPERTIES_IFACE)

    @classmethod
    def cleanup(klass):
        '''stop daemon again and clean up test environment'''

        subprocess.call(['umount', test_md_dev], stderr=subprocess.PIPE) # if a test failed
        subprocess.call(['mdadm', '-S', test_md_dev])
        klass.device = None

        os.kill(klass.daemon.pid, signal.SIGTERM)
        os.wait()
        klass.daemon = None

        #print '----- daemon log ----'
        #klass.daemon_log.seek(0)
        #print klass.daemon_log.read()

    @classmethod
    def sync(klass):
        '''Wait until pending events finished processing.'''

        subprocess.call(['udevadm', 'settle'])

    @classmethod
    def zero_device(klass):
        subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device],
                stderr=subprocess.PIPE)
        klass.sync()

    @classmethod
    def devname(klass, partition=None):
        '''Get name of test device or one of its partitions'''

        if partition:
            return klass.device + 'p' + str(partition)
        else:
            return klass.device

    @classmethod
    def partition_obj(klass, partition=None):
        '''Get D-Bus object of test device or one of its partitions'''

        p = '/org/freedesktop/DeviceKit/Disks/devices/' + \
            os.path.basename(klass.devname(partition))
        if partition:
            p += 'p' + str(partition)

        return dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks', p)

    @classmethod
    def partition_iface(klass, partition=None):
        '''Get D-Bus Disks interface of test device or one of its partitions'''

        return dbus.Interface(klass.partition_obj(partition), DK_D)

    @classmethod
    def partition_props(klass, partition=None):
        '''Get D-Bus Disks properties of test device or one of its partitions'''

        return dbus.Interface(klass.partition_obj(partition),
                dbus.PROPERTIES_IFACE)

    @classmethod
    def get_info(klass, partition=None, devname=None):
        '''Return devkit-disks --info in a dictionary.
        
        If no partition number is given, this queries device. If devname is
        given, info for that is returned instead.
        '''
        info = subprocess.Popen([klass.tool_path, '--show-info',
            devname or klass.devname(partition)],
                stdout=subprocess.PIPE)
        out = info.communicate()[0]
        assert info.returncode == 0, 'devkit-disks --info failed'

        props = {}
        prefix = ''
        for l in out.splitlines():
            if not l.startswith('  ') or not ':' in l:
                continue

            if l.startswith('  linux md:'):
                prefix = 'md_'
                continue
            elif l.startswith('  partition table:'):
                prefix = 'partition_'
                continue
            if prefix and not l.startswith('    '):
                prefix = ''

            (k, v) = l.split(':', 1)
            props[prefix + k.strip()] = v.strip()

        return props

    @classmethod
    def get_uuid(klass, partition=None):
        '''Use blkid to determine UUID.'''

        uuid = None
        blkid = subprocess.Popen(['blkid', '-p', '-o', 'udev', 
            klass.devname(partition)], stdout=subprocess.PIPE)
        for l in blkid.stdout:
            if l.startswith('ID_FS_UUID='):
                uuid = l.split('=', 1)[1].strip()
        assert blkid.wait() == 0
        return uuid

    @classmethod
    def get_partitions(klass):
        '''Return list of test device partitions known to DK-D.'''

        info = subprocess.Popen([klass.tool_path, '--enumerate-device-files'],
                stdout=subprocess.PIPE)
        out = info.communicate()[0]
        assert info.returncode == 0, 'devkit-disks --enumerate-device-files failed'

        partitions = []
        for l in out.splitlines():
            l = l.strip()
            if l.startswith(klass.device) and l != klass.device:
                partitions.append(l[len(klass.device):])
        return partitions

    @classmethod
    def mkfs(klass, type, label=None, partition=None):
        '''Create file system using mkfs.'''

        no_stderr = False
        if type == 'vfat':
            cmd = ['mkfs.vfat', '-F', '32']
            if label:
                cmd += ['-n', label]
        elif type == 'reiserfs':
            cmd = ['mkfs.reiserfs', '-q']
            if label:
                cmd += ['-l', label]
            no_stderr = True
        elif type == 'minix':
            assert label is None, 'minix does not support labels'
            cmd = ['mkfs.minix']
        elif type == 'swap':
            cmd = ['mkswap', '-f']
            if label:
                cmd += ['-L', label]
        else:
            cmd = ['mkfs.' + type, '-q']
            if label:
                cmd += ['-L', label]

        if type == 'xfs':
            # XFS complains if there's an existing FS, so --force
            cmd.append('-f')

        cmd.append(klass.devname(partition))

        if no_stderr:
            assert subprocess.call(cmd, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE) == 0
        else:
            assert subprocess.call(cmd, stdout=subprocess.PIPE) == 0

        # kernel/udev generally detect those changes itself, but do not quite
        # tell us when they are done; so do a little kludge here to know how
        # long we need to wait
        subprocess.call(['udevadm', 'trigger', '--action=change',
            '--sysname-match=' + os.path.basename(klass.devname(partition))])
        klass.sync()

    @classmethod
    def fs_create(klass, partition, type, options):
        '''Call FilesystemCreate() on partition with given type and options.'''

        klass.partition_iface(partition).FilesystemCreate(type, options)
        # .FilesystemCreate() already blocks until the mkfs job is done; but
        # without udevsettle the property updating is racy
        klass.sync()

    @classmethod
    def retry_busy(klass, fn, *args):
        '''Call a function until it does not fail with "Busy".'''

        timeout = 10
        while timeout >= 0:
            try:
                return fn(*args)
            except dbus.DBusException, e:
                if e._dbus_error_name != 'org.freedesktop.DeviceKit.Disks.Error.Busy':
                    raise
                print >> sys.stderr, '[busy] ',
                time.sleep(0.3)
                timeout -= 1

# ----------------------------------------------------------------------------

class FS(DKDisksTestCase):
    '''Test detection of all supported file systems'''

    def setUp(self):
        self.workdir = tempfile.mkdtemp()

    def tearDown(self):
        if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0:
            print >> sys.stderr, '[cleanup unmount] ',
        shutil.rmtree (self.workdir)

    def test_zero(self):
        '''properties of zeroed out md device'''

        self.zero_device()
        info = self.get_info()
        self.assertEqual(info['is mounted'], '0')
        self.assertEqual(info['mount paths'], '')
        self.assertEqual(info['presentation hide'], '0')
        self.assertEqual(info['presentation name'], '')
        self.assertEqual(info['usage'], '')
        self.assertEqual(info['type'], '')
        self.assertEqual(len(info['md_uuid']), 35)
        self.assertEqual(info['uuid'], '')
        self.assertEqual(info['label'], '')

        self.assertEqual(self.get_partitions(), [])

    def test_ext2(self):
        '''fs: ext2'''
        self._do_fs_check('ext2')

    def test_ext3(self):
        '''fs: ext3'''
        self._do_fs_check('ext3')

    def test_ext4(self):
        '''fs: ext4'''
        self._do_fs_check('ext4')

    def test_minix(self):
        '''fs: minix'''
        self._do_fs_check('minix')

    def test_xfs(self):
        '''fs: XFS'''
        self._do_fs_check('xfs')

    def test_ntfs(self):
        '''fs: NTFS'''
        self._do_fs_check('ntfs')

    def test_vfat(self):
        '''fs: FAT'''
        self._do_fs_check('vfat')

    def test_reiserfs(self):
        '''fs: reiserfs'''
        self._do_fs_check('reiserfs')

    def test_swap(self):
        '''fs: swap'''
        self._do_fs_check('swap')

    def _do_fs_check(self, type):
        '''Run checks for a particular file system.'''

        if type != 'swap' and subprocess.call(['which', 'mkfs.' + type],
                stdout=subprocess.PIPE) != 0:
            print >> sys.stderr, '[no mkfs.%s, skip] ' % type,

            # check correct D-Bus exception
            try:
                self.fs_create(None, type, [])
                self.fail('Expected failure for missing mkfs.' + type)
            except dbus.DBusException, e:
                self.assertEqual(e._dbus_error_name,
                        'org.freedesktop.DeviceKit.Disks.Error.FilesystemToolsMissing',
                        str(e))

            return

        # do checks with command line tools (mkfs/mount/umount)
        print >> sys.stderr, '[cli]',

        self._do_mkfs_check(type)
        if type != 'minix':
            self._do_mkfs_check(type, 'test%stst' % type)

        # put a different fs here instead of zeroing, so that we verify that
        # DK-D overrides existing FS (e. g. XFS complains then), and does not
        # leave traces of other FS around
        if type == 'ext3':
            self.mkfs('swap')
        else:
            self.mkfs('ext3')

        # do checks with DK-Disks D-BUS operations
        print >> sys.stderr, '[dkd] ',
        self._do_dbus_fs_check(type)
        if type != 'minix':
            self._do_dbus_fs_check(type, 'test%stst' % type)

    def _do_mkfs_check(self, type, label=None):
        '''Run mkfs/mount/umount check for a fs and label.
        
        This checks that DK-Disks correctly picks up command line too actions.
        '''
        self.mkfs(type, label)
        i = self.get_info()

        self.assertEqual(i['usage'], (type == 'swap') and 'other' or 'filesystem')

        self.assertEqual(i['type'], type)
        self.assertEqual(i['label'], label or '')
        if type != 'swap':
            self.assertEqual(i['is mounted'], '0')
            self.assertEqual(i['mount paths'], '')
        self.assertEqual(i['presentation name'], '')
        if type != 'minix':
            self.assertEqual(i['uuid'], self.get_uuid())

        if type == 'swap':
            return

        # mount it using "mount"
        if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'],
                stdout=subprocess.PIPE) == 0:
            # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu
            # defaults to ntfs-3g if installed); TODO: check other distros
            mount_prog = 'mount.ntfs-3g'
        else:
            mount_prog = 'mount'
        ret = subprocess.call([mount_prog, self.device, self.workdir])

        if ret == 32:
            # missing fs driver
            print >> sys.stderr, '[missing kernel driver, skip] ',
            return

        self.assertEqual(ret, 0)
        i = self.get_info()
        self.assertEqual(i['is mounted'], '1')
        self.assertEqual(i['mount paths'], self.workdir)

        # unmount it using "umount"
        subprocess.call(['umount', self.workdir])
        i = self.get_info()
        self.assertEqual(i['is mounted'], '0')
        self.assertEqual(i['mount paths'], '')

    def _do_dbus_fs_check(self, type, label=None):
        '''Run DK-D FSCreate/Mount/Unmount check for a fs and label.
        
        This checks the D-Bus methods that DK-Disks offers.
        '''
        # check that DK-disks reports the fs as supported
        for fs in self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
                'KnownFilesystems'):
            if fs[0] == type:
                supports_unix_owners = fs[2]
                self.assert_(supports_unix_owners in (True, False))
                self.assertEqual(fs[3], type != 'swap') # can_mount
                self.assert_(fs[4]) # can_create
                supports_label_rename = fs[6]
                # minix does not support labels; EXFAIL: swap doesn't have a program for it
                self.assertEqual(supports_label_rename, type not in ('minix', 'swap'))
                break
        else:
            self.fail('KnownFilesystems does not contain ' + type)

        options = []
        if label:
            options.append('label=' + label)

        # create fs
        self.fs_create(None, type, options)
        i = self.get_info()

        self.assertEqual(i['usage'], (type == 'swap') and 'other' or 'filesystem')
        self.assertEqual(i['type'], type)
        self.assertEqual(i['label'], label or '')
        if type != 'swap':
            self.assertEqual(i['is mounted'], '0')
            self.assertEqual(i['mount paths'], '')
        self.assertEqual(i['presentation name'], '')
        if type != 'minix':
            self.assertEqual(i['uuid'], self.get_uuid())

        # open files when unmounted
        iface = self.partition_iface()
        self.assertRaises(dbus.DBusException,
                iface.FilesystemListOpenFiles)

        if type != 'swap':
            # mount
            try:
                mount_path = iface.FilesystemMount('', [])
            except dbus.DBusException, e:
                self.assertEqual(e._dbus_error_name,
                        'org.freedesktop.DeviceKit.Disks.Error.FilesystemDriverMissing',
                        str(e))
                print >> sys.stderr, '[missing kernel driver, skip] ',
                return

            if label:
                self.assertEqual(mount_path, '/media/' + label)
            else:
                self.assert_(mount_path.startswith('/media/'))
            i = self.get_info()
            self.assertEqual(i['is mounted'], '1')
            self.assertEqual(i['mount paths'], mount_path)

            # no ownership taken, should be root owned
            st = os.stat(mount_path)
            self.assertEqual((st.st_uid, st.st_gid), (0, 0))

            # open files when mounted
            self.assertEqual(iface.FilesystemListOpenFiles(), [])

            f = open(os.path.join(mount_path, 'test.txt'), 'w')
            result = iface.FilesystemListOpenFiles()
            self.assertEqual(len(result), 1)
            self.assertEqual(result[0][0], os.getpid())
            self.assertEqual(result[0][1], os.geteuid())
            self.assert_(sys.argv[0] in result[0][2])
            f.close()
            self.assertEqual(iface.FilesystemListOpenFiles(), [])

            # unmount
            self.retry_busy(self.partition_iface().FilesystemUnmount, [])

            i = self.get_info()
            self.assertEqual(i['is mounted'], '0')
            self.assertEqual(i['mount paths'], '')

            # create fs with taking ownership (daemon:mail == 1:8)
            if supports_unix_owners:
                options.append('take_ownership_uid=1')
                options.append('take_ownership_gid=8')
                self.fs_create(None, type, options)
                mount_path = iface.FilesystemMount('', [])
                st = os.stat(mount_path)
                self.assertEqual((st.st_uid, st.st_gid), (1, 8))
                self.retry_busy(self.partition_iface().FilesystemUnmount, [])

        # change label
        if supports_label_rename:
            l = 'rename' + type
            iface.FilesystemSetLabel(l)
            self.sync()
            i = self.get_info()
            if type == 'vfat':
                # EXFAIL: often (but not always) the label appears in all upper case
                self.assertEqual(i['label'].upper(), l.upper())
            else:
                self.assertEqual(i['label'], l)
        else:
            self.assertRaises(dbus.DBusException, iface.FilesystemSetLabel, 'foo')

        # check fs
        self.assertEqual(iface.FilesystemCheck([]), True)


# ----------------------------------------------------------------------------

class Luks(DKDisksTestCase):
    '''Check LUKS.'''

    def test_0_create_teardown(self):
        '''LUKS create/teardown'''

        self.fs_create(None, 'ext3', ['luks_encrypt=s3kr1t', 'label=treasure'])

        try:
            # check crypted device info
            i = self.get_info() 
            self.assertEqual(i['usage'], 'crypto')
            self.assertEqual(i['type'], 'crypto_LUKS')
            self.assertEqual(i['label'], '') # encrypted device
            self.assertEqual(i['is mounted'], '0')
            self.assertEqual(i['mount paths'], '')
            self.assertEqual(i['presentation name'], '')
            self.assert_(i['holder'].startswith('/org/freedesktop/DeviceKit/Disks/devices/'))
            self.assertEqual(i['uuid'], self.get_uuid())

            # check crypted device properties
            crypt_props = self.partition_props()
            self.assertEqual(crypt_props.Get(DK_D, 'DeviceIsLuks'), True)
            self.assertEqual(crypt_props.Get(DK_D, 'DeviceIsLuksCleartext'), False)
            self.assertEqual(crypt_props.Get(DK_D, 'LuksHolder'), i['holder'])

            # check cleartext device properties
            clear_obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks', i['holder'])
            clear_props = dbus.Interface(clear_obj, dbus.PROPERTIES_IFACE)
            self.assertEqual(clear_props.Get(DK_D, 'DeviceIsLuks'), False)
            self.assertEqual(clear_props.Get(DK_D, 'DeviceIsLuksCleartext'), True)
            self.assertEqual(clear_props.Get(DK_D, 'LuksCleartextUnlockedByUid'), 0)
            self.assert_(clear_props.Get(DK_D, 'LuksCleartextSlave').endswith('/' +
                os.path.basename(self.device)))

            # check cleartext device info
            self.assert_(i['holder'] in self.manager_iface.EnumerateDevices())
            clear_devname = clear_props.Get(DK_D, 'DeviceFile')
            ci = self.get_info(devname=clear_devname)
            self.assert_(os.path.exists(clear_devname))
            self.assertEqual(ci['usage'], 'filesystem')
            self.assertEqual(ci['type'], 'ext3')
            self.assertEqual(ci['label'], 'treasure')
            self.assertEqual(ci['is mounted'], '0')
            self.assertEqual(ci['mount paths'], '')
            self.assertEqual(ci['presentation name'], '')

        finally:
            # tear down cleartext device
            self.partition_iface().LuksLock([])
            self.failIf(i['holder'] in self.manager_iface.EnumerateDevices())
            self.failIf(os.path.exists(clear_devname))
            self.assertRaises(dbus.DBusException, clear_props.Get, DK_D, 'DeviceFile')

    def test_luks_mount(self):
        '''LUKS mount/unmount'''

        # wrong password
        self.assertRaises(dbus.DBusException,
                self.partition_iface().LuksUnlock, 'h4ck3rz', [])

        # correct password
        clear_objpath = self.retry_busy(self.partition_iface().LuksUnlock, 's3kr1t', [])
        self.assert_(clear_objpath in self.manager_iface.EnumerateDevices())

        clear_obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks',
                clear_objpath)
        clear_props = dbus.Interface(clear_obj, dbus.PROPERTIES_IFACE)
        clear_iface = dbus.Interface(clear_obj, DK_D)

        # mount
        mount_path = clear_iface.FilesystemMount('', [])
        clear_devname = clear_props.Get(DK_D, 'DeviceFile')
        self.assertEqual(mount_path, '/media/treasure')

        i = self.get_info(devname=clear_devname)
        self.assertEqual(i['is mounted'], '1')
        self.assertEqual(i['mount paths'], mount_path)

        # can't lock, busy
        self.assertRaises(dbus.DBusException, self.partition_iface().LuksLock, [])

        # umount
        self.retry_busy(clear_iface.FilesystemUnmount, [])
        i = self.get_info()
        self.assertEqual(i['is mounted'], '0')
        self.assertEqual(i['mount paths'], '')

        # lock
        self.partition_iface().LuksLock([])
        self.failIf(clear_objpath in self.manager_iface.EnumerateDevices())

    def test_luks_change_passphrase(self):
        '''LUKS change passphrase'''

        # wrong password
        self.assertRaises(dbus.DBusException,
                self.partition_iface().LuksChangePassphrase, 'h4ck3rz', 'foo')
        self.assertEqual(self.partition_props().Get(DK_D, 'LuksHolder'), '/',
                'changing passphrase does not unlock')

        # correct password
        self.partition_iface().LuksChangePassphrase('s3kr1t', 'cl4ss1f13d')
        holder = self.partition_props().Get(DK_D, 'LuksHolder')
        self.assertEqual(holder, '/', 
                'changing passphrase does not unlock: ' + holder)

        # old password is invalid now
        self.assertRaises(dbus.DBusException,
                self.partition_iface().LuksUnlock, 's3kr1t', [])

        # new password is accepted
        self.partition_iface().LuksUnlock('cl4ss1f13d', [])
        self.partition_iface().LuksLock([])

        # change it back so that order of tests does not matter
        self.partition_iface().LuksChangePassphrase('cl4ss1f13d', 's3kr1t')

        self.sync()

# ----------------------------------------------------------------------------

class Partitions(DKDisksTestCase):
    '''Check partition operations.'''

    def setUp(self):
        self.zero_device()
        self.assertEqual(self.get_partitions(), [])

        info = self.get_info()
        self.assertEqual([k for k in info if k.startswith('partition_')], [])
        self.assertEqual(self.fdisk_list(), None)

    def tearDown(self):
        self.partition_iface().PartitionTableCreate('none', [])
        info = self.get_info()
        self.assertEqual([k for k in info if k.startswith('partition_')], [])
        self.assertEqual(self.fdisk_list(), None)

    def test_mbr(self):
        '''Partitions: mbr'''

        self._do_schema('mbr', '0x82', '0x06', 'boot')

    def test_gpt(self):
        '''Partitions: GUID'''

        self._do_schema('gpt', 'EBD0A0A2-B9E5-4433-87C0-68B6B72699C7', 
                '0657FD6D-A4AB-43C4-84E5-0933C84B4F4F', 'required')

    # TODO: fails in various ways
    def disabled_test_apm(self):
        '''Partitions: Apple'''

        try:
            self._do_schema('apm', 'Apple_Unix_SVR2', 'Foo', 'allow_write',
                    exp_default_partitions=2) # Apple creates bootstrap stuff by default
            self.fail('creating apple partition at offset 0 should fail due to default bootstrap partitions')
        except dbus.DBusException, e:
            self.assert_("Can't have overlapping partitions." in str(e))

        self._do_schema('apm', 'Apple_Unix_SVR2', 'Foo', 'allow_write',
                exp_default_partitions=2, # Apple creates bootstrap stuff by default
                first_offset=3000000)

    def _do_schema(self, schema, type1, type2, flag,
            exp_default_partitions=0, first_offset=0):
        '''Run tests for a particular schema/type'''

        # create partition table
        self.partition_iface().PartitionTableCreate(schema, [])
        self.sync()

        info = self.get_info()
        self.assertEqual(info['partition_scheme'], schema)
        self.assertEqual(info['partition_count'], str(exp_default_partitions))
        self.assertEqual(len(self.get_partitions()), exp_default_partitions)

        if schema == 'mbr':
            self.assertEqual(self.fdisk_list(), [])

        # check device object properties
        props = self.partition_props()
        self.assertEqual(props.Get(DK_D, 'DeviceIsPartition'), False)
        self.assertEqual(props.Get(DK_D, 'DeviceIsPartitionTable'), True)
        self.assertEqual(props.Get(DK_D, 'PartitionTableScheme'), schema)
        self.assertEqual(props.Get(DK_D, 'PartitionTableCount'), exp_default_partitions)

        # p1: non-flagged, no fs
        p1 = self.partition_iface().PartitionCreate(first_offset, 10000000, type1, 
                '', [], [], '', [])
        self.assert_(p1 in self.manager_iface.EnumerateDevices())
 
        if schema == 'mbr':
            fdisk = self.fdisk_list()
            self.assertEqual(len(fdisk), 1)
            fdisk = fdisk[0]
            self.assert_(os.path.exists(fdisk[0]), 'p1: device file does not exist')
            self.assertEqual(fdisk[1], False, 'p1 is bootable')
            self.assertEqual(fdisk[5], type1.lstrip('0x'))

        # EXFAIL: /dev/md5p1 still works and appears, but kernel/udev never
        # create /dev/md5p2 and following
        #self.partition_iface().PartitionCreate(0, 10000000, type2, '',
        #        [flag], [], '', [])
        #print self.fdisk_list()

        # the device is not a partition, so calls should fail
        self.assertRaises(dbus.DBusException,
                self.partition_iface().PartitionDelete, [])
        self.assertRaises(dbus.DBusException,
                self.partition_iface().PartitionModify, type2, '', [flag])

        # check p1 object properties
        p1_obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks',
            p1)
        p1_iface = dbus.Interface(p1_obj, DK_D)
        p1_props = dbus.Interface(p1_obj, dbus.PROPERTIES_IFACE)

        self.assertEqual(p1_props.Get(DK_D, 'DeviceIsPartition'), True)
        self.assertEqual(p1_props.Get(DK_D, 'DeviceIsPartitionTable'), False)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionSlave'), 
                '/org/freedesktop/DeviceKit/Disks/devices/' +
                os.path.basename(self.device))
        self.assertEqual(p1_props.Get(DK_D, 'PartitionScheme'), schema)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionType'), type1)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionLabel'), (schema == 'apm' and 'untitled' or ''))
        self.assertEqual(p1_props.Get(DK_D, 'PartitionFlags'), [])
        self.assertEqual(p1_props.Get(DK_D, 'PartitionNumber'), (schema == 'apm' and 2 or 1))
        self.assertEqual(p1_props.Get(DK_D, 'IdUsage'), '')
        self.assertEqual(p1_props.Get(DK_D, 'IdType'), '')
        off = p1_props.Get(DK_D, 'PartitionOffset')
        self.assert_(off >= first_offset and off <= first_offset+20000)
        size = p1_props.Get(DK_D, 'PartitionSize')
        self.assert_(size >= 9500000 and off <= 10500000)
        self.assertEqual(props.Get(DK_D, 'PartitionTableCount'), exp_default_partitions + 1)

        # modify
        p1_iface.PartitionModify(type2, '', [flag])
        self.sync()
        self.assertEqual(p1_props.Get(DK_D, 'PartitionType'), type2)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionFlags'), [flag])

        # delete
        p1_iface.PartitionDelete([])
        self.sync()
        self.assertRaises(dbus.DBusException, p1_iface.PartitionModify, type2, '', [flag])
        self.assertRaises(dbus.DBusException, p1_props.Get, DK_D, 'PartitionType')

        self.failIf(p1 in self.manager_iface.EnumerateDevices())
        self.assertEqual(props.Get(DK_D, 'PartitionTableCount'), 0)

        if schema == 'mbr':
            self.failIf(os.path.exists(fdisk[0]), 'p1: device file still exists')
            self.assertEqual(self.fdisk_list(), [])

        # recreate p1: flagged, with fs
        p1 = self.partition_iface().PartitionCreate(0, 10000000, type1, 
                '', [flag], [], 'ext3', ['label=e3part'])
        self.sync()
        self.assert_(p1 in self.manager_iface.EnumerateDevices())

        p1_obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks',
            p1)
        p1_props = dbus.Interface(p1_obj, dbus.PROPERTIES_IFACE)
        self.assertEqual(p1_props.Get(DK_D, 'DeviceIsPartition'), True)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionType'), type1)
        self.assertEqual(p1_props.Get(DK_D, 'PartitionFlags'), [flag])
        self.assertEqual(p1_props.Get(DK_D, 'PartitionNumber'), 1)
        self.assertEqual(p1_props.Get(DK_D, 'IdUsage'), 'filesystem')
        self.assertEqual(p1_props.Get(DK_D, 'IdType'), 'ext3')
        self.assertEqual(p1_props.Get(DK_D, 'IdLabel'), 'e3part')

    def fdisk_list(self):
        '''Parse fdisk -l.

        Return None if device does not have a partition table, or a list of
        (device, boot, start, end, blocks, id) tuples.
        '''
        env = os.environ
        env['LC_ALL'] = 'C'
        fdisk = subprocess.Popen(['fdisk', '-l', self.device], env=env,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (out, err) = fdisk.communicate()
        if err != '':
            return None
        parts = []
        for l in out.splitlines():
            if l.startswith('/dev/'):
                fields = l.split()
                if fields[1] == '*':
                    # boot flag
                    fields[1] = True
                    fields = tuple(fields[:6])
                else:
                    fields = tuple([fields[0], False] + fields[1:5])
                parts.append(fields)
        return parts

# ----------------------------------------------------------------------------

hd_smart_blob = None

class Smart(DKDisksTestCase):
    '''Check SMART operation.'''

    def test_0_hd_status(self):
        '''SMART status of first internal hard disk
        
        This is a best-effort readonly test.
        '''
        hd = '/dev/sda'

        if not os.path.exists(hd):
            print >> sys.stderr, '[skip]',
            return

        has_smart = subprocess.call(['skdump', '--can-smart', hd],
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0

        obj = dbus.SystemBus().get_object('org.freedesktop.DeviceKit.Disks', 
                self.manager_iface.FindDeviceByDeviceFile(hd))
        iface = dbus.Interface(obj, DK_D)
        props = dbus.Interface(obj, dbus.PROPERTIES_IFACE)
        info = self.get_info(devname=hd)

        self.assertEqual(props.Get(DK_D, 'DriveAtaSmartIsAvailable'),
                has_smart)

        if has_smart:
            print >> sys.stderr, '[avail] ',
            self.assert_(info['ATA SMART'].startswith('Updated at '))
            self.assertNotEqual(props.Get(DK_D, 'DriveAtaSmartTimeCollected'), 0)
            global hd_smart_blob
            hd_smart_blob = ''.join(map(chr, props.Get(DK_D, 'DriveAtaSmartBlob')))
            # this is of course not truly correct for a test suite, but let's
            # consider it a courtesy for developers :-)
            self.assertEqual(info['overall assessment'], 'Good')
            self.assertEqual(props.Get(DK_D, 'DriveAtaSmartStatus'), 'GOOD')

            try:
                self.partition_iface().DriveAtaSmartInitiateSelftest('bogus', [])
                self.fail('bogus mode succeeded')
            except dbus.DBusException, e:
                self.assertEqual(e._dbus_error_name, 'org.freedesktop.DeviceKit.Disks.Error.Failed')
        else:
            print >> sys.stderr, '[N/A] ',
            self.assertEqual(info['ATA SMART'], 'not available')
            self.assertEqual(props.Get(DK_D, 'DriveAtaSmartTimeCollected'), 0)
            self.assertEqual(props.Get(DK_D, 'DriveAtaSmartBlob'), [])
            self.failIf('overall assessment' in info)

            try:
                self.partition_iface().DriveAtaSmartInitiateSelftest('short', [])
                self.fail('device did not report to have SMART capabilities')
            except dbus.DBusException, e:
                self.assert_('does not support ATA SMART' in str(e))

    def test_simulate(self):
        '''SMART status of simulated data on test device
        
        This requires SMART being available from the first hard disk, to
        collect the blob used for testing.
        '''
        global hd_smart_blob
    
        if not hd_smart_blob:
            print >> sys.stderr, '[skip]',
            return
    
        props = self.partition_props()
        self.failIf(props.Get(DK_D, 'DriveAtaSmartIsAvailable'))
        self.assertEqual(props.Get(DK_D, 'DriveAtaSmartTimeCollected'), 0)
        self.assertEqual(props.Get(DK_D, 'DriveAtaSmartBlob'), [])
    
        # without simulate, DK-disks should complain about absent SMART
        try:
            self.partition_iface().DriveAtaSmartRefreshData([])
            self.fail('expected "Device does not support ATA SMART"')
        except dbus.DBusException, e:
            self.assert_('does not support ATA SMART' in str(e))
        try:
            self.partition_iface().DriveAtaSmartInitiateSelftest('short', [])
            self.fail('fake device is not expected to have SMART capabilities')
        except dbus.DBusException, e:
            self.assert_('does not support ATA SMART' in str(e))
    
        # load the blob
        blob_f = tempfile.NamedTemporaryFile()
        blob_f.write(hd_smart_blob)
        blob_f.flush()
        self.partition_iface().DriveAtaSmartRefreshData(['simulate=' + blob_f.name])

        info = self.get_info()
    
        self.assertEqual(props.Get(DK_D, 'DriveAtaSmartIsAvailable'), True)
    
        self.assert_(info['ATA SMART'].startswith('Updated at '))
        self.assertNotEqual(props.Get(DK_D, 'DriveAtaSmartTimeCollected'), 0)
    
        self.assertEqual(hd_smart_blob, ''.join(map(chr, props.Get(DK_D, 'DriveAtaSmartBlob'))))
        self.assertEqual(info['overall assessment'], 'Good')
        self.assertEqual(props.Get(DK_D, 'DriveAtaSmartStatus'), 'GOOD')
    
        # tool should have the entire SMART info
        tool_info = subprocess.Popen([self.tool_path, '--show-info',
            self.device], stdout=subprocess.PIPE)
        out = tool_info.communicate()[0]
        self.assert_('start-stop-count' in out)
        self.assert_('Pre-fail' in out)
    

# ----------------------------------------------------------------------------

class LVM(DKDisksTestCase):
    '''Check LVM devices.'''

    def setUp(self):
        '''Create a VG "dktest".

        This uses two RAM disks as PV.
        '''
        if subprocess.call(['which', 'pvcreate'], stdout=subprocess.PIPE) != 0:
            self.fail('lvm tools not installed')
            return
        self.assertEqual(subprocess.call(['pvcreate', test_ram_dev2],
            stdout=subprocess.PIPE), 0)
        self.assertEqual(subprocess.call(['pvcreate', test_ram_dev3],
            stdout=subprocess.PIPE), 0)
        self.vgname = 'dktest'
        self.assertEqual(subprocess.call(['vgcreate', self.vgname,
            test_ram_dev2, test_ram_dev3], stdout=subprocess.PIPE), 0)

    def tearDown(self):
        '''Remove dktest VG.'''

        self.assertEqual(subprocess.call(['vgremove', '-f',
            self.vgname], stdout=subprocess.PIPE), 0)

    def test_single_lv(self):
        '''LVM: Single LV, no RAID'''

        objs_old = set(self.manager_iface.EnumerateDevices())

        self.assertEqual(subprocess.call(['lvcreate', '-n', 'dktestlv1', '-L',
            '52M', self.vgname], stdout=subprocess.PIPE), 0)
        self.sync()

        # there should be exactly one new device for the LV
        objs_new = set(self.manager_iface.EnumerateDevices())
        self.assertEqual(len(objs_old) + 1, len(objs_new))
        lvname = list(objs_new - objs_old)[0]

        lv_props = dbus.Interface(dbus.SystemBus().get_object(
            'org.freedesktop.DeviceKit.Disks', lvname), dbus.PROPERTIES_IFACE)

        # the LV is a real volume which should be shown, but not automounted
        self.assert_(lv_props.Get(DK_D, 'DeviceFile').startswith('/dev/mapper/'))
        self.assertEqual(lv_props.Get(DK_D, 'DevicePresentationHide'), False)
        self.assertEqual(lv_props.Get(DK_D, 'DevicePresentationNopolicy'), True)

        # ensure that we have a UUID
        found_uuid = False
        for i in lv_props.Get(DK_D, 'DeviceFileById'):
            if 'uuid-LVM' in i:
                found_uuid = True
        self.assert_(found_uuid, 'no by-uuid found in ' + str(i))

    def test_single_lv_raid(self):
        '''LVM: Single LV, RAID-1'''

        objs_old = set(self.manager_iface.EnumerateDevices())

        self.assertEqual(subprocess.call(['lvcreate', '-n', 'dktestlvr1', '-L',
            '50M', '-m', '1', '--mirrorlog', 'core', self.vgname],
            stdout=subprocess.PIPE), 0)
        self.sync()

        # there should be two new shadow devices for the RAID images, and one
        # real LV
        objs_new = set(self.manager_iface.EnumerateDevices())
        self.assertEqual(len(objs_old) + 3, len(objs_new))
        lv_objs = objs_new - objs_old

        #subprocess.call(['bash', '-i'])

        # find the real one; TODO: is this nameing scheme right on all distros?
        devname = '/dev/mapper/%s-dktestlvr1' % self.vgname
        real_lv_obj = self.manager_iface.FindDeviceByDeviceFile(devname)
        self.assert_(real_lv_obj in lv_objs)

        # put a file system onto it, for testing properties
        iface = dbus.Interface(dbus.SystemBus().get_object(
                'org.freedesktop.DeviceKit.Disks', real_lv_obj), DK_D)
        iface.FilesystemCreate('ext3', [])
        self.sync()

        for o in lv_objs:
            props = dbus.Interface(dbus.SystemBus().get_object(
                'org.freedesktop.DeviceKit.Disks', o), dbus.PROPERTIES_IFACE)

            if o == real_lv_obj:
                self.assert_(props.Get(DK_D, 'DeviceFile').startswith('/dev/mapper/'))
                # never hide the real LV
                self.assertEqual(props.Get(DK_D, 'DevicePresentationHide'), False)
                self.assertEqual(props.Get(DK_D, 'IdUsage'), 'filesystem')
                self.assertEqual(props.Get(DK_D, 'IdType'), 'ext3')

                # ensure that we have a UUID
                found_uuid = False
                for i in props.Get(DK_D, 'DeviceFileById'):
                    if 'uuid-LVM' in i:
                        found_uuid = True
                self.assert_(found_uuid, 'no by-uuid found in ' + str(i))
            else:
                # mirror images should not have any real FS usage at all
                self.assertEqual(props.Get(DK_D, 'IdUsage'), '')

                # mirror images should not have by-* symlinks (avoid probing);
                # that is actually the job of the lvm2 udev rules, but check it
                # here to ensure proper system integration
                self.assertEqual(props.Get(DK_D, 'DeviceFileById'), [])

            self.assertEqual(props.Get(DK_D, 'DevicePresentationNopolicy'), True)

# ----------------------------------------------------------------------------

class GlobalOps(DKDisksTestCase):
    '''Check various global operations.'''

    def test_daemon_version(self):
        '''DaemonVersion property'''

        ver = self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
                'DaemonVersion')
        self.assertEqual(type(ver), dbus.String)
        self.assert_(len(ver) > 0)

    def test_enumerate_devices(self):
        '''EnumerateDevices()'''

        devs = self.manager_iface.EnumerateDevices()
        self.assert_(len(devs) > 1) # at least our test device and root fs
        self.assert_('/org/freedesktop/DeviceKit/Disks/devices/' +
                    os.path.basename(self.device) in devs)

    def test_enumerate_device_files(self):
        '''EnumerateDeviceFiles()'''

        devs = self.manager_iface.EnumerateDeviceFiles()
        self.assert_(len(devs) > 1) # at least our test device and root fs
        self.assert_(self.device in devs)

    def test_find_by_devpath(self):
        '''FindDeviceByDeviceFile()'''

        self.assertEqual(
                self.manager_iface.FindDeviceByDeviceFile(self.device),
                '/org/freedesktop/DeviceKit/Disks/devices/' +
                    os.path.basename(self.device))

        self.assertRaises(dbus.DBusException, 
                self.manager_iface.FindDeviceByDeviceFile, '/dev/nonexisting')

    def test_find_by_major_minor(self):
        '''FindDeviceByMajorMinor()'''

        st = os.stat(self.device)
        dev = self.manager_iface.FindDeviceByMajorMinor(os.major(st.st_rdev),
                os.minor(st.st_rdev))
        self.assertEqual(dev, '/org/freedesktop/DeviceKit/Disks/devices/' +
                    os.path.basename(self.device))

        self.assertRaises(dbus.DBusException, 
                self.manager_iface.FindDeviceByMajorMinor, 42, 42)

    def test_inhibition(self):
        '''inhibition'''

        # Inhibit()
        self.failIf(self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
            'DaemonIsInhibited'))
        cookie1 = self.manager_iface.Inhibit()
        self.assert_(self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
            'DaemonIsInhibited'))

        # try mounting, should fail due to inhibition
        try:
            self.partition_iface().FilesystemMount('', [])
            self.fail('.FilesystemMount() succeeded while inhibited')
        except dbus.DBusException, e:
            self.assert_(e._dbus_error_name.endswith('Error.Inhibited'))

        # Inhibit() another time
        cookie2 = self.manager_iface.Inhibit()
        self.assert_(self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
            'DaemonIsInhibited'))

        # Uninhibit()
        self.manager_iface.Uninhibit(cookie1)
        self.assert_(self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
            'DaemonIsInhibited'))

        self.assertRaises(dbus.DBusException, self.manager_iface.Uninhibit,
            '0xDEADBEEF')

        self.manager_iface.Uninhibit(cookie2)
        self.failIf(self.manager_props.Get('org.freedesktop.DeviceKit.Disks',
            'DaemonIsInhibited'))

        self.assertRaises(dbus.DBusException, self.manager_iface.Uninhibit,
            cookie1)

# ----------------------------------------------------------------------------

if __name__ == '__main__':
    DKDisksTestCase.init()
    if len(sys.argv) == 1:
        tests = unittest.TestLoader().loadTestsFromName('__main__')
    else:
        tests = unittest.TestLoader().loadTestsFromNames(sys.argv[1:],
                __import__('__main__'))
    unittest.TextTestRunner(verbosity=2).run(tests)
            


