
from twisted.trial import unittest
import config_manager
import os
import logging
import shutil
from StringIO import StringIO

import pqm

# An unsigned submission: "From" and "Subject" header lines followed directly
# by a single star-merge command line (no trailing newline).
sample_message = ("From: John.Citizen@example.com\n"
                  "Subject: A new action\n"
                  "star-merge archive@example.com/foo--bar--0 "
                  "archive2@example.com/bar--foo--1")
# A submission whose body is a PGP clear-signed block wrapping a single
# star-merge command line.
sample_signed_message = ("From: whee@bar.com (Matthew Thomas)\n"
    "Subject: [trivial] fix various actions portlet icons\n"
    "-----BEGIN PGP SIGNED MESSAGE-----\n"
    "Hash: SHA1\n"
    "\n"
    "star-merge mpt@canonical.com/launchpad--devel--0 rocketfuel@canonical.com/launchpad--devel--0\n"
    "-----BEGIN PGP SIGNATURE-----\n"
    "Version: GnuPG v1.2.5 (GNU/Linux)\n"
    "\n"
    "iD8DBQFC8guw6PUxNfU6ecoRAt5OAKDPyd1Ac0H7QO4DIEiZ2wvscqrg0ACeNL0h\n"
    "ksM1dJeD/aX54NnE/uqps/M=\n"
    "=X7Ib\n"
    "-----END PGP SIGNATURE-----\n"
    "\n")

# Configuration text with a "[location overrides]" section holding two rules.
# The left-hand side of the second rule contains a ':'; see
# TestConfig.test_location_overrides for how that line is expected to parse.
sample_config_with_overrides = ("[DEFAULT]\n"
                                "queuedir=/tmp\n"
                                "[location overrides]\n"
                                "/local/path=sftp://host/otherpath\n"
                                "scheme://something=righthandside\n")

class QueueSetup(object):
    """Setup a queue with mock messages in it.

    setUp() writes a config file named by ``configFileName`` and creates a
    ``queue`` directory under ``cwd`` containing one message file
    (``patch.00000001``) whose content is ``message``.  tearDown() removes
    the config file and the queue directory again.
    """

    def __init__(self):
        self.configFileName = "Foo"
        self.cwd = os.path.abspath(os.path.curdir)
        self.message = sample_message

    def _write_file(self, path, content):
        """Write content to path, closing the file even if the write fails."""
        handle = open(path, "w")
        try:
            handle.write(content)
        finally:
            handle.close()

    def setUp(self):
        """Create the config file, the queue directory and the message file."""
        config_text = ("[DEFAULT]\n"
                       "queuedir=%s/queue" % self.cwd)
        self._write_file(self.configFileName, config_text)
        self.queuedir = os.path.join(self.cwd, "queue")
        try:
            os.mkdir(self.queuedir)
        except OSError:
            # A queue directory that already exists is fine; any other
            # failure is a real error and must not be hidden.
            if not os.path.isdir(self.queuedir):
                raise
        self.messageFileName = os.path.join(self.queuedir, "patch.00000001")
        self._write_file(self.messageFileName, self.message)

    def tearDown(self):
        """Remove the config file and the queue directory.

        The queue directory is removed even when unlinking the config file
        raises; the OSError from the unlink still propagates.
        """
        try:
            os.unlink(self.configFileName)
        finally:
            shutil.rmtree(self.queuedir, ignore_errors=True)


class TestScript(unittest.TestCase):
    """Test the script command objects"""

    # Path of the script file written by getScript().  Defaults to None so
    # that tearDown() also works for tests (such as testName) that never
    # write a script file.
    scriptname = None

    def testName(self):
        """A Script exposes the filename it was constructed with."""
        import pqm
        patch = pqm.Script('foo.script', logging, False, 0)
        self.assertEqual(patch.filename, 'foo.script')

    def tearDown(self):
        """Remove the script file written by getScript(), if there is one."""
        if self.scriptname is None:
            return
        try:
            os.unlink(self.scriptname)
        except OSError:
            # Best effort: the file may already be gone.
            pass

    def getScript(self, content):
        """Get a script for testing with.

        Writes content to 'foo.script' in the current directory, records the
        path in self.scriptname for tearDown(), and returns a pqm.Script
        built from that file with a submission time of 54.
        """
        import pqm
        self.scriptname = 'foo.script'
        scriptFile = open(self.scriptname, "w")
        try:
            scriptFile.write(content)
        finally:
            # Close the handle even if the write fails.
            scriptFile.close()
        return pqm.Script(self.scriptname, logging, False, 54)

    def testFields(self):
        """Sender, subject, content, lines and commands of a plain message."""
        script = self.getScript(sample_message)
        self.assertEqual(script.getSender(), "John.Citizen@example.com")
        self.assertEqual(script.getSubject(), "A new action")
        self.assertEqual(script.getContent(),
            ("star-merge archive@example.com/foo--bar--0 "
             "archive2@example.com/bar--foo--1"))
        self.assertEqual(script.getLines(),
            [("star-merge archive@example.com/foo--bar--0 "
             "archive2@example.com/bar--foo--1")])
        self.assertEqual(script.getCommands(),
            [("star-merge archive@example.com/foo--bar--0 "
             "archive2@example.com/bar--foo--1")])

    def testGPGFields(self):
        """Fields of a PGP-signed message.

        getContent() is expected to return the whole signed block, while
        getLines() and getCommands() are expected to return only the
        star-merge line.
        """
        script = self.getScript(sample_signed_message)
        self.assertEqual(script.getSender(), "whee@bar.com (Matthew Thomas)")
        self.assertEqual(script.getSubject(),
            "[trivial] fix various actions portlet icons")
        self.assertEqual(script.getContent(),
            ('-----BEGIN PGP SIGNED MESSAGE-----\n'
             'Hash: SHA1\n'
             '\n'
             'star-merge mpt@canonical.com/launchpad--devel--0 '
             'rocketfuel@canonical.com/launchpad--devel--0\n'
             '-----BEGIN PGP SIGNATURE-----\n'
             'Version: GnuPG v1.2.5 (GNU/Linux)\n'
             '\n'
             'iD8DBQFC8guw6PUxNfU6ecoRAt5OAKDPyd1Ac0H7QO4DIEiZ2wvscqrg0ACeNL0h\n'
             'ksM1dJeD/aX54NnE/uqps/M=\n'
             '=X7Ib\n'
             '-----END PGP SIGNATURE-----\n'
             '\n'))
        self.assertEqual(script.getLines(),
            [("star-merge mpt@canonical.com/launchpad--devel--0 "
             "rocketfuel@canonical.com/launchpad--devel--0")])
        self.assertEqual(script.getCommands(),
            [("star-merge mpt@canonical.com/launchpad--devel--0 "
             "rocketfuel@canonical.com/launchpad--devel--0")])

    def testDate(self):
        """Can we access a submission time for scripts."""
        script = self.getScript(sample_message)
        self.assertEqual(script.getSubmissionTime(), 54)

# NOTDONEYET: move command recognition from CommandRunner to Script
# and commands should follow the command pattern rather than being strings.
#
#    def testIsCommand(self):
#        """Is a line a command line?"""
#        import pqm
#        script = pqm.Script('foo.script', logging, False)
#        self.failIf(script.is_command(""))


class MockScript(object):
    """Minimal stand-in for a script object; it only provides getSender()."""

    # Fixed sender value handed back by getSender().
    _SENDER = "a_sender"

    def getSender(self):
        """Return the fixed sender string."""
        return self._SENDER


class TestCommandRunner(unittest.TestCase):
    """Unit tests for pqm.CommandRunner.

    Several tests overwrite module-level settings on pqm; setUp() and
    tearDown() save and restore them so one test cannot affect another.
    """

    # Names of the pqm module globals that tests in this class overwrite.
    _patched_globals = ('allowed_revisions', 'workdir')

    def setUp(self):
        """Remember the current values of the pqm globals the tests patch."""
        self._saved_globals = {}
        for name in self._patched_globals:
            if hasattr(pqm, name):
                self._saved_globals[name] = getattr(pqm, name)

    def tearDown(self):
        """Restore the pqm globals to what they were before the test ran."""
        for name in self._patched_globals:
            if name in self._saved_globals:
                setattr(pqm, name, self._saved_globals[name])
            elif hasattr(pqm, name):
                # The global did not exist before the test; remove it again.
                delattr(pqm, name)

    def test_star_merge_urls(self):
        """star_re captures both arguments when they are URLs."""
        from pqm import CommandRunner
        star_match = CommandRunner.star_re.match("star-merge file:///url1 file:///url2")
        self.assertEqual(star_match.group(1), 'file:///url1')
        self.assertEqual(star_match.group(2), 'file:///url2')

    def test_star_merge_names(self):
        """star_re captures both arguments when they are arch-style names."""
        from pqm import CommandRunner
        star_match = CommandRunner.star_re.match("star-merge foo@bar/foo--bar--0 foo@bar/gam--foo--0")
        self.assertEqual(star_match.group(1), 'foo@bar/foo--bar--0')
        self.assertEqual(star_match.group(2), 'foo@bar/gam--foo--0')

    def test_get_arch_impl(self):
        """get_arch_impl() is expected to return a Baz1_1Handler."""
        from pqm import CommandRunner
        runner = CommandRunner()
        # TODO should be able to pass on systems without baz installed.
        # TODO set a fake logger
        self.failUnless(isinstance(runner.get_arch_impl(),
                                   pqm.Baz1_1Handler))

    def test_check_revision(self):
        """check_target() accepts configured targets and rejects near misses."""
        runner = pqm.CommandRunner()
        runner.script = MockScript()
        pqm.allowed_revisions={'foo@bar/gam--foo--0':{},
                               'file:///tmp/foo':{},
                               'file:///tmp/bar/':{}}
        runner.check_target('foo@bar/gam--foo--0', 'blah')
        runner.check_target('file:///tmp/foo', 'blah')
        self.assertRaises(pqm.PQMCmdFailure,
                          runner.check_target,
                          'file:///tmp/foo/branch',
                          'blah')
        self.assertRaises(pqm.PQMCmdFailure,
                          runner.check_target,
                          'file:///tmp/bar',
                          'blah')
        runner.check_target('file:///tmp/bar/branch', 'blah')

    def test_get_target_unpublished_config(self):
        """A target with published_at=None is expected to get publish_to=None."""
        runner = pqm.CommandRunner()
        pqm.allowed_revisions={'foo@bar/gam--foo--0':{'a':'a',
                                                      'published_at':None},
                               'file:///tmp/foo':{'b':'b'},
                               'file:///tmp/bar/':{'c':'c'}}
        self.assertEqual(runner.get_target_config('foo@bar/gam--foo--0'),
                         ('foo@bar/gam--foo--0',
                          {'a':'a',
                           'published_at':None,
                           'publish_to':None}))
        self.assertRaises(KeyError, runner.get_target_config, 'foo@bar/gam--0')

    def test_get_target_config(self):
        """get_target_config() lookups by target name and by published URL."""
        runner = pqm.CommandRunner()
        pqm.allowed_revisions={'foo@bar/gam--foo--0':{'published_at':'file:///a'},
                               'file:///tmp/foo':{'published_at':'file:///b',
                                                  'publish_to':'sftp://host/b'},
                               'file:///tmp/bar/':{'published_at':'file:///c/'}}
        self.assertEqual(runner.get_target_config('foo@bar/gam--foo--0'),
                         ('foo@bar/gam--foo--0',
                          {'published_at':'file:///a',
                           'publish_to':'file:///a'}))
        self.assertEqual(runner.get_target_config('file:///a'),
                         ('foo@bar/gam--foo--0',
                          {'published_at':'file:///a',
                           'publish_to':'file:///a'}))
        # close but shouldn't match:
        self.assertRaises(KeyError, runner.get_target_config, 'foo@bar/gam--0')
        self.assertRaises(KeyError, runner.get_target_config, 'file:///a/bar')
        # note that publish_to is not overridden by the default from
        # published_at.
        self.assertEqual(runner.get_target_config('file:///tmp/foo'),
                         ('file:///tmp/foo',
                          {'published_at':'file:///b',
                           'publish_to':'sftp://host/b'}))
        self.assertEqual(runner.get_target_config('file:///b'),
                         ('file:///tmp/foo',
                          {'published_at':'file:///b',
                           'publish_to':'sftp://host/b'}))
        # close but shouldn't match:
        self.assertRaises(KeyError,
                          runner.get_target_config,
                          'file:///tmp/foo/branch')
        self.assertRaises(KeyError,
                          runner.get_target_config,
                          'file:///b/branch')
        self.assertEqual(runner.get_target_config('file:///tmp/bar/branch'),
                         ('file:///tmp/bar/branch',
                          {'published_at':'file:///c/branch',
                           'publish_to':'file:///c/branch'}))
        self.assertEqual(runner.get_target_config('file:///c/branch'),
                         ('file:///tmp/bar/branch',
                          {'published_at':'file:///c/branch',
                           'publish_to':'file:///c/branch'}))
        # but the location isn't a branch itself
        self.assertRaises(KeyError,
                          runner.get_target_config,
                          'file:///tmp/bar')
        self.assertRaises(KeyError,
                          runner.get_target_config,
                          'file:///c')
        self.assertRaises(KeyError,
                          runner.get_target_config,
                          'file:///c/')

    def test__make_wd_path(self):
        """_make_wd_path() results for names, paths and URLs."""
        runner = pqm.CommandRunner()
        self.assertEqual(runner._make_wd_path('/foo', 'foo@bar/gam--foo--0'),
                         '/foo/foo@bar')
        self.assertEqual(runner._make_wd_path('/foo', '/foo/bar'), '/foo/foo')
        self.assertEqual(runner._make_wd_path('/foo', 'file:///foo/bar'),
                         '/foo/file')
        self.assertEqual(runner._make_wd_path('/foo', 'branch'), '/foo/branch')

    def test__branch_name(self):
        """_branch_name() is expected to return the last path component."""
        runner = pqm.CommandRunner()
        self.assertEqual(runner._branch_name('foo@bar/gam--foo--0'),
                         'gam--foo--0')
        self.assertEqual(runner._branch_name('foo'), 'foo')
        self.assertEqual(runner._branch_name('file:///home/bar/foo'), 'foo')

    def test_cleanup_wd_missing_dir(self):
        """cleanup_wd() must not fail when a configured build_dir is absent."""
        runner = pqm.CommandRunner()
        pqm.workdir = "test-workdir"
        pqm.allowed_revisions={'path':{'build_dir':'nonexistant-dir'}}
        os.mkdir("test-workdir")
        try:
            runner.cleanup_wd()
        finally:
            shutil.rmtree("test-workdir")

    def test__get_url_override_mapper_no_overrides(self):
        """An empty config is expected to give an empty URLMapper."""
        runner = pqm.CommandRunner()
        runner.configp = pqm.ConfigParser()
        expected_mapper = config_manager.URLMapper()
        self.assertEqual(expected_mapper, runner._get_url_override_mapper())

    def test__get_url_override_mapper(self):
        """Each location override is expected to become a map entry."""
        runner = pqm.CommandRunner()
        runner.configp = pqm.ConfigParser()
        runner.configp.readfp(StringIO(sample_config_with_overrides))
        expected_mapper = config_manager.URLMapper()
        expected_mapper.add_map("sftp://host/otherpath", "/local/path")
        expected_mapper.add_map("righthandside", "scheme://something")
        self.assertEqual(expected_mapper, runner._get_url_override_mapper())
        


class FunctionalTestCommandRunner(unittest.TestCase):
    """Tests of pqm.CommandRunner against a real bzr branch on disk.

    Each test runs with a freshly created branch in the "bzrbranch"
    directory under the current working directory.
    """

    def setUp(self):
        """Create the "bzrbranch" directory and commit an initial revision."""
        from bzrlib.add import smart_add_tree
        from bzrlib.branch import Branch
        os.mkdir("bzrbranch")
        try:
            branch = Branch.initialize("bzrbranch")
            smart_add_tree(branch.working_tree(), ["bzrbranch"])
            branch.working_tree().commit("start branch.", verbose=False)
        except:
            # Presumably tearDown() is not run when setUp() fails (TODO
            # confirm for this trial version), so remove the directory here;
            # otherwise the next run would fail in os.mkdir().
            shutil.rmtree("bzrbranch", ignore_errors=True)
            raise

    def tearDown(self):
        """Remove the branch directory created by setUp()."""
        shutil.rmtree("bzrbranch")

    def test_get_branch_handler_arch(self):
        # I would test this, but I don't want to add a full dependency on
        # pybaz at this point
        #from pqm import CommandRunner
        #runner = CommandRunner()
        ## TODO set a fake logger
        pass

    def branch_url(self):
        """Return a file:// URL for the absolute path of "bzrbranch".

        Backslashes in the path are replaced with forward slashes.
        """
        branchurl = "file://%s" % os.path.abspath("bzrbranch")
        return branchurl.replace('\\', '/')

    def test_get_branch_handler_bzr_file(self):
        """A file:// URL of a bzr branch is expected to give a Bazaar2Handler."""
        runner = pqm.CommandRunner()
        runner.script = MockScript()
        self.failUnless(isinstance(runner.get_branch_handler(self.branch_url()),
                                   pqm.Bazaar2Handler))

    def test_get_branch_handler_bzr_relative(self):
        """A relative path to a bzr branch is expected to give a Bazaar2Handler."""
        runner = pqm.CommandRunner()
        runner.script = MockScript()
        self.failUnless(isinstance(runner.get_branch_handler("bzrbranch"),
                                   pqm.Bazaar2Handler))

    def test_set_current_vcs(self):
        """get_vcs() is None until set_current_vcs() selects a handler."""
        runner = pqm.CommandRunner()
        runner.script = MockScript()
        self.assertEqual(runner.get_vcs(), None)
        runner.set_current_vcs(self.branch_url(), "bzrbranch")
        self.assertNotEqual(runner.get_vcs(), None)
        self.failUnless(isinstance(runner.get_vcs(), pqm.Bazaar2Handler))


class BzrHandlerTestCase(unittest.TestCase):
    """Tests of pqm.Bazaar2Handler against real bzr branches on disk.

    setUp() builds four branches in the current working directory:
    "bzrbranch" (one commit adding README), its copies "bzrbranch-parent"
    and "bzrbranch-public", and "branch-contributor", a copy with one extra
    commit that modifies README.
    """

    def _write_file(self, path, content):
        """Write content to path, closing the file even if the write fails."""
        handle = open(path, "wt")
        try:
            handle.write(content)
        finally:
            handle.close()

    def setUp(self):
        """Create the branches described in the class docstring."""
        from bzrlib.add import smart_add_tree
        from bzrlib.branch import Branch
        from bzrlib.clone import copy_branch
        # Directories created so far, so a failure part-way through only
        # removes what this method itself created.
        created = []
        try:
            os.mkdir("bzrbranch")
            created.append("bzrbranch")
            self._write_file("bzrbranch/README", "Start!\n")
            branch = Branch.initialize("bzrbranch")
            smart_add_tree(branch.working_tree(), ["bzrbranch"])
            branch.working_tree().commit("start branch.", verbose=False)
            for path in ("branch-contributor",
                         "bzrbranch-parent",
                         "bzrbranch-public"):
                os.mkdir(path)
                created.append(path)
            copy_branch(branch, "bzrbranch-parent")
            copy_branch(branch, "bzrbranch-public")
            copy_branch(branch, "branch-contributor")
            contrib_branch = Branch.open("branch-contributor")
            self._write_file("branch-contributor/README", "Boo!\n")
            contrib_branch.working_tree().commit("modify README")
        except:
            # Presumably tearDown() is not run when setUp() fails (TODO
            # confirm for this trial version), so clean up here; otherwise
            # the next run would fail in os.mkdir().
            for path in created:
                shutil.rmtree(path, ignore_errors=True)
            raise

    def tearDown(self):
        """Remove the four branch directories created by setUp()."""
        shutil.rmtree("bzrbranch")
        shutil.rmtree("branch-contributor")
        shutil.rmtree("bzrbranch-parent")
        shutil.rmtree("bzrbranch-public")

    def test_commit(self):
        """commit() is expected to reach the local, parent and public branches."""
        from bzrlib.branch import Branch
        branch = Branch.open("bzrbranch")
        self.assertEqual(branch.get_revision(branch.last_revision()).message,
                         'start branch.')
        handler = pqm.Bazaar2Handler()
        config = {'publish_to':'bzrbranch-public'}
        handler.commit("me", "bzrbranch", "wheee", "bzrbranch-parent", config)
        for path in ['bzrbranch', 'bzrbranch-parent', 'bzrbranch-public']:
            branch = Branch.open(path)
            rev = branch.last_revision()
            self.assertEqual(branch.get_revision(rev).message, 'wheee')

    def test_commit_uncommits_when_push_fails(self):
        """A failed publish must leave the parent and public branches unchanged."""
        from bzrlib.branch import Branch
        branch = Branch.open("bzrbranch")
        self.assertEqual(branch.get_revision(branch.last_revision()).message,
                         'start branch.')
        handler = pqm.Bazaar2Handler()
        config = {'publish_to':'bzrbranch-public-missing'}
        self.assertRaises(pqm.PQMTlaFailure, handler.commit, "me",
                          "bzrbranch", "wheee", "bzrbranch-parent", config)
        # bzrbranch's commit is allowed as it's the local tree - we don't
        # care what happens there, but the parent and published copies must
        # be reverted.
        for path in ['bzrbranch-parent', 'bzrbranch-public']:
            branch = Branch.open(path)
            rev = branch.last_revision()
            self.assertEqual(branch.get_revision(rev).message, 'start branch.')

    def test_make_local_dir(self):
        """make_local_dir() is expected to create a copy of the branch in "proof"."""
        try:
            handler = pqm.Bazaar2Handler()
            handler.make_local_dir("me", "bzrbranch", "proof")
            from bzrlib.branch import Branch
            branch = Branch.open("proof")
            self.assertEqual(branch.get_revision(branch.last_revision()).message,
                             'start branch.')
        finally:
            # "proof" may not exist if make_local_dir() failed; ignore that
            # so the cleanup does not replace the real failure with an
            # OSError.
            shutil.rmtree("proof", ignore_errors=True)

    def test_merge(self):
        """A clean star-merge reports success and commits nothing itself."""
        from bzrlib.branch import Branch
        branch = Branch.open("bzrbranch")
        self.assertEqual(branch.get_revision(branch.last_revision()).message,
                         'start branch.')
        contrib_branch = Branch.open("branch-contributor")
        self.assertEqual(contrib_branch.get_revision(
                            contrib_branch.last_revision()).message,
                         'modify README')
        self.failUnless(os.path.exists("bzrbranch/README"))
        handler = pqm.Bazaar2Handler()
        result = handler.do_star_merge("me", "branch-contributor", "bzrbranch")
        self.assertEqual(result, ["merge successful\n"])
        # The last revision of both branches is unchanged after the merge.
        branch = Branch.open("bzrbranch")
        self.assertEqual(branch.get_revision(branch.last_revision()).message,
                         'start branch.')
        contrib_branch = Branch.open("branch-contributor")
        self.assertEqual(contrib_branch.get_revision(
                            contrib_branch.last_revision()).message,
                         'modify README')
        self.failUnless(os.path.exists("bzrbranch/README"))

    def test_merge_conflicts(self):
        """A conflicting star-merge raises PQMTlaFailure naming the file."""
        import sys
        from bzrlib.branch import Branch
        branch = Branch.open("bzrbranch")
        self.assertEqual(branch.get_revision(branch.last_revision()).message,
                         'start branch.')
        contrib_branch = Branch.open("branch-contributor")
        self.assertEqual(contrib_branch.get_revision(
                            contrib_branch.last_revision()).message,
                         'modify README')
        self.failUnless(os.path.exists("bzrbranch/README"))
        self._write_file("bzrbranch/README", "Boo conflict!\n")
        branch.working_tree().commit('conflicting modification of README')
        handler = pqm.Bazaar2Handler()
        try:
            handler.do_star_merge("me", "branch-contributor", "bzrbranch")
        except pqm.PQMTlaFailure:
            # sys.exc_info() fetches the exception without relying on a
            # particular "except ... , e" / "except ... as e" syntax.
            failure = sys.exc_info()[1]
        else:
            self.fail("Merge conflict not raised.")
        self.assertEqual(["Conflicts during merge: 1\n",
                          "README\n",
                          ],
                         failure.output)


class TestConfig(unittest.TestCase):
    """Tests for pqm.ConfigParser."""

    def test_location_overrides(self):
        """The rules of the 'location overrides' section can be listed."""
        parser = pqm.ConfigParser()
        parser.readfp(StringIO(sample_config_with_overrides))
        actual = parser.section_items('location overrides')
        # The second expected pair has its key ending at the first ':' of
        # the config line, with the rest of the line as the value.
        wanted = [
            ("/local/path", "sftp://host/otherpath"),
            ("scheme", "//something=righthandside"),
        ]
        # Compare as sets: the order of the returned rules is not asserted.
        self.assertEqual(set(wanted), set(actual))
