aboutsummaryrefslogtreecommitdiff
path: root/rhodecode/tests/scripts
diff options
context:
space:
mode:
authorMarcin Kuzminski <marcin@python-works.com>2012-08-28 09:28:09 +0200
committerMarcin Kuzminski <marcin@python-works.com>2012-08-28 09:28:09 +0200
commitfdad5b1a2f6cc2fc703e10db33bb76526492353a (patch)
tree2eb6bdd2e63126306a722a738ee02ce3120486de /rhodecode/tests/scripts
parent7d446a70bf1385093e882a7f5d7848158aa1be6d (diff)
added new suite of tests for VCS operations
- locking tests - push pull tests - added new ENV option to bypass re-creation of test enviroment --HG-- branch : beta rename : rhodecode/tests/scripts/test_scm_operations.py => rhodecode/tests/scripts/test_vcs_operations.py
Diffstat (limited to 'rhodecode/tests/scripts')
-rwxr-xr-xrhodecode/tests/scripts/test_scm_operations.py242
-rwxr-xr-xrhodecode/tests/scripts/test_vcs_operations.py425
2 files changed, 425 insertions, 242 deletions
diff --git a/rhodecode/tests/scripts/test_scm_operations.py b/rhodecode/tests/scripts/test_scm_operations.py
deleted file mode 100755
index 59aa9bb8..00000000
--- a/rhodecode/tests/scripts/test_scm_operations.py
+++ /dev/null
@@ -1,242 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
- rhodecode.tests.test_scm_operations
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
- Test suite for making push/pull operations.
- Run using::
-
- RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
-
- :created_on: Dec 30, 2010
- :author: marcink
- :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
- :license: GPLv3, see COPYING for more details.
-"""
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import tempfile
-from os.path import join as jn
-from os.path import dirname as dn
-
-from tempfile import _RandomNameSequence
-from subprocess import Popen, PIPE
-
-from rhodecode.tests import *
-from rhodecode.model.db import User, Repository, UserLog
-from rhodecode.model.meta import Session
-
-DEBUG = True
-HOST = '127.0.0.1:5000' # test host
-
-
-class Command(object):
-
- def __init__(self, cwd):
- self.cwd = cwd
-
- def execute(self, cmd, *args):
- """
- Runs command on the system with given ``args``.
- """
-
- command = cmd + ' ' + ' '.join(args)
- if DEBUG:
- print '*** CMD %s ***' % command
- p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
- stdout, stderr = p.communicate()
- if DEBUG:
- print stdout, stderr
- return stdout, stderr
-
-
-def _get_tmp_dir():
- return tempfile.mkdtemp(prefix='rc_integration_test')
-
-
-def _construct_url(repo, dest=None, **kwargs):
- if dest is None:
- #make temp clone
- dest = _get_tmp_dir()
- params = {
- 'user': TEST_USER_ADMIN_LOGIN,
- 'passwd': TEST_USER_ADMIN_PASS,
- 'host': HOST,
- 'cloned_repo': repo,
- 'dest': dest
- }
- params.update(**kwargs)
- if params['user'] and params['passwd']:
- _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
- else:
- _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
- return _url
-
-
-def set_anonymous_access(enable=True):
- user = User.get_by_username(User.DEFAULT_USER)
- user.active = enable
- Session().add(user)
- Session().commit()
- print '\tanonymous access is now:', enable
- if enable != User.get_by_username(User.DEFAULT_USER).active:
- raise Exception('Cannot set anonymous access')
-
-
-def setup_module():
- #DISABLE ANONYMOUS ACCESS
- set_anonymous_access(False)
-
-
-def test_clone_hg_repo_by_admin():
- clone_url = _construct_url(HG_REPO)
- stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
- assert 'requesting all changes' in stdout
- assert 'adding changesets' in stdout
- assert 'adding manifests' in stdout
- assert 'adding file changes' in stdout
-
- assert stderr == ''
-
-
-def test_clone_git_repo_by_admin():
- clone_url = _construct_url(GIT_REPO)
- stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
- assert 'Cloning into' in stdout
- assert stderr == ''
-
-
-def test_clone_wrong_credentials_hg():
- clone_url = _construct_url(HG_REPO, passwd='bad!')
- stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
- assert 'abort: authorization failed' in stderr
-
-
-def test_clone_wrong_credentials_git():
- clone_url = _construct_url(GIT_REPO, passwd='bad!')
- stdout, stderr = Command('/tmp').execute('git clone', clone_url)
- assert 'fatal: Authentication failed' in stderr
-
-
-def test_clone_git_dir_as_hg():
- clone_url = _construct_url(GIT_REPO)
- stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
- assert 'HTTP Error 404: Not Found' in stderr
-
-
-def test_clone_hg_repo_as_git():
- clone_url = _construct_url(HG_REPO)
- stdout, stderr = Command('/tmp').execute('git clone', clone_url)
- assert 'not found: did you run git update-server-info on the server' in stderr
-
-
-def test_clone_non_existing_path_hg():
- clone_url = _construct_url('trololo')
- stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
- assert 'HTTP Error 404: Not Found' in stderr
-
-
-def test_clone_non_existing_path_git():
- clone_url = _construct_url('trololo')
- stdout, stderr = Command('/tmp').execute('git clone', clone_url)
- assert 'not found: did you run git update-server-info on the server' in stderr
-
-
-def test_push_new_file_hg():
- DEST = _get_tmp_dir()
- clone_url = _construct_url(HG_REPO, dest=DEST)
- stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
-
- # commit some stuff into this repo
- cwd = path = jn(DEST)
- added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
- Command(cwd).execute('touch %s' % added_file)
- Command(cwd).execute('hg add %s' % added_file)
-
- for i in xrange(3):
- cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
- Command(cwd).execute(cmd)
-
- cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
- i,
- 'Marcin Kuźminski <marcin@python-blog.com>',
- added_file
- )
- Command(cwd).execute(cmd)
- # PUSH it back
- clone_url = _construct_url(HG_REPO, dest='')
- stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
-
- assert 'pushing to' in stdout
- assert 'Repository size' in stdout
- assert 'Last revision is now' in stdout
-
-
-def test_push_new_file_git():
- DEST = _get_tmp_dir()
- clone_url = _construct_url(GIT_REPO, dest=DEST)
- stdout, stderr = Command('/tmp').execute('git clone', clone_url)
-
- # commit some stuff into this repo
- cwd = path = jn(DEST)
- added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
- Command(cwd).execute('touch %s' % added_file)
- Command(cwd).execute('git add %s' % added_file)
-
- for i in xrange(3):
- cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
- Command(cwd).execute(cmd)
-
- cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
- i,
- 'Marcin Kuźminski <marcin@python-blog.com>',
- added_file
- )
- Command(cwd).execute(cmd)
- # PUSH it back
- clone_url = _construct_url(GIT_REPO, dest='')
- stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
-
- #WTF git stderr ?!
- assert 'master -> master' in stderr
-
-
-def test_push_modify_existing_file_hg():
- assert 0
-
-
-def test_push_modify_existing_file_git():
- assert 0
-
-
-def test_push_wrong_credentials_hg():
- assert 0
-
-
-def test_push_wrong_credentials_git():
- assert 0
-
-
-def test_push_back_to_wrong_url_hg():
- assert 0
-
-
-def test_push_back_to_wrong_url_git():
- assert 0
-
-
-#TODO: write all locking tests
diff --git a/rhodecode/tests/scripts/test_vcs_operations.py b/rhodecode/tests/scripts/test_vcs_operations.py
new file mode 100755
index 00000000..81ef6fa7
--- /dev/null
+++ b/rhodecode/tests/scripts/test_vcs_operations.py
@@ -0,0 +1,425 @@
+# -*- coding: utf-8 -*-
+"""
+ rhodecode.tests.test_scm_operations
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Test suite for making push/pull operations.
+ Run using::
+
+ RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
+
+ :created_on: Dec 30, 2010
+ :author: marcink
+ :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
+ :license: GPLv3, see COPYING for more details.
+"""
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import tempfile
+import unittest
+from os.path import join as jn
+from os.path import dirname as dn
+
+from tempfile import _RandomNameSequence
+from subprocess import Popen, PIPE
+
+from rhodecode.tests import *
+from rhodecode.model.db import User, Repository, UserLog
+from rhodecode.model.meta import Session
+from rhodecode.model.repo import RepoModel
+
+DEBUG = True
+HOST = '127.0.0.1:5000' # test host
+
+
+class Command(object):
+
+ def __init__(self, cwd):
+ self.cwd = cwd
+
+ def execute(self, cmd, *args):
+ """
+ Runs command on the system with given ``args``.
+ """
+
+ command = cmd + ' ' + ' '.join(args)
+ if DEBUG:
+ print '*** CMD %s ***' % command
+ p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
+ stdout, stderr = p.communicate()
+ if DEBUG:
+ print stdout, stderr
+ return stdout, stderr
+
+
+def _get_tmp_dir():
+ return tempfile.mkdtemp(prefix='rc_integration_test')
+
+
+def _construct_url(repo, dest=None, **kwargs):
+ if dest is None:
+ #make temp clone
+ dest = _get_tmp_dir()
+ params = {
+ 'user': TEST_USER_ADMIN_LOGIN,
+ 'passwd': TEST_USER_ADMIN_PASS,
+ 'host': HOST,
+ 'cloned_repo': repo,
+ 'dest': dest
+ }
+ params.update(**kwargs)
+ if params['user'] and params['passwd']:
+ _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
+ else:
+ _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
+ return _url
+
+
+def _add_files_and_push(vcs, DEST, **kwargs):
+ """
+ Generate some files, add it to DEST repo and push back
+ vcs is git or hg and defines what VCS we want to make those files for
+
+ :param vcs:
+ :param DEST:
+ """
+ # commit some stuff into this repo
+ cwd = path = jn(DEST)
+ #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+ added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
+ Command(cwd).execute('touch %s' % added_file)
+ Command(cwd).execute('%s add %s' % (vcs, added_file))
+
+ for i in xrange(3):
+ cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
+ Command(cwd).execute(cmd)
+ if vcs == 'hg':
+ cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
+ i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
+ )
+ elif vcs == 'git':
+ cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
+ i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
+ )
+ Command(cwd).execute(cmd)
+ # PUSH it back
+ if vcs == 'hg':
+ _REPO = HG_REPO
+ elif vcs == 'git':
+ _REPO = GIT_REPO
+
+ kwargs['dest'] = ''
+ clone_url = _construct_url(_REPO, **kwargs)
+ if 'clone_url' in kwargs:
+ clone_url = kwargs['clone_url']
+ if vcs == 'hg':
+ stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
+ elif vcs == 'git':
+ stdout, stderr = Command(cwd).execute('git push', clone_url + " master")
+
+ return stdout, stderr
+
+
+def set_anonymous_access(enable=True):
+ user = User.get_by_username(User.DEFAULT_USER)
+ user.active = enable
+ Session().add(user)
+ Session().commit()
+ print '\tanonymous access is now:', enable
+ if enable != User.get_by_username(User.DEFAULT_USER).active:
+ raise Exception('Cannot set anonymous access')
+
+
+#==============================================================================
+# TESTS
+#==============================================================================
+
+class TestVCSOperations(unittest.TestCase):
+
+ @classmethod
+ def setup_class(cls):
+ #DISABLE ANONYMOUS ACCESS
+ set_anonymous_access(False)
+
+ def setUp(self):
+ r = Repository.get_by_repo_name(GIT_REPO)
+ Repository.unlock(r)
+ r.enable_locking = False
+ Session().add(r)
+ Session().commit()
+
+ r = Repository.get_by_repo_name(HG_REPO)
+ Repository.unlock(r)
+ r.enable_locking = False
+ Session().add(r)
+ Session().commit()
+
+ def test_clone_hg_repo_by_admin(self):
+ clone_url = _construct_url(HG_REPO)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ assert 'requesting all changes' in stdout
+ assert 'adding changesets' in stdout
+ assert 'adding manifests' in stdout
+ assert 'adding file changes' in stdout
+
+ assert stderr == ''
+
+ def test_clone_git_repo_by_admin(self):
+ clone_url = _construct_url(GIT_REPO)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+ assert 'Cloning into' in stdout
+ assert stderr == ''
+
+ def test_clone_wrong_credentials_hg(self):
+ clone_url = _construct_url(HG_REPO, passwd='bad!')
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+ assert 'abort: authorization failed' in stderr
+
+ def test_clone_wrong_credentials_git(self):
+ clone_url = _construct_url(GIT_REPO, passwd='bad!')
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+ assert 'fatal: Authentication failed' in stderr
+
+ def test_clone_git_dir_as_hg(self):
+ clone_url = _construct_url(GIT_REPO)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+ assert 'HTTP Error 404: Not Found' in stderr
+
+ def test_clone_hg_repo_as_git(self):
+ clone_url = _construct_url(HG_REPO)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+ assert 'not found:' in stderr
+
+ def test_clone_non_existing_path_hg(self):
+ clone_url = _construct_url('trololo')
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+ assert 'HTTP Error 404: Not Found' in stderr
+
+ def test_clone_non_existing_path_git(self):
+ clone_url = _construct_url('trololo')
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+ assert 'not found:' in stderr
+
+ def test_push_new_file_hg(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ stdout, stderr = _add_files_and_push('hg', DEST)
+
+ assert 'pushing to' in stdout
+ assert 'Repository size' in stdout
+ assert 'Last revision is now' in stdout
+
+ def test_push_new_file_git(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(GIT_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+ # commit some stuff into this repo
+ stdout, stderr = _add_files_and_push('git', DEST)
+
+ #WTF git stderr ?!
+ assert 'master -> master' in stderr
+
+ def test_push_wrong_credentials_hg(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
+ passwd='name')
+
+ assert 'abort: authorization failed' in stderr
+
+ def test_push_wrong_credentials_git(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(GIT_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+ stdout, stderr = _add_files_and_push('git', DEST, user='bad',
+ passwd='name')
+
+ assert 'fatal: Authentication failed' in stderr
+
+ def test_push_back_to_wrong_url_hg(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ stdout, stderr = _add_files_and_push('hg', DEST,
+ clone_url='http://127.0.0.1:5000/tmp',)
+
+ assert 'HTTP Error 404: Not Found' in stderr
+
+ def test_push_back_to_wrong_url_git(self):
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(GIT_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+ stdout, stderr = _add_files_and_push('git', DEST,
+ clone_url='http://127.0.0.1:5000/tmp',)
+
+ assert 'not found:' in stderr
+
+ def test_clone_and_create_lock_hg(self):
+ # enable locking
+ r = Repository.get_by_repo_name(HG_REPO)
+ r.enable_locking = True
+ Session().add(r)
+ Session().commit()
+ # clone
+ clone_url = _construct_url(HG_REPO)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ #check if lock was made
+ r = Repository.get_by_repo_name(HG_REPO)
+ assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+ def test_clone_and_create_lock_git(self):
+ # enable locking
+ r = Repository.get_by_repo_name(GIT_REPO)
+ r.enable_locking = True
+ Session().add(r)
+ Session().commit()
+ # clone
+ clone_url = _construct_url(GIT_REPO)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+
+ #check if lock was made
+ r = Repository.get_by_repo_name(GIT_REPO)
+ assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+ def test_clone_after_repo_was_locked_hg(self):
+ #lock repo
+ r = Repository.get_by_repo_name(HG_REPO)
+ Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+ #pull fails since repo is locked
+ clone_url = _construct_url(HG_REPO)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+ msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+ % (HG_REPO, TEST_USER_ADMIN_LOGIN))
+ assert msg in stderr
+
+ def test_clone_after_repo_was_locked_git(self):
+ #lock repo
+ r = Repository.get_by_repo_name(GIT_REPO)
+ Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+ #pull fails since repo is locked
+ clone_url = _construct_url(GIT_REPO)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+ msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+ % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
+ #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
+ # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
+ msg = "405 Method Not Allowed"
+ assert msg in stderr
+
+ def test_push_on_locked_repo_by_other_user_hg(self):
+ #clone some temp
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ #lock repo
+ r = Repository.get_by_repo_name(HG_REPO)
+ # let this user actually push !
+ RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
+ perm='repository.write')
+ Session().commit()
+ Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+
+ #push fails repo is locked by other user !
+ stdout, stderr = _add_files_and_push('hg', DEST,
+ user=TEST_USER_REGULAR_LOGIN,
+ passwd=TEST_USER_REGULAR_PASS)
+ msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+ % (HG_REPO, TEST_USER_ADMIN_LOGIN))
+ assert msg in stderr
+
+#TODO: fix me ! somehow during tests hooks don't get called on GIT
+# def test_push_on_locked_repo_by_other_user_git(self):
+# #clone some temp
+# DEST = _get_tmp_dir()
+# clone_url = _construct_url(GIT_REPO, dest=DEST)
+# stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+#
+# #lock repo
+# r = Repository.get_by_repo_name(GIT_REPO)
+# # let this user actually push !
+# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
+# perm='repository.write')
+# Session().commit()
+# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
+#
+# #push fails repo is locked by other user !
+# stdout, stderr = _add_files_and_push('git', DEST,
+# user=TEST_USER_REGULAR_LOGIN,
+# passwd=TEST_USER_REGULAR_PASS)
+# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
+# % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
+# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
+# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
+# msg = "405 Method Not Allowed"
+# assert msg in stderr
+
+ def test_push_unlocks_repository_hg(self):
+ # enable locking
+ r = Repository.get_by_repo_name(HG_REPO)
+ r.enable_locking = True
+ Session().add(r)
+ Session().commit()
+ #clone some temp
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
+
+ #check for lock repo after clone
+ r = Repository.get_by_repo_name(HG_REPO)
+ assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+
+ #push is ok and repo is now unlocked
+ stdout, stderr = _add_files_and_push('hg', DEST)
+ assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
+ #we need to cleanup the Session Here !
+ Session.remove()
+ r = Repository.get_by_repo_name(HG_REPO)
+ assert r.locked == [None, None]
+
+#TODO: fix me ! somehow during tests hooks don't get called on GIT
+# def test_push_unlocks_repository_git(self):
+# # enable locking
+# r = Repository.get_by_repo_name(GIT_REPO)
+# r.enable_locking = True
+# Session().add(r)
+# Session().commit()
+# #clone some temp
+# DEST = _get_tmp_dir()
+# clone_url = _construct_url(GIT_REPO, dest=DEST)
+# stdout, stderr = Command('/tmp').execute('git clone', clone_url)
+#
+# #check for lock repo after clone
+# r = Repository.get_by_repo_name(GIT_REPO)
+# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
+#
+# #push is ok and repo is now unlocked
+# stdout, stderr = _add_files_and_push('git', DEST)
+# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
+# #we need to cleanup the Session Here !
+# Session.remove()
+# r = Repository.get_by_repo_name(GIT_REPO)
+# assert r.locked == [None, None]