[frames] | no frames]

# Source Code for Module osh.command.red

```  1  # osh
2  # Copyright (C) Jack Orenstein <jao@geophile.com>
3  #
4  # This program is free software; you can redistribute it and/or modify
5  # it under the terms of the GNU General Public License as published by
6  # the Free Software Foundation; either version 2 of the License, or
7  # (at your option) any later version.
8  #
9  # This program is distributed in the hope that it will be useful,
10  # but WITHOUT ANY WARRANTY; without even the implied warranty of
11  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  # GNU General Public License for more details.
13  #
14  # You should have received a copy of the GNU General Public License
15  # along with this program; if not, write to the Free Software
16  # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18  """C{red [-r] BINARY_FUNCTION ...}
19
20  Aggregates (reduces) objects from the input stream in a simpler but less general
21  way than the C{agg} command.
22
23  A C{BINARY_FUNCTION} takes two inputs and produces one output. Binary operators
24  and user-defined functions can be used as C{BINARY_FUNCTION}s. Given a sequence
25  of inputs such as C{(1,), (2,), (3,)}, C{red} can be used to find the sum::
26
27      ... ^ red +
28
29  yields C{(6,)}. If each input sequence contains multiple values, then multiple
30  C{BINARY_FUNCTION}s can be provided. For example, to find the sum of the first
31  10 integers, the sum of their squares, and the sum of their cubes::
32
33      osh gen 10 ^ f 'x: (x, x**2, x**3)' ^ red + + + \$
34
35  which yields the output C{(45, 285, 2025)}.
36
37  If C{.} is provided as one of the C{BINARY_FUNCTION}s, then that value is not aggregated.
38  Instead, aggregation is done for the groups defined by the indicated elements. For example,
39  suppose the input sequence is::
40
41      (1, 10, 5, 100)
42      (1, 10, 6, 200)
43      (1, 11, 4, 100)
44      (1, 11, 3, 200)
45      (2, 20, 8, 100)
46      (2, 20, 9, 200)
47      (2, 20, 10, 300)
48      (3, 30, 5, 100)
49
50  If this sequence is piped to this invocation of C{red}::
51
52      red . . + +
53
54  Then the output sequence would be::
55
56      (1, 10, 11, 300)
57      (1, 11, 7, 300)
58      (2, 20, 17, 300)
59      (3, 30, 5, 100)
60
61  The two C{.}s, in the first two positions, mean that the groups used for aggregation
62  are C{(1, 10)}, C{(1, 11)}, C{(2, 20)}, and C{(3, 30)}. The
63  C{(1, 10)} group has two rows, C{(1, 10, 5, 100)},
64  and C{(1, 10, 6, 200)}. The two C{+}s mean that the items in the last two fields should be summed.
65  Adding the items in the third position, 5 + 6 = 11; and in the last position, 100 + 200 = 300.
66
67  If the C{-r} flag is specified, then one output object is generated for each input object;
68  the output object contains the current accumulated values. The accumulator appears
69  in the output row before the inputs. For example, if the input stream contains C{(1,), (2,), (3,)},
70  then the running total can be computed as follows::
71
72      ... ^ red -r + ^ ...
73
74  The output stream would be C{(1, 1), (3, 2), (6, 3)}. In the last output object, C{6} is the sum
75  of the current input (C{3}) and all preceding inputs (C{1, 2}).
76
77  The C{-r} flag can also be used with grouping. For example, if the input objects are
78  C{('a', 1), ('a', 2), ('b', 3), ('b', 4)}, then the running totals for the strings would
79  be computed as follows::
80
81      ... ^ red -r . + ^ ...
82
83  The output stream would be C{(1, 'a', 1), (3, 'a', 2), (3, 'b', 3), (7, 'b', 4)}.
84  """
85
86  import osh.core
87  import osh.args
88  import osh.function
89  import agg
90
91  create_function = osh.function._create_function
92  Option = osh.args.Option
93  _GroupingAggregate = agg._GroupingAggregate
94  _NonGroupingAggregate = agg._NonGroupingAggregate
95
96  # CLI
def _red():
    """Command-line entry point: create and return a new C{_Red} operator.

    No arguments are bound here; the returned object is a freshly
    constructed C{_Red} instance.
    """
    op = _Red()
    return op
99
100  # API
def red(binary_functions, running = False):
    """API entry point for the C{red} operator.

    C{binary_functions} is a list or tuple whose elements are each either
    C{None} or a binary function. Any other kind of value is treated as a
    single element and wrapped in a one-element list.

    A binary function in position C{i} reduces the values found in element
    C{i} of the input sequences. A C{None} in position C{i} marks element
    C{i} as a grouping column: inputs are partitioned by the values in the
    grouping columns and the remaining binary functions compute reductions
    within each group.

    If C{running} is C{False}, there is one output tuple in the absence of
    grouping, and one output tuple per group otherwise. If C{running} is
    C{True}, the aggregate value computed so far is written out for each
    input (i.e. "running totals"), and the aggregate value appears before
    the input values.

    Returns the result of C{_Red().process_args(...)} applied to the
    elements of C{binary_functions}, followed by C{Option('-r')} when
    C{running} is true.
    """
    if isinstance(binary_functions, (list, tuple)):
        # Work on a copy so the caller's sequence is never modified.
        op_args = list(binary_functions)
    else:
        op_args = [binary_functions]
    if running:
        op_args.append(Option('-r'))
    return _Red().process_args(*op_args)
121
class _Red(osh.core.Op):
    """Operator implementing the C{red} command.

    C{setup} turns the operator's arguments into an aggregation object
    (C{_NonGroupingAggregate} or C{_GroupingAggregate}, both taken from the
    C{agg} module) and stores it in C{_aggregate}.
    """

    # Aggregation object built by setup(); None until setup() has run.
    _aggregate = None

    # object interface

    def __init__(self):
        # NOTE(review): 'r' and (1, None) are passed straight to osh.core.Op;
        # presumably they declare the -r flag and an argument count of
        # "at least one, no upper bound" -- confirm against osh.core.Op.
        osh.core.Op.__init__(self, 'r', (1, None))


    # BaseOp interface

    def doc(self):
        """Return this module's docstring."""
        return __doc__

    def setup(self):
        """Build the aggregation object from the operator's arguments.

        Each remaining argument occupies one position. A position holding
        C{'.'} or C{None} is a "group" position; any other position is a
        "data" position, and its argument is converted to a callable with
        C{create_function}. With no group positions a
        C{_NonGroupingAggregate} is created, otherwise a
        C{_GroupingAggregate}. The C{-r} flag (treated as C{False} when
        absent) is forwarded to the aggregate as its C{running} argument.
        """
        args = self.args()
        running = args.flag('-r')
        if running is None:
            running = False
        specs = args.remaining()
        # A "group" position contains a dot (or None), used to indicate
        # grouping. A "data" position contains a function; the data in that
        # position will be aggregated.
        group_positions = []
        data_positions = []
        for position, spec in enumerate(specs):
            if spec == '.' or spec is None:
                group_positions.append(position)
            else:
                data_positions.append(position)
        n_data = len(data_positions)
        functions = [create_function(specs[p]) for p in data_positions]
        initial_value = (None,) * n_data

        # NOTE(review): the indexing below assumes the aggregate objects call
        # aggregator() with the n_data accumulator fields followed by the
        # fields of the current input row -- confirm against the agg module.
        def aggregator(*t):
            # Always read the input columns named by data_positions, both when
            # seeding the accumulator and when reducing. Previously the
            # non-grouping case seeded from the *last* n_data fields of the
            # row but reduced with the *first* n_data fields, which disagree
            # whenever a row has more fields than there are functions.
            inputs = tuple([t[n_data + p] for p in data_positions])
            # An all-None accumulator is the initial_value sentinel: this is
            # the first row (of the stream or of the group), so the row's own
            # values become the accumulator. An accumulator that legitimately
            # becomes all None is indistinguishable from this sentinel.
            if all([field is None for field in t[:n_data]]):
                return inputs
            return tuple([fn(acc, value)
                          for fn, acc, value
                          in zip(functions, t[:n_data], inputs)])

        if group_positions:
            def grouper(*t):
                # The group key is the tuple of values in the group positions.
                return tuple([t[p] for p in group_positions])
            self._aggregate = _GroupingAggregate(self,
                                                 running,
                                                 grouper,
                                                 initial_value,
                                                 aggregator)
        else:
            self._aggregate = _NonGroupingAggregate(self,
                                                    running,
                                                    initial_value,
                                                    aggregator)
187