|
"""
Experimental testing framework for Mercurial and its extensions.
The goal is to allow tests to be Python scripts rather than shell
scripts. They should be:
* more portable
* possibly faster
* much easier to understand and modify
* somewhat easier to write
"""
import argparse
import sys
import os
import re
import atexit
import subprocess
import traceback
import shutil
import getpass
import json
import time
from urllib import urlencode
import urllib2
from custom import *
# Pattern object that matches any text whatsoever; tests use it as the
# expected stdout/stderr when they do not care what a command printed.
ANYTHING = re.compile(r'')

# Directory layout, all derived from where this file lives on disk.
CURDIR = os.path.split(os.path.realpath(__file__))[0]
TESTDIR = os.path.join(CURDIR, "tmp")
EXTDIR = os.path.split(CURDIR)[0]

# Options prepended to every hg invocation so that the extensions are
# loaded from EXTDIR.
HGARGS = []
for _option, _script in (
        ("extensions.kiln=", "kiln.py"),
        ("extensions.kilnauth=", "kilnauth.py"),
        ("extensions.big-push=", "big-push.py")):
    HGARGS.extend(["--config", _option + os.path.join(EXTDIR, _script)])
del _option, _script

# Ensure that the home directory is set appropriately so that the kilnauth
# cookies will be found. This is important because Mercurial 1.9 and later
# changes the home directory in the test script.
os.environ['HOME'] = os.path.expanduser('~' + getpass.getuser())
class Failure(Exception):
    """Raised to abort a test once at least one check has failed."""
def parse_args():
    """
    Parse the command line of the running test script.

    Recognised options are --user, --password and --kilnurl; each falls
    back to the USER, PASSWORD and KILNURL names respectively (not defined
    in the visible part of this file -- presumably supplied by the
    ``custom`` module).

    Returns the argparse namespace with ``user``, ``password`` and
    ``kilnurl`` attributes.
    """
    cli = argparse.ArgumentParser(description='Test the hg command line')
    options = (
        ('--user', 'user', USER),
        ('--password', 'password', PASSWORD),
        ('--kilnurl', 'kilnurl', KILNURL),
    )
    for flag, target, fallback in options:
        cli.add_argument(flag, default=fallback, dest=target)
    return cli.parse_args()
class Tester(object):
"""
Default test framework. It is fairly verbose: each announcement and each
hg command is written to stdout.

Failed checks are recorded in self.failures instead of being raised
immediately; hg() raises Failure once a command has been checked and
at least one failure is on record.
"""
# Expose the module-level ANYTHING pattern as a class attribute.
ANYTHING = ANYTHING
def __init__(self, auth = True):
    """
    Set up a tester: register the exit hook, recreate the scratch
    directory, read the command-line options and chdir into TESTDIR.

    auth: whether the tester should automatically supply the username
    and password to hg. Defaults to true.
    """
    # Plain state first; _atexit only ever looks at self.failures.
    self.stdout = sys.stdout
    self.failures = []
    self.firstannounce = True
    self.auth = auth
    self.s_project = None
    self.ix_group = None

    atexit.register(self._atexit)
    self._clear_test_dir()

    options = parse_args()
    self.kilnurl = options.kilnurl
    self.user = options.user
    self.password = options.password

    # hg --config options carrying the credentials for the Kiln server.
    self.auth_args = []
    for setting in ("auth.test.prefix=" + self.kilnurl,
                    "auth.test.username=" + self.user,
                    "auth.test.password=" + self.password):
        self.auth_args.extend(["--config", setting])

    os.chdir(TESTDIR)
def init_project(self, token):
    """
    Create the Kiln project and its "Test" repository group, once.

    Does nothing when self.s_project is already set. Otherwise a project
    named "ExtTest-<unix time>" is created through the API, followed by a
    repository group called "Test"; the group's id is stored in
    self.ix_group.
    """
    if self.s_project is not None:
        return

    self.s_project = "ExtTest-" + str(int(time.time()))

    project_request = { "sName": self.s_project, "token": token }
    project = self.slurp(self.api("Project/Create"), project_request, True)

    group_request = { "sName": "Test", "ixProject": project["ixProject"], "token": token}
    repo_group = self.slurp(self.api("RepoGroup/Create"), group_request, True)

    self.ix_group = repo_group["ixRepoGroup"]
def test_url(self):
return self.kilnurl + "/Code/" + self.s_project + "/Test/Test"
def test_branch_url(self):
return self.kilnurl + "/Code/" + self.s_project + "/Test/TestBranch"
def api(self, url):
    """Return the full URL of the version 1.0 API route `url` on self.kilnurl."""
    return ''.join((self.kilnurl, '/api/1.0/', url))
def slurp(self, url, params=None, post=False, raw=False):
    """
    Fetch `url` and return the response body.

    params: mapping of request parameters (default: none). Sequence
            values are expanded into repeated keys (doseq).
    post:   when true the encoded parameters are sent as the request
            body; otherwise they are appended to the URL as a query
            string.
    raw:    when true the body is returned as read; otherwise it is
            decoded with json.loads.

    Raises ValueError when raw is false and the body is not valid JSON,
    plus whatever urllib2.urlopen raises.
    """
    query = urlencode({} if params is None else params, doseq=True)
    if post:
        handle = urllib2.urlopen(url, query)
    else:
        handle = urllib2.urlopen(url + '?' + query)
    # Close the connection even when reading fails, and before decoding,
    # so a non-JSON response cannot leak the handle.
    try:
        content = handle.read()
    finally:
        handle.close()
    return content if raw else json.loads(content)
def gettoken(self):
    """Call the Auth/Login API route with self.user / self.password and return its decoded response."""
    credentials = dict(sUser=self.user, sPassword=self.password)
    login_url = self.api('Auth/Login')
    return self.slurp(login_url, credentials)
def createtest(self, token):
    """
    Create the "Test" repository and wait until Kiln reports it as good.

    Makes sure the project and repository group exist first (see
    init_project). Returns a (url, ixRepo) tuple.
    """
    self.init_project(token)
    created = self.slurp(
        self.api('Repo/Create'),
        dict(sName='Test', sDescription='test', ixRepoGroup=self.ix_group, sDefaultPermission='write', token=token))
    ixRepo = created['ixRepo']
    self.asserttrue(isinstance(ixRepo, int), 'Create failed %s' % (str(ixRepo)))

    time.sleep(1)
    ready = False
    while not ready:
        # work around a known bug in Kiln that returns non-JSON data for this
        # API route so that we don't have "ignorable tests" in bfiles
        try:
            info = self.slurp(self.api('Repo/%d' % ixRepo), dict(token=token))
            ready = info['sStatus'] == 'good'
        except ValueError:
            ready = False
        if not ready:
            time.sleep(0.1)

    # NOTE(review): this URL hard-codes the project as "Test", while
    # test_url() builds it from self.s_project -- confirm which one
    # callers are meant to use.
    return (self.kilnurl + '/Code/Test/Test/Test', ixRepo)
def createtestbranch(self, token, ixParent):
    """
    Create the "TestBranch" repository as a non-central child of repository
    `ixParent` and wait until Kiln reports it as good.

    The parent is polled first until its status is good and it has
    changesets. Returns a (url, ixRepo) tuple.
    """
    self.init_project(token)

    # Wait for the parent to be usable before branching from it.
    while True:
        parent = self.slurp(self.api('Repo/%d' % ixParent), dict(token=token))
        if parent['sStatus'] == 'good' and parent['fHasChangesets']:
            break
        time.sleep(0.1)

    created = self.slurp(
        self.api('Repo/Create'),
        dict(sName='TestBranch', sDescription='test branch', ixRepoGroup=self.ix_group, ixParent=ixParent, fCentral=False, sDefaultPermission='write', token=token))
    ixRepo = created['ixRepo']
    self.asserttrue(isinstance(ixRepo, int), 'Create failed %s' % (str(ixRepo)))

    time.sleep(1)
    while True:
        branch = self.slurp(self.api('Repo/%d' % ixRepo), dict(token=token))
        if branch['sStatus'] == 'good':
            break
        time.sleep(0.1)

    # NOTE(review): this URL hard-codes the project as "Test", while
    # test_branch_url() builds it from self.s_project -- confirm which
    # one callers are meant to use.
    return (self.kilnurl + '/Code/Test/Test/TestBranch', ixRepo)
def deletetest(self, token):
    """
    Delete the "Test" and "TestBranch" repositories of the current project.

    Does nothing when no project was created (self.s_project is None).
    The project list is fetched from the API and searched for the
    project named self.s_project, its "Test" repository group and the
    two repositories; when several entries match, the last one wins.
    The branch is deleted before the repository named "Test". The
    project and the group themselves are left in place.
    """
    if self.s_project is None:
        return

    ix_repo = None
    ix_branch = None
    for project in self.slurp(self.api('Project'), dict(token=token)):
        if project['sName'] != self.s_project:
            continue
        for group in project['repoGroups']:
            if group['sName'] != 'Test':
                continue
            for repo in group['repos']:
                if repo['sName'] == 'Test':
                    ix_repo = repo['ixRepo']
                elif repo['sName'] == 'TestBranch':
                    ix_branch = repo['ixRepo']

    # Only the repository ids are needed; the project and group ids are
    # deliberately not read from the listing.
    for ix in (ix_branch, ix_repo):
        if ix is not None:
            self.slurp(self.api('Repo/%d/Delete' % ix), dict(token=token), post=True)
def _atexit(self):
    """Exit hook: when failures were recorded, print their count to stderr and exit with status 1."""
    if not self.failures:
        return
    summary = '%d failures\n' % len(self.failures)
    sys.stderr.write(summary)
    sys.exit(1)
def _clear_test_dir(self):
    """Recreate TESTDIR as an empty directory, removing any existing tree first."""
    leftover = os.path.exists(TESTDIR)
    if leftover:
        shutil.rmtree(TESTDIR)
    os.mkdir(TESTDIR)
def announce(self, msg):
    """
    Write `msg` to self.stdout as a "% msg" line and flush.

    Every announcement except the first one is preceded by a blank line.
    """
    opening = '% ' if self.firstannounce else '\n% '
    line = opening + msg + '\n'
    self.firstannounce = False
    self.stdout.write(line)
    self.stdout.flush()
def hg(self, args, stdout='', stderr='', status=0, log=True, auth=False, stdin=None):
    """
    Run an hg command and check its stdout, stderr and exit status
    against the expected values.

    args:   list of command-line arguments following "hg".
    stdout, stderr: expected output; strings for exact comparison or re
            pattern objects for a regex comparison. The constant
            ANYTHING conveniently matches any output, for when you
            don't care.
    status: expected exit status. A list means "any of these", -1 means
            "anything but zero", any other value must match exactly.
    log:    when true, the command is echoed (without the injected
            --config options) before it runs.
    auth:   when either this argument or tester.auth is true, the login
            info is passed to hg.
    stdin:  text fed to the command's standard input, or None.

    Raises Failure if any failure is on record once the checks are done.
    """
    # XXX should use exact path to hg
    # XXX set PYTHONPATH?
    hgname = 'hg'
    credentials = self.auth_args if (auth or self.auth) else []
    cmd = [hgname] + HGARGS + credentials + args

    if log:
        self._logcmd(['hg'] + args)

    # Only open a pipe for stdin when there is something to send.
    stdin_arg = None if stdin is None else subprocess.PIPE
    child = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdin=stdin_arg,
        universal_newlines=True)
    got_stdout, got_stderr = child.communicate(input=stdin)
    exit_code = child.returncode
    if os.name == 'nt' and exit_code < 0:
        # hack to match my Unix-centric expected status
        # (Mercurial actually exits with status -1, and the signedness
        # is preserved in Windows but wrapped around to 255 on Unix)
        exit_code += 256

    self.assertoutput(stdout, got_stdout, 'stdout', cmd)
    self.assertoutput(stderr, got_stderr, 'stderr', cmd)

    if isinstance(status, list):
        self.asserttrue(exit_code in status, "Status doesn't match")
    elif status == -1:
        self.asserttrue(exit_code != 0, "Status should be non-zero, is 0")
    else:
        self.assertequals(status, exit_code)

    self._failearly()
def tjoin(self, *path):
    '''Return the path formed by joining the given components onto TESTDIR.'''
    components = (TESTDIR,) + path
    return os.path.join(*components)
def writefile(self, filename, content, mode='w'):
    '''Write content to filename, creating missing parent directories.

    By default the file is clobbered and written in text mode; pass a
    different mode to append and/or write in binary mode.'''
    parent = os.path.dirname(filename)
    needs_parent = parent != '' and not os.path.exists(parent)
    if needs_parent:
        os.makedirs(parent)
    with open(filename, mode) as stream:
        stream.write(content)
def readfile(self, filename, mode='r'):
    '''Return the entire content of filename, opened with the given mode
    (text mode by default).'''
    with open(filename, mode) as stream:
        return stream.read()
def assertfalse(self, test, msg):
    """Record a failure with message `msg` when `test` is true."""
    if not test:
        return
    self._fail(msg)
def asserttrue(self, test, msg):
    """Record a failure with message `msg` when `test` is false."""
    if test:
        return
    self._fail(msg)
def assertequals(self, expect, actual, prefix=''):
    """
    Record a failure when `expect` differs from `actual`.

    A non-empty `prefix` is put in front of the message, followed by
    ': '. Expected lists are reported item by item through
    _listfailure; everything else goes through _shortfailure.
    """
    differs = expect != actual
    if not differs:
        return

    label = prefix + ': ' if prefix else prefix
    if isinstance(expect, list):
        report = self._listfailure(expect, actual, label)
    else:
        report = label + self._shortfailure(expect, actual)
    self._fail(report)
def expect_str(self, expect):
    """
    Render an expectation for a failure report.

    A list is shown one item per line, each followed by "*"; anything
    else is passed through str().
    """
    if not isinstance(expect, list):
        return str(expect)
    return "\n".join([item + "*" for item in expect])
def assertoutput(self, expect, actual, label, cmd):
    '''Check the actual output (stdout or stderr, named by label) of cmd
    against the expected output, after filtering it with _filteroutput.
    A mismatch is recorded as a failure showing the command, the
    expectation and the filtered output.'''
    filtered = self._filteroutput(actual)
    if not self._stringmatch(expect, filtered):
        expected_title = '%s %s:' % ('expected', label)
        actual_title = '%s %s:' % ('actual (filtered)', label)
        report = self._bannerbracket(
            'cmd:', str(cmd),
            expected_title, self.expect_str(expect),
            actual_title, str(filtered))
        self._fail(report)
    # silence is golden, so nothing is said on success
# -- Internal methods ----------------------------------------------
def _shortfailure(self, expect, actual):
    """Return a one-line message showing the repr of both values."""
    values = (expect, actual)
    return 'expected %r, but got %r' % values
def _listfailure(self, expect, actual, prefix):
    """
    Return a bracketed report listing the expected and the actual items,
    one repr per line, under banners starting with `prefix`.
    """
    expected_lines = [' %r' % v for v in expect]
    actual_lines = [' %r' % v for v in actual]
    return self._bannerbracket(
        '%s%s:' % (prefix, 'expected'),
        '\n'.join(expected_lines) + '\n',
        '%s%s:' % (prefix, 'actual'),
        '\n'.join(actual_lines) + '\n')
def _bannerbracket(self, *items):
    """
    Format alternating (banner, body) items into one report string.

    Items at even positions become banner lines of the form "-- text ---"
    padded with dashes to 60 columns; items at odd positions are copied
    as they are. Every item is followed by a newline, the report starts
    with a newline and ends with a rule of 60 dashes.
    """
    pieces = ["\n"]
    for position, item in enumerate(items):
        if position % 2 == 0:
            pieces.append(('-- %s ' % item).ljust(60, '-'))
        else:
            pieces.append(item)
        pieces.append("\n")
    pieces.append('-'*60)
    return "".join(pieces)
def _fail(self, msg):
    """
    Record a failed check.

    Writes "FAIL: msg" to self.stdout, prints the call stack leading up
    to the point where this module was entered to sys.stdout, and
    appends `msg` to self.failures. Nothing is raised here.
    """
    self.stdout.write('FAIL: %s\n' % msg)

    # print a stack trace up to the point where we entered this module
    sys.stdout.write('failure context:\n')
    stack = traceback.extract_stack()
    modfile = sys.modules[__name__].__file__
    # Frames report the source file, so map a compiled-file name back to it.
    modfile = re.sub(r'\.py[co]$', '.py', modfile)
    # Guard against running out of frames when every caller lives in
    # this module: indexing an empty stack would raise IndexError.
    while stack and stack[-1][0] == modfile:
        del stack[-1]
    for line in traceback.format_list(stack):
        sys.stdout.write(line)

    self.failures.append(msg)
def _failearly(self):
    """Raise Failure if at least one failure has been recorded."""
    if not self.failures:
        return
    raise Failure()
def report(self):
    """
    Terminate the process: with the message "<n> tests failed" when
    failures were recorded, otherwise with a plain successful exit.
    """
    failed = len(self.failures) if self.failures else 0
    if failed:
        sys.exit("%i tests failed" % failed)
    sys.exit()
def _stringmatch(self, expect, actual):
    """
    Return True when the output `actual` satisfies the expectation.

    expect may be:
    * a list of strings: `actual`, stripped of trailing whitespace, must
      have exactly as many lines, and each line must start with the
      corresponding list item;
    * a compiled regular expression (anything with a search() method,
      such as ANYTHING): it must match somewhere in `actual`;
    * a string: it must equal `actual`, ignoring trailing whitespace on
      both sides.
    """
    if isinstance(expect, list):
        lines = actual.rstrip().split('\n')
        if len(lines) != len(expect):
            return False
        return all(line.startswith(prefix)
                   for line, prefix in zip(lines, expect))
    if hasattr(expect, 'search'):
        # Pattern objects have no rstrip(); match them as regexes.
        return expect.search(actual) is not None
    return expect.rstrip() == actual.rstrip()
# Matches any character other than letters, digits and - _ + . / = :
# _logcmd wraps an argument in single quotes when this pattern finds a match.
unsafechars = re.compile(r'[^a-zA-Z0-9\-\_\+\.\/\=\:]')
def _logcmd(self, cmd):
    """
    Echo the command `cmd` (a list of arguments) to self.stdout on one line.

    Each argument has the local path separator converted to '/', is
    wrapped in single quotes when it contains a character matched by
    self.unsafechars, and has occurrences of TESTDIR replaced by the
    pseudo-variable $TESTDIR.
    """
    sep = os.path.sep
    foreign_sep = sep != '/'
    testdir = TESTDIR.replace(sep, '/')

    shown = []
    for word in cmd:
        if foreign_sep and sep in word:
            word = word.replace(sep, '/')
        if self.unsafechars.search(word):
            word = "'" + word.replace("'", "\\'") + "'"
        shown.append(word.replace(testdir, '$TESTDIR'))

    self.stdout.write(' '.join(shown) + '\n')
    self.stdout.flush()
def _filteroutput(self, output):
    '''Filter Mercurial output to match test expectations.
    - convert the local path separator to the Unix one
    - replace occurrences of TESTDIR with the pseudo-variable $TESTDIR
    '''
    sep = os.path.sep
    if sep != '/' and sep in output:
        output = output.replace(sep, '/')
    testdir = TESTDIR.replace(sep, '/')
    return output.replace(testdir, '$TESTDIR')
|
Loading...