### Introducing Pipes, a lightweight stream fusion EDSL

Fusion is cool. It lets us write bulk data processing programs modularly, with each pass or computation separate from the rest, while retaining the efficiency of code that executes all the passes simultaneously rather than building large intermediate values. In the last couple years, most of the attention has been coming from the Haskell crowd (e.g. Data.List.Stream), but Lisper aren’t immune to that siren’s call. In the late 80’s, Richard Waters worked on SERIES, a Common Lisp package to transform “obviously synchronizable series expressions” into buffer-less loops. That package is still available, and even lightly maintained.

I believe that, while the goal is noble, the approach taken in SERIES is too magical; I’d argue that it’s even more true of stream fusion in GHC.

SERIES goes through a lot of trouble to makes it transformation work on code expressed as regular CL, and even across function definitions. The problem is, SERIES can only handle a subset of CL, so users can be surprised when a seemingly minor refactoring suddenly breaks their code.

GHC’s fusion stuff has the virtue of using the language itself to handle the dirty work of looking just like regular Haskell. Unfortunately, it also depends on inlining and on rewrite rules (well, one), which don’t warn when they can’t be applied.

So, instead, I believe that we should be using an EDSL. It’s embedded, so it’s easy to use with the host language (both around and inside the DSL). It’s also domain specific, so it should be clear what the language does and doesn’t handle, and, in the worst case, the implementation can warn the user instead of silently reverting to slowness.

The design space is huge, but I think Pipes is a decent point. I haven’t been able to spend even one minute on it since a hackathon during the holidays; I hope someone else will be able to work on Pipes a bit.

### The idea behing Pipes

The simplest fusion schemes attempt to handle cases like

(mapcar f (mapcar g list)) -> (mapcar (compose f g) list)

These cases are simple because each operation has at most one consumer (output) and one producer (input), and produces exactly one output value for each input value. The obvious ways to generalize allow arbitrarily many output values per input values, multiple consumers, or multiple producers.

Most common are transformations that handle an arbitrary number of output
values per input values (or even arbitrary types!). `foldr `

fusion is probably the prime
example. I like to think of these rules as based on a specialised CPS conversion:
instead of consing a list up, producers call their continuations on each item that
would have been in that list.

I’m not sure I can find examples of schemes that handle multiple consumers in the literature. We don’t see them as often in code as multiple producers, and they’re hard to handle with rewrite rules. Still, it’s not hard to treat those cases with a dedicated code generator: just compile a “push” dataflow engine. Such a compiler could easily be extended to allow many outputs (or none) per input.

Multiple producers are much more common, if only because `zipWith `

is a
standard list operation. Stream fusion achieves that, and a lot more: concatenated
and nested streams, mostly. It can also handle functions like `filter `

easily, by
inserting “skip” values in the stream. In fact, it can allow functions to yield an
arbitrary number of output values per input. That’s compatible with what a “pull”
dataflow engine can achieve.

SERIES is somewhere else in the solution space. It allows multiple producers and
consumers, but not, as far as I can tell, more than one output per input (it too uses
“skip” values to implement functions like `filter`

). It achieves that by compiling to a
loop that advances the state of each node in the dataflow graph exactly once per
iteration. Thus, the amount of implicit buffering is constant (one output value for
each node), and the loop’s body is generated by a simple traversal of the
graph.

Pipes is mostly a “push” engine that handles multiple consumers, but uses a type
system to recognize cases similar to what SERIES handles, and then allows multiple
producers as well. It’s a different design choice than stream fusion, but I believe
that it’s essential to allow multiple consumers instead of forcing users to
build temporary values *and then traverse them multiple times*. Like SERIES,
it compiles to a loop whose body corresponds to a traversal of the graph.
However, instead of advancing each node exactly once per iteration, some
subgraphs can advance multiple times per iteration, or even compile to nested
loops.

I’ve been trying to find a good tradeoff for almost 5 years now, and it I feels like there’s a rule here, something like “arbitrary outputs per input, multiple consumers, multiple producers: choose two.” Stream fusion chooses multiple producers and arbitrary output, SERIES multiple producers and consumers, and Pipes multiple consumers and arbitrary output, plus a dash of multiple producers in certain cases. The fact that it almost manages to get that third feature is what I originally found exciting about that design.

### That’s it for now

Frankly, I’m pretty much rediscovering my own code, but I remember thinking that it was almost ready for people to start playing with it. Again, it can be downloaded at https://github.com/pkhuong/Pipes.

Here’s a simple example:

CL-USER> (lambda (x y) (declare (type (simple-array double-float 1) x y) (optimize speed (sb-c::insert-array-bounds-checks 0))) (pipes (let* ((x (- (from-vector x) (from-vector y))) ; x is bound to the difference at each iteration (_ sum-x (scan + x 0d0 double-float)) ; sum-x is bound to the final sum at the end of ; the loop (_ sum-x^2 (scan + (* x x) 0d0 double-float)) (_ min (scan min x double-float-positive-infinity double-float)) (_ max (scan max x double-float-negative-infinity double-float)))) (values (length x) sum-x sum-x^2 min max))) #<FUNCTION (LAMBDA (X Y)) {1004B7BAB9}> CL-USER> (funcall * (map-into (make-array 10 :element-type ’double-float) (let ((x 0d0)) (lambda () (incf x)))) (map-into (make-array 10 :element-type ’double-float) (let ((x 0d0)) (lambda () (decf x))))) 10 110.0d0 1540.0d0 2.0d0 20.0d0 CL-USER> (disassemble **) ; disassembly for (LAMBDA (X Y)) ; 04125107: 488B4AF9 MOV RCX, [RDX-7] ; no-arg-parsing entry point ; 10B: 488B5FF9 MOV RBX, [RDI-7] ; 10F: 4C8BC3 MOV R8, RBX ; 112: 488BF1 MOV RSI, RCX ; 115: 4839D9 CMP RCX, RBX ; 118: 488BCE MOV RCX, RSI ; 11B: 490F4FC8 CMOVNLE RCX, R8 ; find the min length ; 11F: 488BD9 MOV RBX, RCX ; 122: 31C9 XOR ECX, ECX ; 124: 660F57E4 XORPD XMM4, XMM4 ; 128: 660F57ED XORPD XMM5, XMM5 ; 12C: F20F101554020000 MOVSD XMM2, [RIP+596] ; 134: F20F101D54020000 MOVSD XMM3, [RIP+596] ; 13C: EB46 JMP L3 ; 13E: 90 NOP ; 13F: 90 NOP ; 140: L0: F20F104C0A01 MOVSD XMM1, [RDX+RCX+1] ; 146: F20F10740F01 MOVSD XMM6, [RDI+RCX+1] ; 14C: F20F5CCE SUBSD XMM1, XMM6 ; 150: F20F58E1 ADDSD XMM4, XMM1 ; 154: 660F28F1 MOVAPD XMM6, XMM1 ; 158: F20F59F1 MULSD XMM6, XMM1 ; 15C: F20F58EE ADDSD XMM5, XMM6 ; 160: 660F2FD1 COMISD XMM2, XMM1 ; 164: 0F8A52010000 JP L14 ; 16A: 0F834C010000 JNB L14 ; rest of cmp/movapd ; 170: L1: 660F2FD9 COMISD XMM3, XMM1 ; 174: 0F8A2D010000 JP L12 ; 17A: 0F8627010000 JBE L12 ; same ; 180: L2: 4883C108 ADD RCX, 8 ; 184: L3: 4839D9 CMP RCX, RBX ; 187: 7CB7 JL L0 [...]

This subtracts two vectors element-wise, and returns the sum of the differences, the sum of the squared differences, and the minimum and maximum differences. It’s definitely not the best example, as it doesn’t explicitly exploit the freedom to compute many outputs per input, but it’s something that I’ve had to write by hand before.

If you want to play with Pipes and have questions, ping me on `#lisp`

.

posted at: 21:20 | /Lisp/Pipes | permalink