Paul Khuong mostly on Lisp

rss feed

Fri, 01 Apr 2011

 

Introducing Pipes, a lightweight stream fusion EDSL

Fusion is cool. It lets us write bulk data processing programs modularly, with each pass or computation separate from the rest, while retaining the efficiency of code that executes all the passes simultaneously rather than building large intermediate values. In the last couple years, most of the attention has been coming from the Haskell crowd (e.g. Data.List.Stream), but Lisper aren’t immune to that siren’s call. In the late 80’s, Richard Waters worked on SERIES, a Common Lisp package to transform “obviously synchronizable series expressions” into buffer-less loops. That package is still available, and even lightly maintained.

I believe that, while the goal is noble, the approach taken in SERIES is too magical; I’d argue that it’s even more true of stream fusion in GHC.

SERIES goes through a lot of trouble to makes it transformation work on code expressed as regular CL, and even across function definitions. The problem is, SERIES can only handle a subset of CL, so users can be surprised when a seemingly minor refactoring suddenly breaks their code.

GHC’s fusion stuff has the virtue of using the language itself to handle the dirty work of looking just like regular Haskell. Unfortunately, it also depends on inlining and on rewrite rules (well, one), which don’t warn when they can’t be applied.

So, instead, I believe that we should be using an EDSL. It’s embedded, so it’s easy to use with the host language (both around and inside the DSL). It’s also domain specific, so it should be clear what the language does and doesn’t handle, and, in the worst case, the implementation can warn the user instead of silently reverting to slowness.

The design space is huge, but I think Pipes is a decent point. I haven’t been able to spend even one minute on it since a hackathon during the holidays; I hope someone else will be able to work on Pipes a bit.

The idea behing Pipes

The simplest fusion schemes attempt to handle cases like

(mapcar f (mapcar g list)) -> (mapcar (compose f g) list)

These cases are simple because each operation has at most one consumer (output) and one producer (input), and produces exactly one output value for each input value. The obvious ways to generalize allow arbitrarily many output values per input values, multiple consumers, or multiple producers.

Most common are transformations that handle an arbitrary number of output values per input values (or even arbitrary types!). foldr fusion is probably the prime example. I like to think of these rules as based on a specialised CPS conversion: instead of consing a list up, producers call their continuations on each item that would have been in that list.

I’m not sure I can find examples of schemes that handle multiple consumers in the literature. We don’t see them as often in code as multiple producers, and they’re hard to handle with rewrite rules. Still, it’s not hard to treat those cases with a dedicated code generator: just compile a “push” dataflow engine. Such a compiler could easily be extended to allow many outputs (or none) per input.

Multiple producers are much more common, if only because zipWith is a standard list operation. Stream fusion achieves that, and a lot more: concatenated and nested streams, mostly. It can also handle functions like filter easily, by inserting “skip” values in the stream. In fact, it can allow functions to yield an arbitrary number of output values per input. That’s compatible with what a “pull” dataflow engine can achieve.

SERIES is somewhere else in the solution space. It allows multiple producers and consumers, but not, as far as I can tell, more than one output per input (it too uses “skip” values to implement functions like filter). It achieves that by compiling to a loop that advances the state of each node in the dataflow graph exactly once per iteration. Thus, the amount of implicit buffering is constant (one output value for each node), and the loop’s body is generated by a simple traversal of the graph.

Pipes is mostly a “push” engine that handles multiple consumers, but uses a type system to recognize cases similar to what SERIES handles, and then allows multiple producers as well. It’s a different design choice than stream fusion, but I believe that it’s essential to allow multiple consumers instead of forcing users to build temporary values and then traverse them multiple times. Like SERIES, it compiles to a loop whose body corresponds to a traversal of the graph. However, instead of advancing each node exactly once per iteration, some subgraphs can advance multiple times per iteration, or even compile to nested loops.

I’ve been trying to find a good tradeoff for almost 5 years now, and it I feels like there’s a rule here, something like “arbitrary outputs per input, multiple consumers, multiple producers: choose two.” Stream fusion chooses multiple producers and arbitrary output, SERIES multiple producers and consumers, and Pipes multiple consumers and arbitrary output, plus a dash of multiple producers in certain cases. The fact that it almost manages to get that third feature is what I originally found exciting about that design.

That’s it for now

Frankly, I’m pretty much rediscovering my own code, but I remember thinking that it was almost ready for people to start playing with it. Again, it can be downloaded at https://github.com/pkhuong/Pipes.

Here’s a simple example:

CL-USER> 
(lambda (x y) 
  (declare (type (simple-array double-float 1) x y) 
           (optimize speed (sb-c::insert-array-bounds-checks 0))) 
  (pipes (let* ((x (- (from-vector x) (from-vector y))) 
                ; x is bound to the difference at each iteration 
                (_ sum-x (scan + x 0d0 double-float)) 
                ; sum-x is bound to the final sum at the end of 
                ; the loop 
                (_ sum-x^2 (scan + (* x x) 0d0 double-float)) 
                (_ min   (scan min x 
                               double-float-positive-infinity 
                               double-float)) 
                (_ max   (scan max x 
                               double-float-negative-infinity 
                               double-float)))) 
    (values (length x) sum-x sum-x^2 min max))) 
#<FUNCTION (LAMBDA (X Y)) {1004B7BAB9}> 
CL-USER> 
(funcall * 
         (map-into (make-array 10 :element-type ’double-float) 
                   (let ((x 0d0)) 
                     (lambda () 
                       (incf x)))) 
         (map-into (make-array 10 :element-type ’double-float) 
                   (let ((x 0d0)) 
                     (lambda () 
                       (decf x))))) 
10 
110.0d0 
1540.0d0 
2.0d0 
20.0d0 
CL-USER> (disassemble **) 
; disassembly for (LAMBDA (X Y)) 
; 04125107:       488B4AF9         MOV RCX, [RDX-7]           ; no-arg-parsing entry point 
;      10B:       488B5FF9         MOV RBX, [RDI-7] 
;      10F:       4C8BC3           MOV R8, RBX 
;      112:       488BF1           MOV RSI, RCX 
;      115:       4839D9           CMP RCX, RBX 
;      118:       488BCE           MOV RCX, RSI                                                                                                                                     
;      11B:       490F4FC8         CMOVNLE RCX, R8 ; find the min length 
;      11F:       488BD9           MOV RBX, RCX 
;      122:       31C9             XOR ECX, ECX 
;      124:       660F57E4         XORPD XMM4, XMM4 
;      128:       660F57ED         XORPD XMM5, XMM5 
;      12C:       F20F101554020000 MOVSD XMM2, [RIP+596] 
;      134:       F20F101D54020000 MOVSD XMM3, [RIP+596] 
;      13C:       EB46             JMP L3 
;      13E:       90               NOP 
;      13F:       90               NOP 
;      140: L0:   F20F104C0A01     MOVSD XMM1, [RDX+RCX+1] 
;      146:       F20F10740F01     MOVSD XMM6, [RDI+RCX+1] 
;      14C:       F20F5CCE         SUBSD XMM1, XMM6 
;      150:       F20F58E1         ADDSD XMM4, XMM1 
;      154:       660F28F1         MOVAPD XMM6, XMM1 
;      158:       F20F59F1         MULSD XMM6, XMM1 
;      15C:       F20F58EE         ADDSD XMM5, XMM6 
;      160:       660F2FD1         COMISD XMM2, XMM1 
;      164:       0F8A52010000     JP L14 
;      16A:       0F834C010000     JNB L14 ; rest of cmp/movapd 
;      170: L1:   660F2FD9         COMISD XMM3, XMM1 
;      174:       0F8A2D010000     JP L12 
;      17A:       0F8627010000     JBE L12 ; same 
;      180: L2:   4883C108         ADD RCX, 8 
;      184: L3:   4839D9           CMP RCX, RBX 
;      187:       7CB7             JL L0 
[...]

This subtracts two vectors element-wise, and returns the sum of the differences, the sum of the squared differences, and the minimum and maximum differences. It’s definitely not the best example, as it doesn’t explicitly exploit the freedom to compute many outputs per input, but it’s something that I’ve had to write by hand before.

If you want to play with Pipes and have questions, ping me on #lisp.

posted at: 21:20 | /Lisp/Pipes | permalink

Made with PyBlosxom Contact me by email: pvk@pvk.ca.