Kiln » Unity3D Unity 3D's proposed fixes and extensions to Kiln BFiles
Clone URL:  
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import sys import os import stat import subprocess import hgtest # assumes we're run by hg's run-tests.py TESTDIR = os.environ['TESTDIR'] STOREDIR = os.path.join(os.getcwd(), 'store') DEFAULTRC = { 'extensions': [('kbfiles', '%s/../kbfiles' % TESTDIR), ('rebase', '')], 'kilnbfiles': [('systemcache', os.path.join(os.getcwd(), 'bfilesstore')),], } class BfilesTester(hgtest.Tester): def updaterc(self, extraconfig=None): '''Append configuration settings to $HGRCPATH to configure bfiles for the current run, plus any additional settings in extraconfig. extraconfig, if supplied, must be a mapping from section name to list of (key, value) pairs -- the same structure as DEFAULTRC.''' config = DEFAULTRC.copy() if extraconfig: for (section, values) in extraconfig.iteritems(): if section in config: config[section] = list(config[section]) + values else: config[section] = values output = [] for section in sorted(config): entries = config[section] output.append('[%s]' % section) for entry in entries: assert len(entry) == 2, \ 'a config entry must be an iterable with two items' output.append('%s = %s' % tuple(entry)) self.writerc('\n'.join(output) + '\n') def walk(self, dir, prune=None, dirs=False): sep = os.path.sep dir = dir.replace('/', sep) result = [] prefix = len(dir + sep) for (dirpath, dirnames, filenames) in os.walk(dir): dirpath = dirpath[prefix:] # trim common leading dir name if dirs and dirpath != '': result.append(dirpath) if prune: dirnames[:] = [name for name in dirnames if name not in prune] dirnames.sort() filenames.sort() result.extend([os.path.join(dirpath, name) for name in filenames]) return [fn.replace(sep, '/') for fn in result] def assertadmin(self, expect): actual = self.walk('.hg/bfiles', prune=['pending', 'committed']) self.assertequals(expect, actual, 'admin files') def assertpending(self, expect): actual = self.walk('.hg/bfiles/pending', dirs=True) self.assertequals(expect, actual, 'pending files/dirs') def assertcommitted(self, expect): actual = self.walk('.hg/bfiles/committed', dirs=True) self.assertequals(expect, actual, 'committed files/dirs') def assertstandins(self, expect): actual = self.walk('.hgbfiles') self.assertequals(expect, actual, 'standin files') def assertstore(self, expect): actual = self.walk(STOREDIR, dirs=True) self.assertequals(expect, actual, 'store contents') def assertdirstate(self, expect): actual = self.walk('.') def excluded(path): return path.startswith('./.hg/') or path.startswith('./.hgbfiles') actual = [x for x in actual if not excluded(x)] self.assertequals(expect, actual, 'working files') def assertfile(self, file): self.asserttrue(os.path.isfile(file), 'file %s does not exist' % file) def assertfilegone(self, file): self.assertfalse(os.path.isfile(file) or os.path.exists(file), 'file %s exists (should be gone)' % file) def assertdirgone(self, dir): self.assertfalse(os.path.isdir(dir) or os.path.exists(dir), 'dir %s exists (should be gone)' % dir) def isexec(self, filename): return (os.stat(filename).st_mode & stat.S_IXUSR) != 0 def assertexec(self, filename): if os.name != 'posix': return self.asserttrue(self.isexec(filename), '%s is not executable' % filename) def assertnotexec(self, filename): self.assertfalse(self.isexec(filename), '%s is executable' % filename) def asserthashes(self): '''assert that all big file hashes are consistent: - contents of .hgbfiles/x - contents of .hg/bfiles/latest/x - hash of x ''' sep = os.path.sep latestdir = os.path.join('.hg', 'bfiles', 'latest') for bfile in self.walk('.hgbfiles'): fhash = sha1(bfile) shash = self.readfile(os.path.join('.hgbfiles', bfile))[0:40] lhash = self.readfile(os.path.join(latestdir, bfile))[0:40] assert shash, "nothing read from file %s" % standin assert lhash, "nothing read from file %s" % lfile self.asserttrue( shash == lhash == fhash, "%s: inconsistent hashes:\n" " standin: %s\n" " latest: %s\n" " file: %s" % (bfile, shash, lhash, fhash)) def createstore(self, name): store_tar = self.tjoin(name + '.tar') untar(store_tar) def sshstore(self): user = os.environ.get('LOGNAME', os.environ.get('USER')) return "ssh://%s@localhost/%s" % (user, STOREDIR) # This deliberately does not use mercurial.util.sha1() -- don't want to # depend on code that we might be testing. try: import hashlib # Python >= 2.5 _sha1 = hashlib.sha1 except ImportError: import sha # Python 2.4 _sha1 = sha.new def sha1(filename): '''Compute and return the SHA-1 hash of the specified file.''' # XXX assumes the file is small enough to read into memory digester = _sha1() f = open(filename) try: digester.update(f.read()) return digester.hexdigest() finally: f.close() def untar(filename, path='.'): '''Untar the specified tar file in path (current directory by default).''' import tarfile tf = tarfile.TarFile(filename) for info in tf: tf.extract(info, path) tf.close() def checkdirs(patha, pathb): filesa = [] for (dir, dirnames, filenames) in os.walk(patha): for f in filenames: filename = os.path.join(dir,f)[len(patha)+1:] if not filename.startswith('.hg') and not filename.startswith('.kbf'): filesa.append(filename) filesb = [] for (dir, dirnames, filenames) in os.walk(pathb): for f in filenames: filename = os.path.join(dir,f)[len(pathb)+1:] if not filename.startswith('.hg') and not filename.startswith('.kbf'): filesb.append(filename) filesa.sort() filesb.sort() for (fa, fb) in zip(filesa, filesb): if fa != fb: return False sys.exit() fda = open(os.path.join(patha, fa), 'rb').read() fdb = open(os.path.join(pathb, fb), 'rb').read() if fda != fdb: return False if os.stat(os.path.join(patha, fa)).st_mode != os.stat(os.path.join(pathb, fb)).st_mode: return False return True def checkrepos(hgt, repo1, repo2, revs): for i in revs: hgt.hg(['up', '-R', repo1, '-r', str(i), '-C'], stdout=hgt.ANYTHING, log=False) hgt.hg(['up', '-R', repo2, '-r', str(i), '-C'], stdout=hgt.ANYTHING, log=False) hgt.asserttrue(checkdirs(repo1, repo2), 'repos dont match at %d' % i) hgt.hg(['up', '-R', repo1, '-C'], stdout=hgt.ANYTHING, log=False) hgt.hg(['up', '-R', repo2, '-C'], stdout=hgt.ANYTHING, log=False)