Package osh :: Package command :: Module red
[frames] | no frames]

Source Code for Module osh.command.red

  1  # osh 
  2  # Copyright (C) Jack Orenstein <jao@geophile.com> 
  3  # 
  4  # This program is free software; you can redistribute it and/or modify 
  5  # it under the terms of the GNU General Public License as published by 
  6  # the Free Software Foundation; either version 2 of the License, or 
  7  # (at your option) any later version. 
  8  # 
  9  # This program is distributed in the hope that it will be useful, 
 10  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 12  # GNU General Public License for more details. 
 13  # 
 14  # You should have received a copy of the GNU General Public License 
 15  # along with this program; if not, write to the Free Software 
 16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
 17   
 18  """C{red -r [BINARY_FUNCTION ...]} 
 19   
 20  Aggregates (reduces) objects from the input stream in a simpler but less general 
 21  way than the C{agg} command. 
 22   
 23  A C{BINARY_FUNCTION} takes two inputs and produces one output. Binary operators 
 24  and user-defined functions can be used as C{BINARY_FUNCTION}s. Given a sequence 
 25  of inputs such as C{(1,), (2,), (3,)}, C{red} can be used to find the sum:: 
 26   
 27      ... ^ red + 
 28   
 29  yields C{(6,)}. If each input sequence contains multiple values, then multiple 
  30  C{BINARY_FUNCTION}s can be provided. For example, to find the sum of the first 
  31  10 non-negative integers (0 through 9), the sum of their squares, and the sum of their cubes:: 
 32   
 33      osh gen 10 ^ f 'x: (x, x**2, x**3)' ^ red + + + $ 
 34   
 35  which yields the output C{(45, 285, 2025)}. 
 36   
 37  If C{.} is provided as one of the C{BINARY_FUNCTION}s, then that value is not aggregated. 
 38  Instead, aggregation is done for the groups defined by the indicated elements. For example, 
 39  suppose the input sequence is:: 
 40   
 41      (1, 10, 5, 100) 
 42      (1, 10, 6, 200) 
 43      (1, 11, 4, 100) 
 44      (1, 11, 3, 200) 
 45      (2, 20, 8, 100) 
 46      (2, 20, 9, 200) 
 47      (2, 20, 10, 300) 
 48      (3, 30, 5, 100) 
 49   
 50  If this sequence is piped to this invocation of C{red}:: 
 51   
 52      red . . + + 
 53   
 54  Then the output sequence would be:: 
 55   
 56      (1, 10, 11, 300) 
 57      (1, 11, 7, 300) 
  58      (2, 20, 27, 600) 
 59      (3, 30, 5, 100) 
 60   
 61  The two C{.}s, in the first two positions, mean that the groups used for aggregation 
 62  are C{(1, 10)}, C{(1, 11)}, C{(2, 20)}, and C{(3, 30)}. The 
 63  C{(1, 10)} group has two rows, C{(1, 10, 5, 100)}, 
 64  and C{(1, 10, 6, 200)}. The two C{+}s mean that the items in the last two fields should be summed. 
 65  Adding the items in the third position, 5 + 6 = 11; and in the last position, 100 + 200 = 300. 
 66   
 67  If the C{-r} flag is specified, then one output object is generated for each input object; 
 68  the output object contains the current accumulated values. The accumulator appears 
 69  in the output row before the inputs. For example, if the input stream contains C{(1,), (2,), (3,)}, 
 70  then the running total can be computed as follows:: 
 71   
 72      ... ^ red -r + ^ ... 
 73   
 74  The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum 
 75  of the current input (C{3}) and all preceding inputs (C{1, 2}). 
 76   
  77  The C{-r} flag can also be used with grouping. For example, if the input objects are 
  78  C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running total for each string would 
  79  be computed as follows:: 
  80   
  81      ... ^ red -r . + ^ ... 
 82   
 83  The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}. 
 84  """ 
 85   
 86  import osh.core 
 87  import osh.args 
 88  import osh.function 
 89  import agg 
 90   
 91  create_function = osh.function._create_function 
 92  Option = osh.args.Option 
 93  _GroupingAggregate = agg._GroupingAggregate 
 94  _NonGroupingAggregate = agg._NonGroupingAggregate 
 95   
 96  # CLI 
def _red():
    """CLI hook: return a new, not-yet-configured C{_Red} op."""
    op = _Red()
    return op
99 100 # API
def red(binary_functions, running = False):
    """Build a C{red} op from Python values.

    C{binary_functions} is a sequence; a single value that is not a list or
    tuple is treated as a one-element sequence. Each element is either
    C{None} or a binary function. If no element is C{None}, the function at
    position C{i} reduces element C{i} of the input sequences. C{None}
    elements mark grouping positions: inputs are partitioned by the values
    in those columns, and the remaining functions compute reductions within
    each group.

    If C{running} is C{False}, there is one output tuple in the absence of
    grouping, and one output tuple per group otherwise. If C{running} is
    C{True}, the aggregate value computed so far is written out for each
    input, i.e. the output contains "running totals"; in this case the
    aggregate value appears before the input values.
    """
    if isinstance(binary_functions, (list, tuple)):
        # Copy, so appending the -r option never mutates the caller's sequence.
        args = list(binary_functions)
    else:
        args = [binary_functions]
    if running:
        args.append(Option('-r'))
    return _Red().process_args(*args)
121
class _Red(osh.core.Op):
    """Op implementing the C{red} command.

    Each positional argument is either a binary function, which reduces the
    value at that position across inputs, or C{.} (C{None} via the API),
    which marks that position as part of the grouping key.
    """

    # Aggregate object built by setup(); receive() and receive_complete()
    # delegate to it.
    _aggregate = None

    # object interface

    def __init__(self):
        # 'r' declares the -r flag. (1, None) presumably bounds the number of
        # positional arguments to at least one with no maximum -- confirm
        # against osh.core.Op.
        osh.core.Op.__init__(self, 'r', (1, None))

    # BaseOp interface

    def doc(self):
        """Return the module docstring, which documents the C{red} command."""
        # Inside a method, __doc__ resolves to the module global (class scope
        # is not searched), so this is the module's documentation.
        return __doc__

    def setup(self):
        """Parse the arguments and build the aggregate that does the work.

        Positions holding C{.} or C{None} are grouping positions; every other
        position holds a binary function used to reduce the values at that
        position. With no grouping positions a C{_NonGroupingAggregate} is
        used; otherwise a C{_GroupingAggregate} keyed on the grouping
        positions is used. The C{-r} flag selects running (per-input) output.
        """
        args = self.args()
        running = args.flag('-r')
        if running is None:
            running = False
        specs = args.remaining()
        # A "group" position contains a dot (or None), used to indicate grouping.
        # A "data" position contains a binary function; its values are reduced.
        group_positions = []
        data_positions = []
        for position, spec in enumerate(specs):
            if spec == '.' or spec is None:
                group_positions.append(position)
            else:
                data_positions.append(position)
        n_data = len(data_positions)
        functions = [create_function(specs[p]) for p in data_positions]
        # The accumulator starts out all None; seeing that value tells the
        # aggregator it is handling the first object (of a group) and must
        # initialize the accumulator instead of reducing into it.
        initial_value = (None,) * n_data

        def is_initial(accumulator):
            # Identity test (not ==) so accumulated values with unusual
            # __eq__ semantics cannot be mistaken for the initial state.
            for value in accumulator:
                if value is not None:
                    return False
            return True

        def aggregator(*t):
            # t is expected to be the accumulator (n_data values) followed by
            # the incoming object's fields -- the layout the agg aggregates
            # presumably pass; verify in agg.
            accumulator = t[:n_data]
            current = t[n_data:]
            if is_initial(accumulator):
                # First object: seed the accumulator with its data values.
                # Use the same fields that are reduced below (previously the
                # non-grouping path seeded from the *last* n_data fields,
                # which disagreed with the reduction for wide inputs).
                return tuple([current[p] for p in data_positions])
            return tuple([functions[i](accumulator[i], current[p])
                          for i, p in enumerate(data_positions)])

        if group_positions:
            def grouper(*t):
                # The grouping key is the tuple of values at the group positions.
                return tuple([t[p] for p in group_positions])
            self._aggregate = _GroupingAggregate(self,
                                                 running,
                                                 grouper,
                                                 initial_value,
                                                 aggregator)
        else:
            # Without grouping, data_positions is 0..n_data-1, so the same
            # aggregator reduces the first n_data fields of each object.
            self._aggregate = _NonGroupingAggregate(self,
                                                    running,
                                                    initial_value,
                                                    aggregator)

    def receive(self, object):
        """Forward an input object to the aggregate built by setup()."""
        self._aggregate.receive(object)

    def receive_complete(self):
        """Forward end-of-input to the aggregate (which presumably emits the final results)."""
        self._aggregate.receive_complete()
193