"""For API usage only. (For CLI use the C{osh @FORK [ ... ]} syntax instead.)
"""

import types

import osh.args
import osh.cluster
import osh.core
import osh.error
import osh.function
import osh.oshthread
import osh.spawn
import osh.util
import merge

LineOutputConsumer = osh.spawn.LineOutputConsumer
ObjectInputProvider = osh.spawn.ObjectInputProvider
ObjectOutputConsumer = osh.spawn.ObjectOutputConsumer
Spawn = osh.spawn.Spawn
Option = osh.args.Option
create_function = osh.function._create_function


def fork(threadgen, command, merge_key = None):
    """Creates threads and executes C{command} on each. The number of threads is determined
    by C{threadgen}. If C{threadgen} is an integer, then that number of threads is created,
    and each thread is labelled with an integer from 0 through C{threadgen} - 1. If
    C{threadgen} is a sequence, then one thread is created for each element of the sequence,
    labelled with that element. If C{threadgen} is a function, then it is evaluated and is
    expected to yield an integer or a sequence, which is then handled as already described.
    If C{threadgen} is a cluster specification, then the command is executed on each specified
    host; the thread label identifies the host (its type is C{osh.cluster.Host}). If
    C{merge_key} is specified, then the inputs of each thread are expected to be ordered by
    the C{merge_key}; the sequences from the threads are then merged into a single sequence
    using the C{merge_key}.
    """
    import osh.apiparser
    op = _Fork()
    if isinstance(command, osh.core.Op):
        command = [command]
    pipeline = osh.apiparser._sequence_op(command)
    if merge_key:
        return op.process_args(threadgen, pipeline, merge_key)
    else:
        return op.process_args(threadgen, pipeline)

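
# Usage sketches (API). These are illustrative only: `op` stands for an
# osh.core.Op built elsewhere (e.g. via the osh.api module), and 'fred' for a
# cluster name defined in the osh configuration; neither is provided here.
#
#     fork(5, op)                        # 5 local threads, labelled 0 .. 4
#     fork(['a', 'b', 'c'], op)          # one thread per label 'a', 'b', 'c'
#     fork('fred', op)                   # one thread per host of cluster 'fred'
#     fork('fred', op, lambda *t: t[0])  # per-thread streams, each ordered by
#                                        # the first tuple element, merged into
#                                        # a single ordered stream
#
# Each output tuple is prefixed with the label of the thread that produced it
# (see _AttachThreadState below).
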
class _Fork(osh.core.Generator):

    _threads = None
    _pipeline = None
    _merge_key = None
    _function_store = None
    _cluster_required = None

    def __init__(self):
        osh.core.Generator.__init__(self, '', (2, 3))
        self._function_store = FunctionStore()
        self._cluster_required = False

    def setup(self):
        args = self.args()
        threadgen = args.next()
        self._pipeline = args.next()
        self._merge_key = args.next()
        cluster, thread_ids = self.thread_ids(threadgen)
        self.setup_pipeline(cluster)
        self.setup_threads(thread_ids)
        self.setup_shared_state()

    def receive_complete(self):
        for thread in self._threads:
            thread.pipeline.receive_complete()

    def execute(self):
        for thread in self._threads:
            thread.pipeline.setup()
            thread.pipeline.set_receiver(self._receiver)
            thread.start()
        for thread in self._threads:
            # Poll the join with a timeout so the main thread stays responsive
            # to interrupts while workers run.
            while thread.isAlive():
                thread.join(0.1)
            thread_termination = thread.terminating_exception
            if thread_termination:
                osh.error.exception_handler(thread_termination, self, None, thread)

    def set_cluster_required(self, required):
        self._cluster_required = required

    def thread_ids(self, threadgen, already_evaled = False):
        threadgen_type = type(threadgen)
        try:
            cluster = None
            thread_ids = None
            if threadgen_type in (list, tuple):
                thread_ids = threadgen
            elif isinstance(threadgen, int):
                thread_ids = range(threadgen)
            elif threadgen.isdigit():
                thread_ids = range(int(threadgen))
            elif threadgen_type is types.FunctionType:
                if already_evaled:
                    self.usage()
                else:
                    cluster, thread_ids = self.thread_ids(create_function(threadgen)(), True)
            else:
                # Interpret threadgen as a cluster specification, 'name' or 'name:pattern'.
                cluster_name, pattern = (threadgen.split(':') + [None])[:2]
                cluster = osh.cluster.cluster_named(cluster_name, pattern)
                if cluster:
                    thread_ids = cluster.hosts
                else:
                    evaled_threadgen = create_function(threadgen)()
                    cluster, thread_ids = self.thread_ids(evaled_threadgen, True)
            if self._cluster_required and cluster is None:
                import remote
                self.usage(remote.__doc__)
            if thread_ids is None:
                self.usage()
            return cluster, thread_ids
        except:
            self.usage()

    def setup_pipeline(self, cluster):
        if cluster and not self._pipeline.run_local():
            remote_op = _Remote()
            remote_op.process_args(self._pipeline)
            self._pipeline = osh.core.Pipeline()
            self._pipeline.append_op(remote_op)
        self._pipeline.append_op(_AttachThreadState())
        self._pipeline.append_op(merge.merge(self._merge_key))

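    # Note on the merge step above: each per-thread stream is expected to arrive
    # already ordered by the merge key, so combining the streams is a k-way merge
    # of sorted sequences. A self-contained sketch of that idea (not the merge
    # module's implementation), assuming `key` maps a tuple's elements to its
    # sort key:
    #
    #     import heapq
    #     def kway_merge(streams, key):
    #         decorated = [((key(*item), item) for item in s) for s in streams]
    #         for _, item in heapq.merge(*decorated):
    #             yield item
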
    def setup_threads(self, thread_ids):
        pipeline_copier = _PipelineCopier(self)
        self._function_store.hide_functions(self._pipeline)
        threads = []
        for thread_id in thread_ids:
            pipeline_copy = pipeline_copier.pipeline(thread_id)
            thread = osh.oshthread._OshThread(self, thread_id, pipeline_copy)
            threads.append(thread)
        self._function_store.restore_functions(self._pipeline)
        self._threads = threads

    def setup_shared_state(self):
        # Each template op contributes one command state object, shared by the
        # corresponding op in every per-thread pipeline copy.
        pipeline_copy_iterators = [thread.pipeline.ops() for thread in self._threads]
        for pipeline_template_op in self._pipeline.ops():
            command_state = pipeline_template_op.create_command_state(self._threads)
            for pipeline_copy_iterator in pipeline_copy_iterators:
                pipeline_copy_op = pipeline_copy_iterator.next()
                pipeline_copy_op.set_command_state(command_state)


class _PipelineCopier(object):

    _fork = None

    def __init__(self, fork):
        self._fork = fork

    def pipeline(self, thread_id):
        copy = osh.util.clone(self._fork._pipeline)
        self._fork._function_store.restore_functions(copy)
        return copy


class _AttachThreadState(osh.core.Op):

    _thread_state = None

    def setup(self):
        self._thread_state = (self.thread_state,)

    def receive(self, object):
        # Prepend the thread's label, e.g. thread 2 turns ('a', 1) into (2, 'a', 1).
        if type(object) is list:
            object = tuple(object)
        self.send(self._thread_state + object)


# Before a pipeline is cloned for each thread (and shipped to remote hosts), its
# functions are hidden, i.e. replaced by references, and restored afterwards;
# function objects generally cannot be copied or serialized directly.
class FunctionStore(object):

    _functions = None

    def hide_functions(self, pipeline):
        pipeline.replace_function_by_reference(self)

    def restore_functions(self, pipeline):
        pipeline.restore_function(self)


_REMOTE_EXECUTABLE = 'remoteosh'

def _dump(stream, object):
    # Assumption: each input object is pickled onto the spawned process's stdin.
    import cPickle
    cPickle.dump(object, stream)

def _consume_remote_stdout(consumer, threadid, object):
    if isinstance(object, osh.error.PickleableException):
        # An exception raised remotely arrives pickled; route it to the local handler.
        exception = object.recreate_exception()
        osh.error.exception_handler(exception, object.command_description(), object.input(), threadid)
    else:
        consumer.send(object)

def _consume_remote_stderr(consumer, threadid, line):
    # Filter out the known-noisy '[Errno 9] Bad file descriptor' message; report
    # everything else.
    if '[Errno 9] Bad file descriptor' not in line:
        osh.error.stderr_handler(line, consumer, None, threadid)


class _Remote(osh.core.Generator):

    _pipeline = None

    def setup(self):
        self._pipeline = self.args().next()

    def execute(self):
        host = self.thread_state
        process = Spawn(
            self._remote_command(host.address, host.user, host.identity, host.db_profile),
            ObjectInputProvider(lambda stream, object: _dump(stream, object),
                                [osh.core.verbosity, self._pipeline, self.thread_state]),
            ObjectOutputConsumer(lambda object: _consume_remote_stdout(self, host, object)),
            LineOutputConsumer(lambda line: _consume_remote_stderr(self, host, line)))
        process.run()
        if process.terminating_exception():
            raise process.terminating_exception()

    def _remote_command(self, host, user, identity, db_profile):
        buffer = [_REMOTE_EXECUTABLE]
        if db_profile:
            buffer.append(db_profile)
        remote_command = ' '.join(buffer)
        if identity:
            ssh_command = 'ssh %s -l %s -i %s %s' % (host,
                                                     user,
                                                     identity,
                                                     remote_command)
        else:
            ssh_command = 'ssh %s -l %s %s' % (host,
                                               user,
                                               remote_command)
        return ssh_command
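
# Examples of the ssh command line built by _Remote._remote_command (the host,
# user, identity path and profile below are made-up values):
#
#     _remote_command('10.0.0.5', 'alice', '/home/alice/.ssh/id_rsa', None)
#         -> 'ssh 10.0.0.5 -l alice -i /home/alice/.ssh/id_rsa remoteosh'
#
#     _remote_command('node1', 'bob', None, 'prod')
#         -> 'ssh node1 -l bob remoteosh prod'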