Package osh :: Package command :: Module agg
[frames] | no frames]

Source Code for Module osh.command.agg

  1  # osh 
  2  # Copyright (C) Jack Orenstein <jao@geophile.com> 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 17   
 18  """C{agg [-r] [[-g|-c] GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION} 
 19   
 20  Aggregates objects from the input stream. If C{GROUPING_FUNCTION} is omitted, then 
 21  one output object is generated by initializing an accumulator to C{INITIAL_VALUE} 
 22  and then combining the accumulator with input objects using C{AGGREGATION_FUNCTION}. 
 23  C{AGGREGATION_FUNCTION} takes two inputs, the current value of the accumulator and 
 24  an object from the input stream. 
 25   
 26  Example: If the input objects are integers C{1, 2, 3}, then the sum of the integers 
 27  is computed as follows:: 
 28   
 29      ... ^ agg 0 'sum, x: sum + x' 
 30   
 31  which yields C{(6,)}. 
 32   
 33  If C{GROUPING_FUNCTION} is specified, then a set of accumulators is maintained, 
 34  one for each value of C{GROUPING_FUNCTION}. Each output object is a tuple with 
 35  two parts, the group value and the accumulated value for the group. 
 36   
 37  Example: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then 
 38  the sum of ints for each string is computed as follows:: 
 39   
 40      ... ^ agg -g 'x, y: x' 0 'sum, x, y: sum + y' 
 41   
 42  which yields C{('a', 3), ('b', 7)}. 
 43   
 44  If the grouping function is specified with the C{-g} flag, then agg generates its 
 45  output when the input stream has ended. (It has to, because group members may 
 46  appear in any order.) In some situations however, group members appear consecutively, 
 47  and it is useful to get output earlier. If group members are known to be consecutive, 
 48  then the group function can be specified using the C{-c} flag. 
 49   
 50  If the C{-r} flag is specified, then one output object is generated for each input object; 
 51  the output object contains the value of the accumulator so far. The accumulator appears 
 52  in the output row before the inputs. For example, if the input stream contains C{1, 2, 3}, 
 53  then the running total can be computed as follows:: 
 54   
 55      ... ^ agg -r 0 'sum, x: sum + x' ^ ... 
 56   
 57  The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum 
 58  of the current input (C{3}) and all preceding inputs (C{1, 2}). 
 59   
 60  The C{-r} flag can also be used with grouping. For example, if the input objects are 
 61  C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would 
 62  be computed as follows:: 
 63   
 64      ... ^ agg -r -g 'x, y: x' 0 'sum, x, y: sum + y' ^ ... 
 65   
 66  The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}. 
 67  I.e., the running total is reinitialized to 0 for each group. 
 68  """ 
 69   
 70  import osh.core 
 71  import osh.args 
 72  import osh.function 
 73   
 74  _wrap_if_necessary = osh.core.wrap_if_necessary 
 75  create_function = osh.function._create_function 
 76  Option = osh.args.Option 
 77   
 78  # CLI 
def _agg():
    """CLI entry point: build a fresh, unconfigured C{_Agg} operator."""
    op = _Agg()
    return op
81 82 # API
def agg(initial_value,
        aggregator,
        group = None,
        consecutive = None,
        running = False):
    """Reduce the input stream to a smaller number of outputs.

    With neither C{group} nor C{consecutive}, a single accumulator is
    initialized to C{initial_value}. For every input, C{aggregator} is called
    with the elements of the accumulator followed by the elements of that
    input, and its result becomes the new accumulator.

    If C{group} is given, one accumulator is kept per group value, where the
    group value is obtained by applying C{group} to each input. C{consecutive}
    works like C{group} but assumes that members of a group are adjacent in
    the input sequence. At most one of C{group} and C{consecutive} may be
    specified.

    If C{running} is false, the output has one object per group carrying the
    aggregate value (a single group covering the whole input stream when no
    grouping function is given). If C{running} is true, the aggregate value
    for the group is written out with each input object, i.e. the output
    contains "running totals", and the aggregate values appear before the
    input values in the output object.
    """
    op_args = [initial_value, aggregator]
    # Grouping options are appended in the same order as before: -g, then -c.
    for flag, function in (('-g', group), ('-c', consecutive)):
        if function:
            op_args.append(Option(flag, function))
    if running:
        op_args.append(Option('-r'))
    return _Agg().process_args(*op_args)
113
class _Agg(osh.core.Op):
    """The C{agg} operator.

    C{setup} parses the arguments and picks one of the aggregation strategies
    (grouping, consecutive grouping, or non-grouping); C{receive} and
    C{receive_complete} forward to the chosen strategy.
    """

    # Strategy object selected by setup(); None until then.
    _aggregate = None


    # object interface

    def __init__(self):
        # 'g:c:r' declares the -g, -c and -r options; (2, 2) is presumably the
        # allowed range for the number of remaining arguments -- TODO confirm
        # against osh.core.Op.
        osh.core.Op.__init__(self, 'g:c:r', (2, 2))


    # BaseOp interface

    def doc(self):
        """Return this module's docstring."""
        return __doc__

    def setup(self):
        """Parse the arguments and select the aggregation strategy.

        C{self.usage()} is invoked when both C{-g} and C{-c} are given, when
        the initial value or aggregation function is missing, or when extra
        arguments remain.
        """
        parsed = self.args()
        # Options first, then positionals; the positional reads are
        # order-sensitive, so this sequence must not be rearranged.
        by_group = parsed.function_arg('-g')
        by_consecutive_group = parsed.function_arg('-c')
        running = parsed.flag('-r')
        if running is None:
            running = False
        start_value = _wrap_if_necessary(parsed.next_eval())
        combine = parsed.next_function()

        if by_group and by_consecutive_group:
            self.usage()
        if start_value is None or combine is None:
            self.usage()
        if parsed.has_next():
            self.usage()

        # NOTE(review): the first branch repeats the -g/-c check made above;
        # it can only be reached if usage() returns instead of raising.
        if by_group and by_consecutive_group:
            self.usage()
        elif by_group:
            self._aggregate = _GroupingAggregate(
                self, running, by_group, start_value, combine)
        elif by_consecutive_group:
            self._aggregate = _ConsecutiveGroupingAggregate(
                self, running, by_consecutive_group, start_value, combine)
        else:
            self._aggregate = _NonGroupingAggregate(
                self, running, start_value, combine)

    def receive(self, object):
        """Hand one input object to the selected aggregation strategy."""
        strategy = self._aggregate
        strategy.receive(object)

    def receive_complete(self):
        """Tell the selected aggregation strategy that input has ended."""
        strategy = self._aggregate
        strategy.receive_complete()
173
class _GroupingAggregate(object):
    """Aggregation strategy for C{-g}: one accumulator per group value.

    Accumulators are kept in a dict keyed by group value. Without running
    totals, nothing is sent until C{receive_complete}; with running totals,
    one output is sent per input and nothing extra is sent at the end.
    """

    _running_totals = None
    _command = None
    _group_function = None
    _initial_value = None
    _aggregate_function = None
    _sum = None

    def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
        """Store the collaborators and start with no accumulators.

        @param command: operator whose C{send}/C{send_complete} receive output.
        @param running_totals: true to emit the accumulator with every input.
        @param group_function: called with the input's elements; returns the
            group value (used as a dict key).
        @param initial_value: accumulator used for a group's first input.
        @param aggregate_function: called with the accumulator's elements
            followed by the input's elements; returns the new accumulator.
        """
        self._running_totals = running_totals
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        self._sum = {}

    def receive(self, object):
        """Fold one input into its group's accumulator."""
        group = self._group_function(*object)
        # A group seen for the first time starts from the initial value.
        accumulator = self._sum.get(group, self._initial_value)
        tuple_object = tuple(object)
        new_sum = self._aggregate_function(*(tuple(accumulator) + tuple_object))
        self._sum[group] = _wrap_if_necessary(new_sum)
        if self._running_totals:
            # Running total goes in front of the input's own elements.
            self._command.send(_wrap_if_necessary(new_sum) + tuple_object)

    def receive_complete(self):
        """Emit one (group, aggregate) output per group, then signal completion."""
        if not self._running_totals:
            # items() rather than iteritems(): same pairs on Python 2, and it
            # also exists on Python 3 where iteritems() was removed.
            for group, accumulator in self._sum.items():
                self._command.send(_wrap_if_necessary(group) + tuple(accumulator))
        self._command.send_complete()
204
class _ConsecutiveGroupingAggregate(object):
    """Aggregation strategy for C{-c}: groups whose members arrive adjacently.

    Only the current group's accumulator is kept. When the group value
    changes, the finished group is emitted (unless running totals are on)
    and the accumulator restarts from the initial value.
    """

    # Marks "no input seen yet". A dedicated object is used instead of None so
    # that a grouping function may legitimately return None as a group value.
    _NO_GROUP = object()

    _running_totals = None
    _command = None
    _group_function = None
    _initial_value = None
    _aggregate_function = None
    _group = _NO_GROUP
    _sum = None

    def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
        """Store the collaborators; no group is current yet.

        @param command: operator whose C{send}/C{send_complete} receive output.
        @param running_totals: true to emit the accumulator with every input.
        @param group_function: called with the input's elements; returns the
            group value.
        @param initial_value: accumulator used at the start of each group.
        @param aggregate_function: called with the accumulator's elements
            followed by the input's elements; returns the new accumulator.
        """
        self._running_totals = running_totals
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        self._group = self._NO_GROUP
        self._sum = None

    def receive(self, object):
        """Fold one input into the current group, switching groups if needed."""
        new_group = self._group_function(*object)
        no_group_yet = self._group is self._NO_GROUP
        if no_group_yet or self._group != new_group:
            # Group boundary: flush the group that just ended, if there was one.
            if not no_group_yet and not self._running_totals:
                self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
            self._group = new_group
            self._sum = self._initial_value
        tuple_object = tuple(object)
        new_sum = self._aggregate_function(*(tuple(self._sum) + tuple_object))
        self._sum = _wrap_if_necessary(new_sum)
        if self._running_totals:
            # Running total goes in front of the input's own elements.
            self._command.send(_wrap_if_necessary(new_sum) + tuple_object)

    def receive_complete(self):
        """Emit the last group, if any input was seen, then signal completion."""
        if (not self._running_totals) and self._group is not self._NO_GROUP:
            self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
        self._command.send_complete()
240
class _NonGroupingAggregate(object):
    """Aggregation strategy without grouping: a single accumulator.

    The accumulator starts at the initial value and is replaced by the
    aggregation function's result on every input.
    """

    _command = None
    _running_totals = None
    _aggregate_function = None
    _sum = None

    def __init__(self, command, running_totals, initial_value, aggregate_function):
        """Store the collaborators and seed the accumulator with C{initial_value}."""
        self._command = command
        self._running_totals = running_totals
        self._sum = initial_value
        self._aggregate_function = aggregate_function

    def receive(self, object):
        """Fold one input into the accumulator; emit it when running totals are on."""
        row = tuple(object)
        combined = self._aggregate_function(*(tuple(self._sum) + row))
        self._sum = _wrap_if_necessary(combined)
        if not self._running_totals:
            return
        # Running total goes in front of the input's own elements.
        self._command.send(_wrap_if_necessary(combined) + row)

    def receive_complete(self):
        """Emit the final accumulator (unless running totals), then signal completion."""
        emit_final = not self._running_totals
        if emit_final:
            self._command.send(self._sum)
        self._command.send_complete()
264