1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import sys
19 import threading
20
21 import osh.args
22 import osh.core
23 import osh.priorityqueue
24
25
26
27
28
29 Option = osh.args.Option
30
31
34
35
37 """TBD
38 """
39 return _Merge().process_args(key)
40
42
43 _key = None
44
47
49 return _MergeState(oshthreads)
50
52 key_fcn = self.args().next_function()
53
54 if key_fcn:
55 self._key = lambda x: key_fcn(*x[1:])
56 else:
57 self._key = None
58
59 self.command_state().setup(self)
60
62
63 self.command_state().add(self.thread_state, object)
64
65
67
68 self.command_state().done(self.thread_state)
69
70
71 key = property(lambda self: self._key)
72
74
75 _oshthreads = None
76 _thread_to_input = None
77 _merger = None
78 _consumer = None
79
81 self._oshthreads = oshthreads
82 self._thread_to_source = {}
83 source = 0
84 for thread in oshthreads:
85 self._thread_to_source[thread.state] = source
86 source += 1
87
89 return 'mergestate<%s>' % id(self)
90
91 - def setup(self, merge_op):
92 if self._merger is None:
93 if merge_op._key:
94 self._merger = _PriorityQueueMerger(merge_op, len(self._oshthreads))
95 else:
96 self._merger = _VanillaMerger(merge_op, len(self._oshthreads))
97
def add(self, threadid, object):
    """Hand object to the merger, tagged with the caller's source index.

    threadid is looked up in _thread_to_source; a threadid that is not
    registered there raises KeyError.
    """
    source = self._thread_to_source[threadid]
    self._merger.add(source, object)
100
def done(self, threadid):
    """Tell the merger that the source mapped to threadid has finished.

    threadid is looked up in _thread_to_source; a threadid that is not
    registered there raises KeyError.
    """
    source = self._thread_to_source[threadid]
    self._merger.done(source)
103
105
106 _merge_op = None
107
109 self._merge_op = merge_op
110
111 - def add(self, source, object):
113
114 - def done(self, source):
116
118
119 _lock = None
120 _active_sources = None
121
def __init__(self, merge_op, n_sources):
    """Set up a merger that forwards objects as they arrive.

    merge_op is passed to _Merger.__init__; n_sources is the number of
    sources that must each call done() before completion is signalled.
    """
    _Merger.__init__(self, merge_op)
    # Reentrant lock taken by done() around the countdown below.
    lock = threading.RLock()
    self._lock = lock
    # Number of sources that have not yet called done().
    self._active_sources = n_sources
126
def add(self, source, object):
    """Send object straight to the merge op; source is ignored."""
    downstream = self._merge_op
    downstream.send(object)
129
def done(self, source):
    """Record that one source has finished.

    Decrements the count of active sources under the lock and, when the
    count reaches zero, calls send_complete() on the merge op. source is
    not used. The lock is released even if send_complete() raises, so a
    failure here cannot leave the lock held.
    """
    self._lock.acquire()
    try:
        self._active_sources -= 1
        if self._active_sources == 0:
            self._merge_op.send_complete()
    finally:
        self._lock.release()
136
138
139 _priority_queue = None
140 _consumer = None
141 _n_sources = None
142 _n_done = None
143
def __init__(self, merge_op, n_sources):
    """Set up a merger backed by a priority queue and a consumer thread.

    Builds an osh.priorityqueue.PriorityQueue from merge_op.key and
    n_sources, then creates a _PriorityQueueConsumer over that queue and
    starts it before returning.
    """
    _Merger.__init__(self, merge_op)
    queue = osh.priorityqueue.PriorityQueue(merge_op.key, n_sources)
    self._priority_queue = queue
    self._n_sources = n_sources
    # Number of sources that have called done() so far.
    self._n_done = 0
    consumer = _PriorityQueueConsumer(merge_op, queue)
    self._consumer = consumer
    consumer.start()
151
def add(self, source, object):
    """Put object on the priority queue under the given source."""
    queue = self._priority_queue
    queue.add(source, object)
155
156
def done(self, source):
    """Mark source as finished; after the last one, wait for the consumer.

    The priority queue is told that source is done and the done-count is
    incremented. Once every source has reported, this call blocks until
    the consumer thread has exited.
    """
    self._priority_queue.done(source)
    # NOTE(review): _n_done is updated without a lock here; confirm that
    # callers serialize done() or that a lost update cannot occur.
    self._n_done += 1
    if self._n_done == self._n_sources:
        consumer = self._consumer
        # is_alive() replaces isAlive(), which was removed in Python 3.9.
        # The join uses a 1 second timeout inside a loop, presumably so the
        # waiting thread stays interruptible -- TODO confirm.
        while consumer.is_alive():
            consumer.join(1.0)
165
167
168 _merge_op = None
169 _priority_queue = None
170
def __init__(self, merge_op, priority_queue):
    """Create a non-daemon thread bound to merge_op and priority_queue.

    Both arguments are stored on the instance for later use. The thread
    is not started here.
    """
    threading.Thread.__init__(self)
    self._merge_op = merge_op
    self._priority_queue = priority_queue
    # The daemon attribute replaces the deprecated setDaemon() call.
    self.daemon = False
176
178
179 merge = self._merge_op
180 for x in self._priority_queue:
181
182 merge.send(x)
183
184
185 merge.send_complete()
186