Kiln » TortoiseHg » TortoiseHg
Clone URL:  
thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# thread.py - A seprated thread to run Mercurial command # # Copyright 2009 Steve Borho <steve@borho.org> # Copyright 2010 Yuki KODAMA <endflow.net@gmail.com> # # This software may be used and distributed according to the terms of the # GNU General Public License version 2, incorporated herein by reference. import os import Queue import time import urllib2 from PyQt4.QtCore import SIGNAL, pyqtSignal, QObject, QThread from PyQt4.QtGui import QMessageBox, QInputDialog, QLineEdit from mercurial import ui, util, error, dispatch from tortoisehg.util import thread2, hglib from tortoisehg.hgqt.i18n import _, localgettext from tortoisehg.hgqt import qtlib local = localgettext() SIG_OUTPUT = SIGNAL('output(PyQt_PyObject)') SIG_ERROR = SIGNAL('error(PyQt_PyObject)') SIG_INTERACT = SIGNAL('interact(PyQt_PyObject)') SIG_PROGRESS = SIGNAL('progress(PyQt_PyObject)') class QtUi(ui.ui): def __init__(self, src=None, responseq=None): super(QtUi, self).__init__(src) if src: self.sig = src.sig self.responseq = src.responseq else: self.sig = QObject() # dummy object to emit signals self.responseq = responseq self.setconfig('ui', 'interactive', 'on') self.setconfig('progress', 'disable', 'True') os.environ['TERM'] = 'dumb' qtlib.configstyles(self) def write(self, *args, **opts): if self._buffers: self._buffers[-1].extend([str(a) for a in args]) else: for a in args: data = DataWrapper((str(a), opts.get('label', ''))) self.sig.emit(SIG_OUTPUT, data) def write_err(self, *args, **opts): for a in args: data = DataWrapper((str(a), opts.get('label', 'ui.error'))) self.sig.emit(SIG_ERROR, data) def label(self, msg, label): return msg def flush(self): pass def prompt(self, msg, choices=None, default='y'): if not self.interactive(): return default try: # emit SIG_INTERACT signal data = DataWrapper((msg, False, choices, None)) self.sig.emit(SIG_INTERACT, data) # await response r = self.responseq.get(True) if r is None: raise EOFError if not r: return default if choices: # return char for Mercurial 1.3 choice = choices[r] return choice[choice.index('&')+1].lower() return r except EOFError: raise util.Abort(local._('response expected')) def promptchoice(self, msg, choices, default=0): if not self.interactive(): return default try: # emit SIG_INTERACT signal data = DataWrapper((msg, False, choices, default)) self.sig.emit(SIG_INTERACT, data) # await response r = self.responseq.get(True) if r is None: raise EOFError return r except EOFError: raise util.Abort(local._('response expected')) def getpass(self, prompt=_('password: '), default=None): # emit SIG_INTERACT signal data = DataWrapper((prompt, True, None, default)) self.sig.emit(SIG_INTERACT, data) # await response r = self.responseq.get(True) if r is None: raise util.Abort(local._('response expected')) return r def progress(self, topic, pos, item='', unit='', total=None): data = DataWrapper((topic, item, pos, total, unit)) self.sig.emit(SIG_PROGRESS, data) class DataWrapper(QObject): def __init__(self, data): super(DataWrapper, self).__init__(None) self.data = data class CmdThread(QThread): """Run an Mercurial command in a background thread, implies output is being sent to a rendered text buffer interactively and requests for feedback from Mercurial can be handled by the user via dialog windows. """ # (msg=str, label=str) [wrapped] outputReceived = pyqtSignal(DataWrapper) # msg=str [wrapped] errorReceived = pyqtSignal(DataWrapper) # (msg=str, password=bool, choices=tuple, default=str) [wrapped] # password: whether should be masked by asterisk chars # choices: tuple of choice strings interactReceived = pyqtSignal(DataWrapper) # (topic=str, item=str, pos=int, total=int, unit=str) [wrapped] progressReceived = pyqtSignal(DataWrapper) # result=int or None [wrapped] # result: None - command is incomplete, possibly exited with exception # 0 - command is finished successfully # others - return code of command commandFinished = pyqtSignal(DataWrapper) def __init__(self, cmdline, parent=None): super(QThread, self).__init__(None) self.cmdline = cmdline self.parent = parent self.ret = None self.abortbyuser = False self.responseq = Queue.Queue() self.ui = QtUi(responseq=self.responseq) # Re-emit all ui.sig's signals to CmdThread (self). # QSignalMapper doesn't help for this since our SIGNAL # parameters contain 'PyQt_PyObject' types. for name, sig in ((SIG_OUTPUT, self.outputReceived), (SIG_ERROR, self.errorReceived), (SIG_INTERACT, self.interactReceived), (SIG_PROGRESS, self.progressReceived)): def repeater(sig): # hide 'sig' local variable name return lambda data: sig.emit(data) self.connect(self.ui.sig, name, repeater(sig)) self.finished.connect(self.thread_finished) self.interactReceived.connect(self.interact_handler) def abort(self): if self.isRunning() and hasattr(self, 'thread_id'): self.abortbyuser = True thread2._async_raise(self.thread_id, KeyboardInterrupt) def thread_finished(self): self.commandFinished.emit(DataWrapper(self.ret)) def interact_handler(self, wrapper): prompt, password, choices, default = wrapper.data prompt = hglib.tounicode(prompt) if choices: dlg = QMessageBox(QMessageBox.Question, _('TortoiseHg Prompt'), prompt, QMessageBox.Yes | QMessageBox.Cancel, self.parent) dlg.setDefaultButton(QMessageBox.Cancel) rmap = {} for index, choice in enumerate(choices): button = dlg.addButton(hglib.tounicode(choice), QMessageBox.ActionRole) rmap[id(button)] = index dlg.exec_() button = dlg.clickedButton() if button is 0: result = default else: result = rmap[id(button)] self.responseq.put(result) else: mode = password and QLineEdit.Password \ or QLineEdit.Normal text, ok = QInputDialog().getText(self.parent, _('TortoiseHg Prompt'), prompt, mode) self.responseq.put(ok and text or None) def run(self): # save thread id in order to terminate by KeyboardInterrupt self.thread_id = int(QThread.currentThreadId()) try: for k, v in self.ui.configitems('defaults'): self.ui.setconfig('defaults', k, '') self.ret = dispatch._dispatch(self.ui, self.cmdline) or 0 except util.Abort, e: self.ui.write_err(local._('abort: ') + str(e) + '\n') except (error.RepoError, urllib2.HTTPError), e: self.ui.write_err(str(e) + '\n') except (Exception, OSError, IOError), e: self.ui.write_err(str(e) + '\n') except KeyboardInterrupt: pass if self.ret is None: if self.abortbyuser: msg = _('[command terminated by user %s]') else: msg = _('[command interrupted %s]') elif self.ret: msg = _('[command returned code %d %%s]') % int(ret) else: msg = _('[command completed successfully %s]') self.ui.write(msg % time.asctime() + '\n', label='control')