1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """C{agg [-r] [[-g|-c] GROUPING_FUNCTION] INITIAL_VALUE AGGREGATION_FUNCTION}
19
20 Aggregates objects from the input stream. If C{GROUPING_FUNCTION} is omitted, then
21 one output object is generated by initializing an accumulator to C{INITIAL_VALUE}
22 and then combining the accumulator with input objects using C{AGGREGATION_FUNCTION}.
23 C{AGGREGATION_FUNCTION} takes two inputs, the current value of the accumulator and
24 an object from the input stream.
25
26 Example: If the input objects are integers C{1, 2, 3}, then the sum of the integers
27 is computed as follows::
28
29 ... ^ agg 0 'sum, x: sum + x'
30
31 which yields C{(6,)}.
32
33 If C{GROUPING_FUNCTION} is specified, then a set of accumulators is maintained,
34 one for each value of C{GROUPING_FUNCTION}. Each output object is a tuple with
35 two parts, the group value and the accumulated value for the group.
36
37 Example: If the input objects are C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then
38 the sum of ints for each string is computed as follows::
39
40 ... ^ agg -g 'x, y: x' 0 'sum, x, y: sum + y'
41
42 which yields C{('a', 3), ('b', 7)}.
43
44 If the grouping function is specified with the C{-g} flag, then agg generates its
45 output when the input stream has ended. (It has to, because group members may
46 appear in any order.) In some situations however, group members appear consecutively,
47 and it is useful to get output earlier. If group members are known to be consecutive,
48 then the group function can be specified using the C{-c} flag.
49
50 If the C{-r} flag is specified, then one output object is generated for each input object;
51 the output object contains the value of the accumulator so far. The accumulator appears
52 in the output row before the inputs. For example, if the input stream contains C{1, 2, 3},
53 then the running total can be computed as follows::
54
55 ... ^ agg -r 0 'sum, x: sum + x' ^ ...
56
57 The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum
58 of the current input (C{3}) and all preceding inputs (C{1, 2}).
59
60 The C{-r} flag can also be used with grouping. For example, if the input objects are
61 C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would
62 be computed as follows::
63
64 ... ^ agg -r -g 'x, y: x' 0 'sum, x, y: sum + y' ^ ...
65
66 The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}.
67 I.e., the running total is reinitialized to 0 for each group.
68 """
69
70 import osh.core
71 import osh.args
72 import osh.function
73
# Short module-level aliases for osh helpers used in this module.
Option = osh.args.Option
_wrap_if_necessary = osh.core.wrap_if_necessary
# NOTE(review): create_function is not referenced by the code visible here;
# it is kept in case other modules import it from this one.
create_function = osh.function._create_function
77
78
81
82
def agg(initial_value, aggregator, group=None, consecutive=None, running=False):
    """Combine input objects into a smaller number of output objects.

    With neither C{group} nor C{consecutive}, a single accumulator starts at
    C{initial_value}. C{aggregator} folds each input into it: its arguments
    are the elements of the accumulator followed by the elements of one input
    object, and its result becomes the next value of the accumulator.

    With C{group}, there is one accumulator per group value, where the group
    value is obtained by applying C{group} to each input. C{consecutive}
    behaves like C{group}, except that inputs with the same group value are
    assumed to be adjacent in the input sequence. At most one of C{group} and
    C{consecutive} may be specified.

    If C{running} is C{False}, the output contains one object per group,
    holding that group's aggregate value. When neither C{group} nor
    C{consecutive} is given, there is a single group covering the whole input
    stream. If C{running} is C{True}, the current aggregate value for the
    input's group is written out with every input object, i.e. the output
    contains "running totals". In that case the aggregate values appear
    before the input values in each output object.
    """
    # Translate keyword arguments into the equivalent command-line options;
    # the two positional arguments always come first.
    options = []
    if group:
        options.append(Option('-g', group))
    if consecutive:
        options.append(Option('-c', consecutive))
    if running:
        options.append(Option('-r'))
    return _Agg().process_args(initial_value, aggregator, *options)
113
114 -class _Agg(osh.core.Op):
# Operator behind the agg command. It reads its options and arguments,
# selects one of three aggregation strategies (_GroupingAggregate,
# _ConsecutiveGroupingAggregate or _NonGroupingAggregate), and forwards every
# input object and the completion notification to that strategy.
# NOTE(review): this listing is damaged. The embedded line numbers skip
# (120 -> 123, 126 -> 129 -> 131, 167 -> 169, 170 -> 172), so lines are
# missing: presumably this class's constructor, other method definitions,
# and the def lines of the three method bodies below. Restore them from the
# canonical source before relying on this class.
115
116 _aggregate = None
# Strategy object; stays None until one of the branches below assigns it.
117
118
119
120
123
124
125
126
129
# --- Argument processing (line 130, presumably its def line, is missing). ---
# Options: -g GROUPING_FUNCTION, -c GROUPING_FUNCTION (consecutive groups),
# -r (running totals); positional: INITIAL_VALUE, AGGREGATION_FUNCTION.
131 args = self.args()
132 grouping_function = args.function_arg('-g')
133 consecutive_grouping_function = args.function_arg('-c')
134 running_totals = args.flag('-r')
# args.flag() evidently may yield None when -r is absent; normalize to False.
135 if running_totals is None:
136 running_totals = False
# NOTE(review): _wrap_if_necessary presumably turns a scalar into a 1-tuple so
# the accumulator can be concatenated with input tuples -- confirm in osh.core.
137 initial_value = _wrap_if_necessary(args.next_eval())
138 aggregation_function = args.next_function()
# -g and -c are mutually exclusive.
139 if grouping_function and consecutive_grouping_function:
140 self.usage()
# NOTE(review): initial_value was already passed through _wrap_if_necessary,
# so this None test only fires if that helper can return None -- confirm.
141 if initial_value is None or aggregation_function is None:
142 self.usage()
# Leftover positional arguments are an error.
143 if args.has_next():
144 self.usage()
# NOTE(review): the first test below repeats the -g/-c check above. If
# self.usage() returns instead of raising, this branch is taken, no strategy
# is assigned, and self._aggregate stays None, so forwarding to it would fail.
# Otherwise pick the strategy matching the options given.
145 if grouping_function and consecutive_grouping_function:
146 self.usage()
147 elif grouping_function:
148 self._aggregate = _GroupingAggregate(
149 self,
150 running_totals,
151 grouping_function,
152 initial_value,
153 aggregation_function)
154 elif consecutive_grouping_function:
155 self._aggregate = _ConsecutiveGroupingAggregate(
156 self,
157 running_totals,
158 consecutive_grouping_function,
159 initial_value,
160 aggregation_function)
161 else:
162 self._aggregate = _NonGroupingAggregate(
163 self,
164 running_totals,
165 initial_value,
166 aggregation_function)
167
# Forward one input object to the strategy (line 168, presumably the def line, is missing).
169 self._aggregate.receive(object)
170
# Forward the completion notification (line 171, presumably the def line, is missing).
172 self._aggregate.receive_complete()
173
class _GroupingAggregate(object):
    """Aggregation strategy for C{agg -g}: one accumulator per group value.

    Group values may arrive in any order, so unless running totals are
    requested, nothing is sent until the input is complete.
    """
    # NOTE(review): the class statement and the receive/receive_complete def
    # lines were missing from the damaged listing and have been restored here;
    # the base class is assumed to be object -- confirm against the canonical
    # source.

    _running_totals = None
    _command = None
    _group_function = None
    _initial_value = None
    _aggregate_function = None
    _sum = None

    def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
        self._running_totals = running_totals
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        # Maps group value -> current (wrapped) accumulator for that group.
        self._sum = {}

    def receive(self, object):
        """Fold one input object into the accumulator of its group.

        The group value is C{group_function(*object)}. The aggregate function
        is called with the accumulator's elements followed by the input's
        elements, and its wrapped result becomes the group's new accumulator.
        With running totals on, that accumulator followed by the input's
        elements is sent immediately.
        """
        # Materialize the input once so the group function and the aggregate
        # function see the same elements, even for a one-shot iterable.
        row = tuple(object)
        group = self._group_function(*row)
        accumulator = self._sum.get(group, self._initial_value)
        updated = _wrap_if_necessary(self._aggregate_function(*(tuple(accumulator) + row)))
        self._sum[group] = updated
        if self._running_totals:
            self._command.send(updated + row)

    def receive_complete(self):
        """Emit (group value, accumulator...) for every group, then signal completion.

        Per-group results are skipped when running totals were already being
        sent with each input.
        """
        if not self._running_totals:
            # items() instead of the Python-2-only iteritems(): same pairs on
            # Python 2, and it also works on Python 3.
            for group, accumulator in self._sum.items():
                self._command.send(_wrap_if_necessary(group) + tuple(accumulator))
        self._command.send_complete()
204
class _ConsecutiveGroupingAggregate(object):
    """Aggregation strategy for C{agg -c}: inputs of a group are assumed contiguous.

    Only the current group's accumulator is kept. When the group value
    changes, the finished group's result is sent right away instead of at the
    end of the input (unless running totals are on, in which case results go
    out with every input).
    """
    # NOTE(review): the class statement and the receive/receive_complete def
    # lines were missing from the damaged listing and have been restored here;
    # the base class is assumed to be object -- confirm against the canonical
    # source.

    # Marks "no group seen yet". A private sentinel replaces the original
    # None so that a grouping function returning None is handled like any
    # other group value, as the -g strategy already does by keying a dict.
    # Previously each None-group row reset the accumulator and the None
    # group's result was never emitted.
    _NO_GROUP = object()

    _running_totals = None
    _command = None
    _group_function = None
    _initial_value = None
    _aggregate_function = None
    _group = _NO_GROUP
    _sum = None

    def __init__(self, command, running_totals, group_function, initial_value, aggregate_function):
        self._running_totals = running_totals
        self._command = command
        self._group_function = group_function
        self._initial_value = initial_value
        self._aggregate_function = aggregate_function
        self._group = self._NO_GROUP
        self._sum = None

    def receive(self, object):
        """Fold one input object into the current group's accumulator.

        When the input's group value differs from the current one, the
        previous group is flushed and the accumulator restarts at the initial
        value. With running totals on, the updated accumulator followed by
        the input's elements is sent immediately.
        """
        row = tuple(object)
        group = self._group_function(*row)
        if self._group is self._NO_GROUP or self._group != group:
            # A new group starts: emit the previous one (if any) and reset.
            self._flush()
            self._group = group
            self._sum = self._initial_value
        self._sum = _wrap_if_necessary(self._aggregate_function(*(tuple(self._sum) + row)))
        if self._running_totals:
            self._command.send(self._sum + row)

    def receive_complete(self):
        """Emit the last group's result (if any), then signal completion."""
        self._flush()
        self._command.send_complete()

    def _flush(self):
        """Send (group value, accumulator...) for the current group, unless
        running totals are on or no input has been received yet."""
        if self._group is not self._NO_GROUP and not self._running_totals:
            self._command.send(_wrap_if_necessary(self._group) + tuple(self._sum))
240
class _NonGroupingAggregate(object):
    """Aggregation strategy for C{agg} without -g/-c: one accumulator for the whole stream."""
    # NOTE(review): the class statement and the receive/receive_complete def
    # lines were missing from the damaged listing and have been restored here;
    # the base class is assumed to be object -- confirm against the canonical
    # source.

    _command = None
    _running_totals = None
    _aggregate_function = None
    _sum = None

    def __init__(self, command, running_totals, initial_value, aggregate_function):
        self._running_totals = running_totals
        self._command = command
        self._aggregate_function = aggregate_function
        # Current accumulator, starting at the initial value supplied by the
        # caller (already wrapped by the operator's argument processing).
        self._sum = initial_value

    def receive(self, object):
        """Fold one input object into the accumulator.

        The aggregate function gets the accumulator's elements followed by
        the input's elements; its wrapped result becomes the new accumulator.
        With running totals on, that accumulator followed by the input's
        elements is sent immediately.
        """
        row = tuple(object)
        self._sum = _wrap_if_necessary(self._aggregate_function(*(tuple(self._sum) + row)))
        if self._running_totals:
            self._command.send(self._sum + row)

    def receive_complete(self):
        """Emit the final accumulator (unless running totals were sent), then signal completion."""
        if not self._running_totals:
            self._command.send(self._sum)
        self._command.send_complete()
264