Package osh :: Package command :: Module merge
[frames] | no frames]

Source Code for Module osh.command.merge

  1  # osh 
  2  # Copyright (C) Jack Orenstein <jao@geophile.com> 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 17   
 18  import sys 
 19  import threading 
 20   
 21  import osh.args 
 22  import osh.core 
 23  import osh.priorityqueue 
 24   
 25  # import osh.trace 
 26  # osh.trace.on('/tmp/trace.txt') 
 27  # trace = osh.trace.trace 
 28   
 29  Option = osh.args.Option 
 30   
 31  # CLI 
32 -def _merge():
33 return _Merge()
34 35 # API
36 -def merge(key = 'x: x'):
37 """TBD 38 """ 39 return _Merge().process_args(key)
40
41 -class _Merge(osh.core.Op):
42 43 _key = None 44
45 - def __init__(self):
46 osh.core.Op.__init__(self, '', (1, 1))
47
48 - def create_command_state(self, oshthreads):
49 return _MergeState(oshthreads)
50
51 - def setup(self):
52 key_fcn = self.args().next_function() 53 # Merge key function must be applied to tuple without threadid (in position 0). 54 if key_fcn: 55 self._key = lambda x: key_fcn(*x[1:]) 56 else: 57 self._key = None 58 # Any thread's merge object can be used to initialize command state. 59 self.command_state().setup(self)
60
61 - def receive(self, object):
62 # trace('%s: receive %s' % (self, object)) 63 self.command_state().add(self.thread_state, object)
64 # trace('%s: receive %s done' % (self, object)) 65
66 - def receive_complete(self):
67 # trace('%s: receive_complete' % self) 68 self.command_state().done(self.thread_state)
69 # trace('%s: receive_complete done' % self) 70 71 key = property(lambda self: self._key)
72
73 -class _MergeState(osh.priorityqueue.PriorityQueue):
74 75 _oshthreads = None 76 _thread_to_input = None 77 _merger = None 78 _consumer = None 79
80 - def __init__(self, oshthreads):
81 self._oshthreads = oshthreads 82 self._thread_to_source = {} 83 source = 0 84 for thread in oshthreads: 85 self._thread_to_source[thread.state] = source 86 source += 1
87
88 - def __repr__(self):
89 return 'mergestate<%s>' % id(self)
90
91 - def setup(self, merge_op):
92 if self._merger is None: 93 if merge_op._key: 94 self._merger = _PriorityQueueMerger(merge_op, len(self._oshthreads)) 95 else: 96 self._merger = _VanillaMerger(merge_op, len(self._oshthreads))
97
98 - def add(self, threadid, object):
99 self._merger.add(self._thread_to_source[threadid], object)
100
101 - def done(self, threadid):
102 self._merger.done(self._thread_to_source[threadid])
103
104 -class _Merger(object):
105 106 _merge_op = None 107
108 - def __init__(self, merge_op):
109 self._merge_op = merge_op
110
111 - def add(self, source, object):
112 assert False
113
114 - def done(self, source):
115 assert False
116
117 -class _VanillaMerger(_Merger):
118 119 _lock = None 120 _active_sources = None 121
122 - def __init__(self, merge_op, n_sources):
123 _Merger.__init__(self, merge_op) 124 self._lock = threading.RLock() 125 self._active_sources = n_sources
126
127 - def add(self, source, object):
128 self._merge_op.send(object)
129
130 - def done(self, source):
131 self._lock.acquire() 132 self._active_sources -= 1 133 if self._active_sources == 0: 134 self._merge_op.send_complete() 135 self._lock.release()
136
137 -class _PriorityQueueMerger(_Merger):
138 139 _priority_queue = None 140 _consumer = None 141 _n_sources = None 142 _n_done = None 143
144 - def __init__(self, merge_op, n_sources):
145 _Merger.__init__(self, merge_op) 146 self._priority_queue = osh.priorityqueue.PriorityQueue(merge_op.key, n_sources) 147 self._n_sources = n_sources 148 self._n_done = 0 149 self._consumer = _PriorityQueueConsumer(merge_op, self._priority_queue) 150 self._consumer.start()
151
152 - def add(self, source, object):
153 # trace('%s: merge %s source %s, add %s' % (self, self._merge_op, source, object)) 154 self._priority_queue.add(source, object)
155 # trace('%s: merge %s source %s, add %s finished' % (self, self._merge_op, source, object)) 156
157 - def done(self, source):
158 # trace('%s: merge %s source %s, done' % (self, self._merge_op, source)) 159 self._priority_queue.done(source) 160 # trace('%s: merge %s source %s, done finished' % (self, self._merge_op, source)) 161 self._n_done += 1 162 if self._n_done == self._n_sources: 163 while self._consumer.isAlive(): 164 self._consumer.join(1.0)
165
166 -class _PriorityQueueConsumer(threading.Thread):
167 168 _merge_op = None 169 _priority_queue = None 170
171 - def __init__(self, merge_op, priority_queue):
172 threading.Thread.__init__(self) 173 self._merge_op = merge_op 174 self._priority_queue = priority_queue 175 self.setDaemon(False)
176
177 - def run(self):
178 # trace('%s: run' % self) 179 merge = self._merge_op 180 for x in self._priority_queue: 181 # trace('%s: send %s' % (self, x)) 182 merge.send(x) 183 # trace('%s: send %s done' % (self, x)) 184 # trace('%s: send complete' % self) 185 merge.send_complete()
186