"""C{red [-r] BINARY_FUNCTION ...}

Aggregates (reduces) objects from the input stream in a simpler but less general
way than the C{agg} command.

A C{BINARY_FUNCTION} takes two inputs and produces one output. Binary operators
and user-defined functions can be used as C{BINARY_FUNCTION}s. Given a sequence
of inputs such as C{(1,), (2,), (3,)}, C{red} can be used to find the sum::

    ... ^ red +

This yields C{(6,)}. If each input object contains multiple values, then one
C{BINARY_FUNCTION} can be provided per value. For example, to find the sum of the
first 10 non-negative integers (0 through 9), the sum of their squares, and the
sum of their cubes::

    osh gen 10 ^ f 'x: (x, x**2, x**3)' ^ red + + + $

which yields the output C{(45, 285, 2025)}.

If C{.} is provided as one of the C{BINARY_FUNCTION}s, then the value in that position
is not aggregated. Instead, it is used for grouping: aggregation is done separately for
each group defined by the values in the C{.} positions. For example, suppose the input
sequence is::

    (1, 10, 5, 100)
    (1, 10, 6, 200)
    (1, 11, 4, 100)
    (1, 11, 3, 200)
    (2, 20, 8, 100)
    (2, 20, 9, 200)
    (2, 20, 10, 300)
    (3, 30, 5, 100)

If this sequence is piped to this invocation of C{red}::

    red . . + +

then the output sequence would be::

    (1, 10, 11, 300)
    (1, 11, 7, 300)
    (2, 20, 27, 600)
    (3, 30, 5, 100)

The two C{.}s, in the first two positions, mean that the groups used for aggregation
are C{(1, 10)}, C{(1, 11)}, C{(2, 20)}, and C{(3, 30)}. The C{(1, 10)} group has two
rows, C{(1, 10, 5, 100)} and C{(1, 10, 6, 200)}. The two C{+}s mean that the items in
the last two fields should be summed. Adding the items in the third position gives
5 + 6 = 11, and in the last position, 100 + 200 = 300.
If the C{-r} flag is specified, then one output object is generated for each input object;
the output object contains the current accumulated values. The accumulator appears
in the output row before the inputs. For example, if the input stream contains
C{(1,), (2,), (3,)}, then the running total can be computed as follows::

    ... ^ red -r + ^ ...

The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is
the sum of the current input (C{3}) and all preceding inputs (C{1, 2}).

The C{-r} flag can also be used with grouping. For example, if the input objects are
C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would
be computed as follows::

    ... ^ red -r . + ^ ...

The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}.
"""

import osh.core
import osh.args
import osh.function
import agg

create_function = osh.function._create_function
Option = osh.args.Option
_GroupingAggregate = agg._GroupingAggregate
_NonGroupingAggregate = agg._NonGroupingAggregate


def red(binary_functions, running = False):
    """C{binary_functions} is a sequence. Each element of C{binary_functions} is
    either C{None} or a binary function.
    If no elements are C{None}, then the binary function in position C{i} is used
    to reduce the values in element C{i} of the input sequences. If there are
    C{None} values, then these are used to define groups of inputs, partitioning
    by the values in the indicated columns. The remaining binary functions then compute
    reductions within each group. If C{running} is C{False}, then there is one output
    tuple in the absence of grouping; otherwise there is one output tuple per group.
    If C{running} is C{True}, then the aggregate value computed so far is written
    out for each input, i.e., the output contains "running totals". In this case,
    the aggregate value appears before the input values.
    """
    # Accept a single function as shorthand for a one-element sequence.
    if not isinstance(binary_functions, (list, tuple)):
        binary_functions = [binary_functions]
    args = list(binary_functions)
    if running:
        args.append(Option('-r'))
    return _Red().process_args(*args)

class _Red(osh.core.Op):

    _aggregate = None

    # NOTE: the method header lines are missing from this listing. The name
    # setup is assumed from the osh Op interface; receive and receive_complete
    # are inferred from the method bodies. Any methods that preceded setup
    # (such as a constructor) are not reproduced here.

    def setup(self):
        args = self.args()
        running = args.flag('-r')
        if running is None:
            running = False
        functions = args.remaining()
        # Positions holding '.' (CLI) or None (API) define the groups;
        # every other position holds a function used for reduction.
        group_positions = []
        data_positions = []
        for p, f in enumerate(functions):
            if f == '.' or f is None:
                group_positions.append(p)
            else:
                data_positions.append(p)
        n_group = len(group_positions)
        n_data = len(data_positions)
        functions = [create_function(functions[p]) for p in data_positions]
        initial_value = (None,) * n_data
        # In both aggregators, t is the current accumulator (n_data values)
        # followed by the fields of the input object.
        if n_group == 0:
            def aggregator(*t):
                if t[:n_data] == initial_value:
                    # First input: nothing accumulated yet, so start from the input.
                    accumulator = t[-n_data:]
                else:
                    accumulator = tuple([functions[p](t[p], t[n_data + p])
                                         for p in xrange(n_data)])
                return accumulator
            self._aggregate = _NonGroupingAggregate(self,
                                                    running,
                                                    initial_value,
                                                    aggregator)
        else:
            def grouper(*t):
                return tuple([t[p] for p in group_positions])
            def aggregator(*t):
                if all(x is None for x in t[:n_data]):
                    # First input for this group: start from the data columns.
                    accumulator = tuple([t[n_data + data_positions[p]]
                                         for p in xrange(n_data)])
                else:
                    accumulator = tuple([functions[p](t[p], t[n_data + data_positions[p]])
                                         for p in xrange(n_data)])
                return accumulator
            self._aggregate = _GroupingAggregate(self,
                                                 running,
                                                 grouper,
                                                 initial_value,
                                                 aggregator)

    def receive(self, object):
        self._aggregate.receive(object)

    def receive_complete(self):
        self._aggregate.receive_complete()
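

# Illustrative sketch only, not part of osh. The grouping and running-total
# behaviour described in the module docstring can be modelled in plain Python
# roughly as follows. Assumptions: rows are tuples, None marks a grouping
# column (as '.' does on the command line), groups are emitted in order of
# first appearance, and each group's key columns precede its accumulated
# values in the output (as in the docstring example). The name _sketch_red
# and its parameters are hypothetical. Unlike the real command, which works
# on a stream, this version returns a list of output tuples.
def _sketch_red(rows, functions, running=False):
    group_positions = [p for p, f in enumerate(functions) if f is None]
    data_positions = [p for p, f in enumerate(functions) if f is not None]
    accumulators = {}  # group key -> tuple of accumulated values
    order = []         # group keys, in order of first appearance
    output = []
    for row in rows:
        key = tuple(row[p] for p in group_positions)
        values = tuple(row[p] for p in data_positions)
        if key in accumulators:
            # Combine each accumulated value with the new value in its column.
            values = tuple(functions[p](a, v)
                           for p, a, v in zip(data_positions,
                                              accumulators[key],
                                              values))
        else:
            # First row of this group: the accumulator starts as the row's values.
            order.append(key)
        accumulators[key] = values
        if running:
            # Running mode: the accumulator comes first, then the input row.
            output.append(values + tuple(row))
    if not running:
        for key in order:
            output.append(key + accumulators[key])
    return output

# Under these assumptions,
# _sketch_red([(1,), (2,), (3,)], [lambda a, b: a + b], running=True)
# returns [(1, 1), (3, 2), (6, 3)], matching the running-total example
# in the module docstring.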