
Streaming

Streaming abstractions that combine, transform and reduce large amounts of sequential data efficiently, in constant space and without leaking resources.

Overview

Streaming uses composable stream producers (sources), consumers (sinks) and transformers (flows). The central model that abstracts over them is a Stream.

The following features are provided:

  • Constant memory usage: large or infinite streams can be computed in constant and small space. Buffering of the input is possible when needed.
  • Excellent performance: all models were designed with performance at the core. See benchmarks for detailed comparison with other libraries.
  • Resource safety: resources in effectful streaming pipelines are allocated lazily and released as early as possible. Resources are guaranteed to be released even when streams raise exceptions.
  • Flexibility and loose coupling: both push-based and pull-based models are provided to allow efficient zipping and concatenation, and to keep sources, sinks and flows decoupled from each other.
  • Streaming notation: build streams and sinks using convenient comprehension and applicative notations (see the examples below).

Modules

  • Stream: Push-based streams with excellent overall performance, safe and lazy resource management and a multitude of operations. Streams can be built with comprehensions.
  • Source: Pull-based producers of values with good performance, safe and lazy resource management and zipping operations.
  • Sink: Consumers of values with excellent performance, safe and lazy resource management and flexible composition operations. Sinks are like first-class folds with early termination!
  • Flow: Transformers of values that form composable streaming pipelines. Excellent for defining source- and sink-independent value transformations.

For more information on each module consult the entrypoint module Streaming.
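
To give a rough picture of how these pieces fit together, here is a minimal sketch (not taken from the module documentation, and using only operations that appear in the examples further down): a source is turned into a stream, transformed by a flow and reduced by a sink.

open Streaming

(* A pull-based producer of 0, 1, 2, ... *)
let naturals =
  Source.make ~init:(fun () -> 0) ~pull:(fun n -> Some (n, n + 1)) ()

let total =
  Stream.from naturals                       (* any source can become a stream *)
  |> Stream.take 10                          (* stop after ten elements *)
  |> Stream.via (Flow.map (fun n -> n * n))  (* square each element with a flow *)
  |> Stream.into Sink.sum                    (* reduce with a sink *)

(* total = 0*0 + 1*1 + ... + 9*9 = 285 *)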

Quickstart

Install using opam

The library can be installed with opam: opam install streaming. You can run opam info streaming to make sure that the library is installed.

Install using esy

Add "@opam/streaming": "0.1" to dependencies in your package.json file and install the dependencies with esy install. Run esy ls-modules to make sure that the library is installed for your project.

Use with dune

To start using Streaming in your dune project, add it to the libraries field in your dune file.

(executable
  (public_name myexe)
  (libraries streaming))

Open the entrypoint module in your code to start using Streams:

open Streaming

Use in the top-level

Fire up utop or down and run:

# #require "streaming";;
# open Streaming;;
# Stream.(stdin |> stdout);;
We're streaming!<Enter>
We're streaming!
<Ctrl+d>
- : unit = ()

That's it! Scroll down to see some examples or jump into the API documentation.

Examples

Read lines from STDIN

# Stream.stdin
  |> Stream.filter ((<>) "")
  |> Stream.map (fun line -> "You wrote: " ^ line)
  |> Stream.each print_endline

Using stream notation

# let items =
    let open Stream.Syntax in
    let* n = Stream.range 1 3 in
    let* c = Stream.of_list ['x'; 'y'] in
    yield (n, c)

# Stream.to_list items
- : (int * char) list = [(1, 'x'); (1, 'y'); (2, 'x'); (2, 'y')]

Using sink notation

Compute the arithmetic mean in a single iteration of the input.

# let mean =
    let open Sink.Syntax in
    let+ total = Sink.sum
    and+ count = Sink.length in
    total / count

# Stream.(iota 20 |> into mean)
- : int = 9

Resource handling

Input and output resources involved in stream processing will have the following two properties:

  • Resources are acquired lazily and only if needed. For example, Streaming.Stream.file will only open the file for reading when the stream is being consumed. Similarly, a sink for a file will only open the file when elements are being written into it. Sinks are initialized before sources, when the computation begins.
  • Stream operations do not leak resources. The cleanup functions for sources and sinks are guaranteed to be called in the presence of early termination (when not all of the input is consumed) and in case of exceptions in the streaming pipeline.

The following examples demonstrate these two properties in practice.

Examples

(* Create a source that must not be initialized. *)
# let bomb () =
    Source.make
      ~init:(fun () -> failwith "Boom!")
      ~pull:(fun () -> None)
      ()
val bomb : unit -> 'a source = <fun>

(* Feed it into a stream that terminates early. *)
# bomb ()
  |> Stream.from
  |> Stream.take 0
  |> Stream.to_list
- : 'a list = []

And... nothing! As you can see, our "bomb" was never detonated. This is because in streams, sinks (in our case to_list combined with take) are checked for "fullness" before sources are initialized.

In the next example let's look at how streams behave in the presence of exceptions.

# let im_a_source_i_must_not_leak () =
    Source.make
      ~init:(fun () -> `Dangerous_input)
      ~pull:(fun st -> Some ("always blue", st))
      ~stop:(fun `Dangerous_input ->
          print_endline "Stopping input... Phew!")
      ()
val im_a_source_i_must_not_leak : unit -> string source = <fun>

# let im_a_sink_i_must_not_leak () =
    Sink.make
      ~init:(fun () -> `Dangerous_output)
      ~push:(fun `Dangerous_output x -> `Dangerous_output)
      ~stop:(fun `Dangerous_output ->
          print_endline "Stopping output... That was close!")
      ()
val im_a_sink_i_must_not_leak : unit -> ('a, unit) sink = <fun>

# im_a_source_i_must_not_leak ()
  |> Stream.from
  |> Stream.map (fun x -> failwith "Boom!")
  |> Stream.into (im_a_sink_i_must_not_leak ())
Stopping input... Phew!
Stopping output... That was close!
Exception: Failure "Boom!".

Our termination functions ran just before the world exploded!

Limitations

The termination functions in streaming are always guaranteed to be called. What is currently not well specified is the state they will be called with. It is possible, for example, for a source to stream lines from multiple files, seamlessly opening and closing them as the input is read. Under normal termination conditions, streaming will correctly call all termination functions with the correct states.

The same is not true in situations when exceptions are raised. Currently, when there is an exception, streaming will call the termination function on the first instance of the state, even though it might have changed.

This is not a difficult problem to solve, but a correct implementation has a high performance cost.

In the future, streaming might expose safer stream management functions to help with these situations. For now, it is recommended that sources and sinks implement their stop functions in a way that allows them to close all allocated resources when given only the first state. This can be achieved by aggregating the intermediate states, or by using refs so that the initial state can reach the latest state, as in the sketch below.
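
As a sketch of that workaround (hypothetical code, not part of the library), a source that streams lines from multiple files can keep the currently open channel in a ref shared between pull and stop, so that stop closes the right channel even when it is only given the initial state:

open Streaming

let lines_of_files paths =
  (* The ref is shared by [pull] and [stop]; it always points at the
     channel that is currently open, regardless of the state value. *)
  let current = ref None in
  let close_current () =
    match !current with
    | Some ic -> close_in ic; current := None
    | None -> ()
  in
  let rec pull remaining =
    match !current with
    | Some ic ->
      (match input_line ic with
       | line -> Some (line, remaining)
       | exception End_of_file -> close_current (); pull remaining)
    | None ->
      (match remaining with
       | [] -> None
       | path :: rest -> current := Some (open_in path); pull rest)
  in
  Source.make
    ~init:(fun () -> paths)
    ~pull
    (* Even if [stop] receives the initial state, the latest channel is
       still reachable through the ref and gets closed. *)
    ~stop:(fun _ -> close_current ())
    ()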

Questions

When should I use streaming?

streaming is a general-purpose streaming library with abstractions meant to be used as a drop-in replacement for concrete sequential data-structures such as lists. It is always a good idea to use a streaming model if you need sequential access to data. For very small collections that do not need to be processed multiple times, using lists is adequate. In all other situations, streams are significantly better in terms of performance and composition.

Streaming abstractions are an excellent choice for stateful producers and consumers that require precise resource management. Consuming elements from a file or a database handle with streams is significantly safer. All models in streaming are lazy (they will only initialise resources when needed) and support prompt termination (resources are released immediately when they are no longer needed). This is guaranteed even when streaming pipelines raise exceptions.

Finally, streaming encourages the implementation of small, decoupled sources, sinks and flows that can be reused in a wide spectrum of situations.

How fast are Streams compared to other streaming models?

In short - fast (see for yourself). The Streaming.Stream module was designed to improve on the most efficient iteration model currently available in OCaml, the so-called "internal iterator" with the type ('a -> unit) -> unit. This iterator is related to the commonly available "iter" functions in the standard library.

Streams are a variant of internal iterators that add support for resource safety and early termination, and avoid the need for mutations and exceptions in the combinators, while providing a similar performance profile.
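
For reference, the internal iterator shape mentioned above can be written down as a standalone sketch, independent of the library's actual internal representation:

(* The bare "internal iterator": the producer drives the loop and the
   consumer is just a callback. *)
type 'a iter = ('a -> unit) -> unit

(* Any container with an [iter] function fits this shape. *)
let of_list (l : 'a list) : 'a iter = fun k -> List.iter k l

(* Consuming is plain function application; there is no way to stop early
   without mutation or exceptions, which is what streams add. *)
let () = of_list [1; 2; 3] print_int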

What's the difference between Sources and Streams?

Both sources and streams produce values. The main difference is flow control: with sources, consumers of the elements are in charge of control; while with streams, it's the producers who drive the computation. This means that sources should be used for situations where the elements are requested on-demand, while streams are best suited for "reactive" inputs.

In general, streams offer better performance than sources for the most common operations (including concatenation) and offer integration with sinks and flows. On the other hand, sources are easier to create and support zipping.

It is recommended to use sources to define decoupled producers that can be consumed with streams. Any source can become a stream, but the opposite is not so easy.
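
As a sketch of that recommendation: two small sources are defined independently, zipped, and only then consumed through the stream API. The zipping operation is assumed here to be Source.zip, pairing elements from both sources; check the Source module for the exact names and signatures of its zipping operations.

open Streaming

(* Two decoupled, pull-based producers. *)
let naturals =
  Source.make ~init:(fun () -> 0) ~pull:(fun n -> Some (n, n + 1)) ()

let letters =
  Source.make ~init:(fun () -> 'a')
    ~pull:(fun c ->
      if c > 'e' then None
      else Some (c, Char.chr (Char.code c + 1)))
    ()

(* Zip the sources (assumed operation), then consume them as a stream. *)
let pairs =
  Source.zip naturals letters
  |> Stream.from
  |> Stream.to_list

(* pairs = [(0, 'a'); (1, 'b'); (2, 'c'); (3, 'd'); (4, 'e')] *)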

Why are all streaming types in this library abstract?

Even though the core types in streaming are very simple, stable and have many nice properties, it is possible that the types might change in the future to support new functionality such as backpressure and concurrency. To avoid breaking changes, and until the library reaches the 1.0 milestone, the types are going to be abstract.

If you have a use-case that requires access to the internal type, please open an issue.

How can the net amount of entropy of the universe be massively decreased?

You should ask AC.
let answer () =
  let x = "84726982693273833265833289698432737883857070736773697" ^
          "8843268658465327079823265327769657873787170857632657883876982
  What if there was no space and time?" in
  let open Format in let (%^) = printf in
  let open Streaming in let open Stream in
  let (%)=fun f g x->f (g x) and
  (|/-)=(%^)"\027[1m\027[41m";"\\|/-"in let (//) x = (|/-)
    |> of_string
    |> interpose '\b' |> append '\b'
    |> cycle ~times:(Random.int 10)
    |> append x
    |> each (fun c -> "%c%!" %^ (Unix.sleepf 0.003; c)) in
  from @@ (let pull i =
      if i >= 114 then None
      else Some (String.sub x i 2, i + 2)
    in Source.make ~init:(fun () -> 0) ~pull ())
  |> via (Flow.map (char_of_int%int_of_string))
  |> into (Sink.each (//)); (%^) "\027[0m"

Troubleshooting

  • Incorrect Stream module - if you forget to open Streaming you might start using the Stream module from the standard library. This will result in Unbound value and type mismatch errors.
  • My stream pipeline never terminates - this indicates that one of your producers or transformers produces an infinite stream. Make sure that you are limiting the stream with an operation such as Streaming.Stream.take, as in the sketch below.
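
For the second point, here is a minimal sketch (with a hypothetical endless source, using only operations shown earlier) of how take bounds an otherwise non-terminating pipeline:

open Streaming

(* An endless producer: pull never returns None. *)
let ones = Source.make ~init:(fun () -> ()) ~pull:(fun () -> Some (1, ())) ()

(* Without the [take] the pipeline below would never finish. *)
let first_five = Stream.from ones |> Stream.take 5 |> Stream.to_list

(* first_five = [1; 1; 1; 1; 1] *)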

Acknowledgements

This library is based on ideas found in other libraries and research projects such as: Haskell's Pipes and Foldl libraries, Scala's ZIO Streams, Clojure's Transducers and the Iteratees streaming model by Oleg Kiselyov.