|
"""Simple load simulator for Kiln servers (version 2.9.X)
This script has not been thoroughly tested and has only basic error handling!
Please use with caution, particularly the commands which write data (--pushnew and --branch)
Usage:
simulate_load.py (-h | --help)
simulate_load.py --kiln=<serverurl> --accesstoken=<token> --clone [--ixrepo=<ixrepo>] [--keep=<destpath>]
simulate_load.py --kiln=<serverurl> --accesstoken=<token> --fbtoken=<token> --pushnew [--ixrepo=<ixrepo> | --local=<repopath>]
simulate_load.py --kiln=<serverurl> --accesstoken=<token> --fbtoken=<token> --branch [--ixrepo=<ixrepo>]
simulate_load.py --kiln=<serverurl> --fbtoken=<token> --viewhistory [--ixrepo=<ixrepo>]
simulate_load.py --kiln=<serverurl> --fbtoken=<token> --viewfiles [--ixrepo=<ixrepo>]
simulate_load.py --kiln=<serverurl> --fbtoken=<token> --downloadzip [--ixrepo=<ixrepo>]
Options:
-h --help Show this screen.
--kiln=<serverurl> The URL of the Kiln server (e.g. https://fogbugz.company.com/kiln).
--accesstoken=<token> An Access Token with write permissions to the Kiln Server.
--clone Clone a repository.
--ixrepo=<ixrepo> The ixRepo value of the repository you want to operate on, defaults to a random repo.
--keep=<destpath> The path where you will store the repository being cloned. If this parameter is absent the cloned repo won't be saved.
--pushnew Create a new repository in Kiln and push code into it. If using --ixRepo: clone, push to new repo, delete local copy.
--local=<repopath> Use code from this local source for the push.
--branch Use the Kiln API to create a Kiln Branch of a repository.
--fbtoken=<token> The fbToken used to authenticate standard HTTP requests.
--viewhistory Load the history page for a repsitory.
--viewfiles Load the file page for a repository.
--downloadzip Download a repository as a zip file, discarding the results.
"""
from docopt import docopt
import urllib2
from urllib import urlencode
from random import choice
import json
import subprocess
import tempfile
import os, sys, shutil
import time
class KilnLoadSimulator2000:
def __init__(self, url, accessToken=None, fbToken=None, ixRepo=None):
if not url.endswith('/'):
url += '/'
self._url = url
self._apiurl = url + 'api/1.0/'
self._totalSec = -1
self._repos = {}
if accessToken:
# For Kiln API calls, we can use the accessToken in place of the fbToken.
self._accessToken = accessToken
self._fbToken = accessToken
if fbToken:
self._fbToken = fbToken
if (not ixRepo):
self._ixRepo = None
self._userSetRepo = False
else:
self._userSetRepo = True
self._ixRepo = int(ixRepo)
def getUserSetIxRepo(self):
if self._userSetRepo:
return self._ixRepo
else:
return None
def loadAllRepos(self):
if (not self._repos):
self._repos = {}
response = self.slurp("%s/Project" % self._apiurl, {'token':self._fbToken})
#print response
for project in response:
for group in project['repoGroups'] :
for repo in [x for x in group['repos'] if x['sStatus'].lower() == 'good']:
ixRepo = repo['ixRepo']
repoUrl = self._url + "code/" + repo['sProjectSlug'] +"/"+ repo['sGroupSlug'] +"/"+ repo['sSlug']
self._repos[ixRepo] = repoUrl
# If the ixRepo wasn't set by command parameters, pick a random one
if (not self._ixRepo):
self._ixRepo = choice(self._repos.keys())
def forceNewRandomIxRepo(self):
if (self._userSetRepo):
return False; # Don't override the user-set ixRepo
if (not self._repos):
self.loadAllRepos()
self._ixRepo = choice(self._repos.keys())
return True
def isRepoEmpty(self, ixRepo):
if not ixRepo:
return True # Should probably throw an error here...
# Get the changeset history of the repo, check for anything
repoHistory = self.slurp("%s/Repo/%s/History" % (self._apiurl, ixRepo,), {'token':self._fbToken})
return repoHistory == []
def getRepoUrl(self, ixRepo=None):
if (not self._repos):
self.loadAllRepos()
if ixRepo:
return self._repos[ixRepo]
return self._repos[self._ixRepo]
# Will always return an ixRepo, either the one set by the user or a random one.
# Once a random ixRepo is chosen, it will not change and will function as if it was set by
# the user
def getIxRepo(self):
if (not self._repos):
self.loadAllRepos()
return self._ixRepo
def getAccessToken(self):
return self._accessToken
def slurp(self, url, params={}, post=False, raw=False):
params = urlencode(params, doseq=True)
handle = urllib2.urlopen(url, params) if post else urllib2.urlopen(url + '?' + params)
content = handle.read()
obj = content if raw else json.loads(content)
handle.close()
if (not raw and 'errors' in obj):
for err in obj['errors']:
print "\nERROR! - %s \n" % err['sError']
return obj
def getFirstIxGroup(self):
response = self.slurp("%s/Project" % self._apiurl, {'token':self._fbToken})
return response[0]['repoGroups'][0]['ixRepoGroup']
def printOperationTime(self):
print "\n == Total operation time: %s sec" % self._totalSec
def waitForRepo(self, ixRepo):
print ' == Waiting for repo to be ready (1 sec intervals)...'
status = 'new'
while status != 'good':
repo = self.slurp("%s/Repo/%s" % (self._apiurl, ixRepo,), {'token':self._fbToken})
status = repo['sStatus']
time.sleep(1)
print '.'
def clone(self, destPath):
deleteAfter = False
if (destPath):
dest = destPath
if (os.listdir(dest)):
print "Clone destination (%s) is not empty, please provide a different path." % dest
sys.exit(1)
else:
dest = tempfile.mkdtemp()
deleteAfter = True;
repoUrl = kilnTest.getRepoUrl().replace('://', '://%s:password@' % (kilnTest.getAccessToken(),) )
print "\n == Cloning repo now:"
print " == " + " ".join(['hg', 'clone', repoUrl, dest])
t_start = time.clock()
subprocess.call(['hg', 'clone', repoUrl, dest])
self._totalSec = time.clock() - t_start
if (deleteAfter):
print "\n == Deleting local clone"
shutil.rmtree(dest)
def makeNewRepo(self, name='load-test-repo'):
if (self._ixRepo):
repo = self.slurp("%s/Repo/%s" % (self._apiurl, self._ixRepo,), {'token':self._fbToken})
ixGroup = repo['ixRepoGroup']
else:
ixGroup = self.getFirstIxGroup() # don't use a random repo group, that would make the new repos hard to find!
newRepo = self.slurp("%s/Repo/Create" % (self._apiurl,),
{'token':self._fbToken,
'sName':name,
'ixRepoGroup':ixGroup
}, post=True)
#print newRepo
newIxRepo = newRepo['ixRepo']
# Add it to the internal list
repoUrl = self._url + "code/" + newRepo['sProjectSlug'] +"/"+ newRepo['sGroupSlug'] +"/"+ newRepo['sSlug']
self._repos[newIxRepo] = repoUrl
self.waitForRepo(newIxRepo)
return newIxRepo
def pushRepo(self, sourceDir, targetIxRepo):
prevDir = os.getcwd()
os.chdir(sourceDir)
repoUrl = self.getRepoUrl(targetIxRepo).replace('://', '://%s:password@' % (kilnTest.getAccessToken(),) )
print "\n == Pushing repo now:"
print " == " + " ".join(['hg', 'push', repoUrl])
t_start = time.clock()
subprocess.call(['hg', 'push', repoUrl])
self._totalSec = time.clock() - t_start
os.chdir(prevDir)
def createKilnBranch(self):
ixRepo = self.getIxRepo() # this will get use the ixRepo set by command parameter, or the random one
if (self.isRepoEmpty(ixRepo)):
if (self._userSetRepo): # user repo was empty, can't branch it! fail gracefully
print ' == Cannot branch empty repo provided by user with ixRepo = %s' % ixRepo
print ' == Exiting'
sys.exit(1)
# Okay, it was a random repo, but it was empty. Just get a new random one!
while True:
self.forceNewRandomIxRepo() # try a new one
ixRepo = self.getIxRepo()
if (not self.isRepoEmpty(ixRepo)): # until we get a non-empty repo
break
print ' == Branching ixRepo %s' % ixRepo
print ' == %s' % self.getRepoUrl(ixRepo)
repo = self.slurp('%s/Repo/%s' % (self._apiurl, ixRepo,), {'token':self._fbToken})
ixGroup = repo['ixRepoGroup']
name = repo['sName']
name = name + '-branch-' + randomStr(6)
# We need to measure from the request to when the repo becomes ready in the backend
t_start = time.clock()
branchRepo = self.slurp('%s/Repo/Create' % (self._apiurl,),
{'token':self._fbToken,
'sName':name,
'ixRepoGroup':ixGroup,
'ixParent':ixRepo
}, post=True)
#print branchRepo
newIxRepo = branchRepo['ixRepo']
# Add the new repo to the internal list ( so we can call things like getRepoUrl() )
repoUrl = self._url + 'code/' + branchRepo['sProjectSlug'] +'/'+ branchRepo['sGroupSlug'] +'/'+ branchRepo['sSlug']
self._repos[newIxRepo] = repoUrl
print ' == Created new repo, ixRepo: %s' % newIxRepo
print ' == %s' % kilnTest.getRepoUrl(newIxRepo)
self.waitForRepo(newIxRepo)
# Stop the timer
self._totalSec = time.clock() - t_start
return newIxRepo
# By default will load the history view. Change subPath to '/Files' to load the files page
# Also uses the "Log Off" link to confirm the result is a valid page
def viewRepoPage(self, subPath='', successString='/Auth/LogOff'):
ixRepo = self.getIxRepo()
repoUrl = self.getRepoUrl()
print ' == Getting repo page for ixRepo %s' % ixRepo
print ' == URL: %s\n' % (repoUrl + subPath)
t_start = time.clock()
history = kilnTest.slurp(repoUrl + subPath, {'token':self._fbToken }, raw=True )
self._totalSec = time.clock() - t_start
if (successString in history):
print ' == Success!'
else:
print ' == Error, unexpected HTML page!\n\n'
print history
def downloadRepoZip(self):
# Equivalent curl command:
# curl repoUrl + '/Archive?rev=tip&type=zip&node=tip' -H 'Cookie: fbToken=xxx;' --X post [--compressed]
ixRepo = self.getIxRepo() # this will get use the ixRepo set by command parameter, or the random one
if (self.isRepoEmpty(ixRepo)):
if (self._userSetRepo): # user repo was empty, but that isn't useful! fail gracefully
print ' == Will not download empty repo with ixRepo = %s' % ixRepo
print ' == Exiting'
sys.exit(1)
# Okay, it was a random repo, but it was empty. Just get a new random one.
while True:
self.forceNewRandomIxRepo() # try a new one
ixRepo = self.getIxRepo()
if (not self.isRepoEmpty(ixRepo)): # until we get a non-empty repo
break
repoUrl = self.getRepoUrl()
archiveUrl = repoUrl + '/Archive?rev=tip&type=zip&node=tip'
print ' == Getting zip archive for ixRepo %s' % ixRepo
print ' == URL: %s\n' % archiveUrl
t_start = time.clock()
zipBinary = kilnTest.slurp(archiveUrl, {'token':self._fbToken }, post=True, raw=True )
self._totalSec = time.clock() - t_start
#print zipBinary # this print gets super messy, it's just raw binary data
if ('html' not in zipBinary):
print ' == Success!'
else:
print ' == Error, unexpected HTML page!\n\n'
print zipBinary
#Helper functions
def randomStr(length):
return ''.join(choice('0123456789abcdef') for x in range(length))
if __name__ == '__main__':
args = docopt(__doc__)
#print(args)
kilnTest = KilnLoadSimulator2000(args['--kiln'], args['--accesstoken'], args['--fbtoken'], args['--ixrepo'])
##### CLONE REPO #####
if (args['--clone']):
kilnTest.clone(args['--keep'])
kilnTest.printOperationTime()
##### PUSH NEW REPO ##### -- Measures only the time for the push operation to complete
elif (args['--pushnew']):
if (args['--local']):
tempDir = args['--local']
tempName = os.path.basename(tempDir) + "-" + randomStr(6)
else:
tempDir = tempfile.mkdtemp()
kilnTest.clone(tempDir)
tempName = os.path.basename(tempDir)
targetIxRepo = kilnTest.makeNewRepo(tempName)
kilnTest.pushRepo(tempDir, targetIxRepo)
kilnTest.printOperationTime()
print "\n == Deleting local clone"
shutil.rmtree(tempDir)
##### CREATE KILN BRANCH #####
elif (args['--branch']):
kilnTest.createKilnBranch()
kilnTest.printOperationTime()
##### VIEW HISTORY #####
elif (args['--viewhistory']):
kilnTest.viewRepoPage()
kilnTest.printOperationTime()
##### VIEW FILE LIST #####
elif (args['--viewfiles']):
kilnTest.viewRepoPage(subPath='/Files')
kilnTest.printOperationTime()
##### DOWNLOAD REPO ARCHIVE #####
elif (args['--downloadzip']):
kilnTest.downloadRepoZip()
kilnTest.printOperationTime()
|
Loading...