Writer is Async's main API for output to a file descriptor. It is the analog of
Each writer has an internal buffer, to which
Writer.write* adds data. Each writer uses an Async cooperative thread that makes
write() system calls to move the data from the writer's buffer to an OS buffer via the file descriptor.
There is no guarantee that the data sync on the other side of the writer can keep up with the rate at which you are writing. If it cannot, the OS buffer will fill up and the writer's cooperative thread will be unable to send any bytes. In that case, calls to
Writer.write* will grow the writer's buffer without bound, as long as your program produces data. One solution to this problem is to call
Writer.flushed and not continue until that becomes determined, which will only happen once the bytes in the writer's buffer have been successfully transferred to the OS buffer. Another solution is to check
Writer.bytes_to_write and not produce any more data if that is beyond some bound.
There are two kinds of errors that one can handle with writers. First, a writer can be
closed, which will cause future
writes (and other operations) to synchronously raise an exception. Second, the writer's cooperative thread can fail due to a
write() system call failing. This will cause an exception to be sent to the writer's monitor, which will be a child of the monitor in effect when the writer is created. One can deal with such asynchronous exceptions in the usual way, by handling the stream returned by
Monitor.detach_and_get_error_stream (Writer.monitor writer).
module Id : Core.Unique_id
module Line_ending : sig ... end
val sexp_of_t : t -> Sexplib0.Sexp.t
include Async_kernel.Invariant.S with type t := t
val invariant : t -> unit
val io_stats : Io_stats.t
Overall IO statistics for all writers.
val stdout : t Core.Lazy.t
stderr are writers for file descriptors 1 and 2. They are lazy because we don't want to create them in all programs that happen to link with Async.
stderr is created, they both are created. Furthermore, if they point to the same inode, then they will be the same writer to
Fd.stdout. This can be confusing, because
fd (force stderr) will be
Fd.stderr. And subsequent modifications of
Fd.stderr will have no effect on
Unfortunately, the sharing is necessary because Async uses OS threads to do
write() syscalls using the writer buffer. When calling a program that redirects stdout and stderr to the same file, as in:
foo.exe >/tmp/z.file 2>&1
Writer.stderr weren't the same writer, then they could have threads simultaneously writing to the same file, which could easily cause data loss.
val stderr : t Core.Lazy.t
type buffer_age_limit = [
`At_most of Time_unix.Span.t
val bin_shape_buffer_age_limit : Core.Bin_prot.Shape.t
val bin_size_buffer_age_limit : buffer_age_limit Core.Bin_prot.Size.sizer
val bin_write_buffer_age_limit : buffer_age_limit Core.Bin_prot.Write.writer
val bin_writer_buffer_age_limit : buffer_age_limit Core.Bin_prot.Type_class.writer
val bin_read_buffer_age_limit : buffer_age_limit Core.Bin_prot.Read.reader
val __bin_read_buffer_age_limit__ : (int -> buffer_age_limit) Core.Bin_prot.Read.reader
val bin_reader_buffer_age_limit : buffer_age_limit Core.Bin_prot.Type_class.reader
val bin_buffer_age_limit : buffer_age_limit Core.Bin_prot.Type_class.t
val sexp_of_buffer_age_limit : buffer_age_limit -> Sexplib0.Sexp.t
val buffer_age_limit_of_sexp : Sexplib0.Sexp.t -> buffer_age_limit
val __buffer_age_limit_of_sexp__ : Sexplib0.Sexp.t -> buffer_age_limit
val create : ?buf_len:int -> ?syscall:[ `Per_cycle | `Periodic of Time_unix.Span.t ] -> ?buffer_age_limit:buffer_age_limit -> ?raise_when_consumer_leaves:bool -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_kernel.Time_source.T1.t -> Fd.t -> t
create ?buf_len ?syscall ?buffer_age_limit fd creates a new writer. The file descriptor
fd should not be in use for writing by anything else.
By default, a write system call occurs at the end of a cycle in which bytes were written. One can supply
~syscall:(`Periodic span) to get better performance. This batches writes together, doing the write system call periodically according to the supplied span.
A writer can asynchronously fail if the underlying write syscall returns an error, e.g.,
buffer_age_limit specifies how backed up you can get before raising an exception. The default is
`Unlimited for files, and 2 minutes for other kinds of file descriptors. You can supply
`Unlimited to turn off buffer-age checks.
raise_when_consumer_leaves specifies whether the writer should raise an exception when the consumer receiving bytes from the writer leaves, i.e., in Unix, the write syscall returns
not raise_when_consumer_leaves, then the writer will silently drop all writes after the consumer leaves, and the writer will eventually fail with a writer-buffer-older-than error if the application remains open long enough.
line_ending determines how
write_line terminate lines by default. If
line_ending = Unix then end of line is
line_ending = Dos then end of line is
"\r\n". Note that
line_ending = Dos is not equivalent to opening the file in text mode because any "\n" characters being printed by other means (e.g.,
"\n") are still written verbatim (in Unix style).
time_source is useful in tests to trigger
buffer_age_limit-related conditions, or simply to have the result of (for example)
flushed_time_ns agree with your test's synthetic time. It is also used to schedule the
val raise_when_consumer_leaves : t -> bool
val set_raise_when_consumer_leaves : t -> bool -> unit
set_raise_when_consumer_leaves t bool sets the
raise_when_consumer_leaves flag of
t, which determies how
t responds to a write system call raising
val set_buffer_age_limit : t -> buffer_age_limit -> unit
set_buffer_age_limit t buffer_age_limit replaces the existing buffer age limit with the new one. This is useful for stdout and stderr, which are lazily created in a context that does not allow applications to specify
val consumer_left : t -> unit Async_kernel.Deferred.t
consumer_left t returns a deferred that becomes determined when
t attempts to write to a pipe that broke because the consumer on the other side left.
val of_out_channel : Core.Out_channel.t -> Fd.Kind.t -> t
val open_file : ?info:Core.Info.t -> ?append:bool -> ?buf_len:int -> ?syscall:[ `Per_cycle | `Periodic of Time_unix.Span.t ] -> ?perm:int -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_kernel.Time_source.T1.t -> string -> t Async_kernel.Deferred.t
open_file file opens
file for writing and returns a writer for it. It uses
Unix_syscalls.openfile to open the file.
val with_file : ?perm:int -> ?append:bool -> ?syscall:[ `Per_cycle | `Periodic of Time_unix.Span.t ] -> ?exclusive:bool -> ?line_ending:Line_ending.t -> ?time_source:[> Core.read ] Async_kernel.Time_source.T1.t -> string -> f:(t -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
with_file ~file f opens
file for writing, creates a writer
t, and runs
f t to obtain a deferred
d becomes determined, the writer is closed. When the close completes, the result of
with_file becomes determined with the value of
There is no need to call
Writer.flushed to ensure that
with_file waits for the writer to be flushed before closing it.
Writer.close will already wait for the flush.
exclusive = true uses a filesystem lock to try and make sure that the file is not modified during a concurrent read or write operation. This is an advisory lock, which means that the reader must be cooperating by taking a relevant lock when writing (see
Reader.with_file). This is unrelated and should not be confused with the
O_EXCL flag in
open systemcall. Note that the implementation uses
Unix.lockf, which has known pitfalls. It's recommended that you avoid the
exclusive flag in favor of using a library dedicated to dealing with file locks where the pitfalls can be documented in detail.
val set_fd : t -> Fd.t -> unit Async_kernel.Deferred.t
set_fd t fd sets the
fd used by
t for its underlying system calls. It first waits until everything being sent to the current
fd is flushed. Of course, one must understand how the writer works and what one is doing to use this.
val write_gen : ?pos:int -> ?len:int -> t -> 'a -> blit_to_bigstring:('a, Core.Bigstring.t) Core.Blit.blit -> length:('a -> int) -> unit
write_gen t a writes
a to writer
length specifying the number of bytes needed and
a directly into the
t's buffer. If one has a type that has
blit_to_bigstring functions, like:
module A : sig type t val length : t -> int val blit_to_bigstring : (t, Bigstring.t) Blit.blit end
then one can use
write_gen to implement a custom analog of
module Write_a : sig val write : ?pos:int -> ?len:int -> A.t -> Writer.t -> unit end = struct let write ?pos ?len a writer = Writer.write_gen ~length:A.length ~blit_to_bigstring:A.blit_to_bigstring ?pos ?len writer a end
In some cases it may be difficult to write only part of a value:
module B : sig type t val length : t -> int val blit_to_bigstring : t -> Bigstring.t -> pos:int -> unit end
In these cases, use
write_gen_whole instead. It never requires writing only part of a value, although it is potentially less space-efficient. It may waste portions of previously-allocated write buffers if they are too small.
module Write_b : sig val write : B.t -> Writer.t -> unit end = struct let write b writer = Writer.write_gen_whole ~length:B.length ~blit_to_bigstring:B.blit_to_bigstring writer b end
write_gen_whole give you access to the writer's internal buffer. You should not capture it; doing so might lead to errors of the segfault kind.
val write_gen_whole : t -> 'a -> blit_to_bigstring:('a -> Core.Bigstring.t -> pos:int -> unit) -> length:('a -> int) -> unit
val write_direct : t -> f:(Core.Bigstring.t -> pos:int -> len:int -> 'a * int) -> 'a option
write_direct t ~f gives
t's internal buffer to
len define the portion of the buffer that can be filled.
f must return a pair
(x, written) where
written is the number of bytes written to the buffer at
write_direct raises if
written < 0 || written > len.
Some x, or
None if the writer is stopped. By using
write_direct only, one can ensure that the writer's internal buffer never grows. Look at the
write_direct expect tests for an example of how this can be used to construct a
write_string like function that never grows the internal buffer.
val write_bytes : ?pos:int -> ?len:int -> t -> Core.Bytes.t -> unit
write ?pos ?len t s adds a job to the writer's queue of pending writes. The contents of the string are copied to an internal buffer before
write returns, so clients can do whatever they want with
s after that.
val write : ?pos:int -> ?len:int -> t -> string -> unit
val write_bigstring : ?pos:int -> ?len:int -> t -> Core.Bigstring.t -> unit
val write_substring : t -> Core.Substring.t -> unit
val write_bigsubstring : t -> Core.Bigsubstring.t -> unit
val writef : t -> ('a, unit, string, unit) Core.format4 -> 'a
val to_formatter : t -> Format.formatter
to_formatter returns an OCaml-formatter that one can print to using
Format.fprintf. Note that flushing the formatter will only submit all buffered data to the writer, but does not guarantee flushing to the operating system.
val write_char : t -> char -> unit
write_char t c writes the character.
val newline : ?line_ending:Line_ending.t -> t -> unit
newline t writes the end-of-line terminator.
line_ending can override
val write_line : ?line_ending:Line_ending.t -> t -> string -> unit
write_line t s ?line_ending is
write t s; newline t ?line_ending.
val write_byte : t -> int -> unit
write_byte t i writes one 8-bit integer (as the single character with that code). The given integer is taken modulo 256.
module Terminate_with : sig ... end
val write_sexp : ?hum:bool -> ?terminate_with:Terminate_with.t -> t -> Core.Sexp.t -> unit
write_sexp t sexp writes to
t the string representation of
sexp, possibly followed by a terminating character as per
~terminate_with:Newline, the terminating character is a newline. With
~terminate_with:Space_if_needed, if a space is needed to ensure that the sexp reader knows that it has reached the end of the sexp, then the terminating character will be a space; otherwise, no terminating character is added. A terminating space is needed if the string representation doesn't end in
val write_bin_prot : t -> 'a Core.Bin_prot.Type_class.writer -> 'a -> unit
write_bin_prot writes out a value using its bin_prot sizer/writer pair. The format is the "size-prefixed binary protocol", in which the length of the data is written before the data itself. This is the format that
val write_bin_prot_no_size_header : t -> size:int -> 'a Core.Bin_prot.Write.writer -> 'a -> unit
Writes out a value using its bin_prot writer. Unlike
write_bin_prot, this doesn't prefix the output with the size of the bin_prot blob.
size is the expected size. This function will raise if the bin_prot writer writes an amount other than
write_ functions, all functions starting with
schedule_ require flushing or closing of the writer after returning before it is safe to modify the bigstrings which were directly or indirectly passed to these functions. The reason is that these bigstrings will be read from directly when writing; their contents is not copied to internal buffers.
This is important if users need to send the same large data string to a huge number of clients simultaneously (e.g., on a cluster), because these functions then avoid needlessly exhausting memory by sharing the data.
val schedule_bigstring : t -> ?pos:int -> ?len:int -> Core.Bigstring.t -> unit
schedule_bigstring t bstr schedules a write of bigstring
bstr. It is not safe to change the bigstring until the writer has been successfully flushed or closed after this operation.
val schedule_bigsubstring : t -> Core.Bigsubstring.t -> unit
schedule_iobuf_peek is like
schedule_bigstring, but for an iobuf. It is not safe to change the iobuf until the writer has been successfully flushed or closed after this operation.
val schedule_iobuf_consume : t -> ?len:int -> ([> Core.read ], Iobuf.seek) Iobuf.t -> unit Async_kernel.Deferred.t
schedule_iobuf_consume is like
schedule_iobuf_peek. Once the result is determined, the iobuf will be fully consumed (or advanced by
min len (Iobuf.length iobuf) if
len is specified), and the writer will be flushed.
module Destroy_or_keep : sig ... end
val schedule_iovec : ?destroy_or_keep:Destroy_or_keep.t -> t -> Core.Bigstring.t Core_unix.IOVec.t -> unit
schedule_iovec t iovec schedules a write of I/O-vector
iovec. It is not safe to change the bigstrings underlying the I/O-vector until the writer has been successfully flushed or closed after this operation.
val schedule_iovecs : t -> Core.Bigstring.t Core_unix.IOVec.t Core.Queue.t -> unit
schedule_iovecs t iovecs like
schedule_iovec, but takes a whole queue
iovecs of I/O-vectors as argument. The queue is guaranteed to be empty when this function returns and can be modified. It is not safe to change the bigstrings underlying the I/O-vectors until the writer has been successfully flushed or closed after this operation.
module Flush_result : sig ... end
val flushed_or_failed_with_result : t -> Flush_result.t Async_kernel.Deferred.t
flushed_or_failed_with_result t returns a deferred that will become determined when all prior writes complete (i.e. the
write() system call returns), or when any of them fail.
Error case can be tricky due to the following race: the result gets determined concurrently with the exception propagation through the writer's monitor. The caller needs to make sure that the program behavior does not depend on which signal propagates first.
val flushed_or_failed_unit : t -> unit Async_kernel.Deferred.t
flushed_or_failed_unit t returns a deferred that will become determined when all prior writes complete, or when any of them fail.
flushed_or_failed_with_result, its return value gives you no indication of which happened. In the
Error case, the result will be determined in parallel with the error propagating to the writer's monitor. The caller should robustly handle either side winning that race.
val flushed : t -> unit Async_kernel.Deferred.t
flushed t returns a deferred that will become determined when all prior writes complete (i.e. the
write() system call returns). If a prior write fails, then the deferred will never become determined.
It is OK to call
flushed t after
t has been closed.
val flushed_time : t -> Time_unix.t Async_kernel.Deferred.t
val flushed_time_ns : t -> Time_ns_unix.t Async_kernel.Deferred.t
val fsync : t -> unit Async_kernel.Deferred.t
fsync t calls
flushed t before calling
Unix.fsync on the underlying file descriptor
val fdatasync : t -> unit Async_kernel.Deferred.t
fdatasync t calls
flushed t before calling
Unix.fdatasync on the underlying file descriptor
val send : t -> string -> unit
send writes a string to the writer that can be read back using
val monitor : t -> Async_kernel.Monitor.t
monitor t returns the writer's monitor.
val close : ?force_close:unit Async_kernel.Deferred.t -> t -> unit Async_kernel.Deferred.t
close ?force_close t waits for the writer to be flushed, and then calls
Unix.close on the underlying file descriptor.
force_close causes the
Unix.close to happen even if the flush hangs. By default
Deferred.never () for files and
after (sec 5) for other types of file descriptors (e.g., sockets). If the close is forced, data in the writer's buffer may not be written to the file descriptor. You can check this by calling
force_close will not reliably stop any write that is in progress. If there are any in-flight system calls, it will wait for them to finish, which includes
writev, which can legitimately block forever.
close will raise an exception if the
Unix.close on the underlying file descriptor fails.
You must call
close on a writer in order to close the underlying file descriptor. Not doing so will cause a file descriptor leak. It also will cause a space leak, because until the writer is closed, it is held on to in order to flush the writer on shutdown.
It is an error to call other operations on
close t has been called, except that calls of
close subsequent to the original call to
close will return the same deferred as the original call.
close_started t becomes determined as soon as
close is called.
close_finished t becomes determined after
t's underlying file descriptor has been closed, i.e., it is the same as the result of
close_finished differs from
close in that it does not have the side effect of initiating a close.
is_closed t returns
close t has been called.
is_open t is
not (is_closed t)
with_close t ~f runs
f (), and closes
f finishes or raises.
val close_started : t -> unit Async_kernel.Deferred.t
val close_finished : t -> unit Async_kernel.Deferred.t
val is_closed : t -> bool
val is_open : t -> bool
val with_close : t -> f:(unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
val can_write : t -> bool
can_write t returns
true if calls to
write* functions on
t are allowed. If
is_open t then
can_write t. But one can have
is_closed t and
can_write t, during the time after
close t before closing has finished.
val is_stopped_permanently : t -> bool
Errors raised within the writer can stop the background job that flushes out the writer's buffers.
true when the background job has stopped.
stopped_permanently becomes determined when the background job has stopped.
val stopped_permanently : t -> unit Async_kernel.Deferred.t
val with_flushed_at_close : t -> flushed:(unit -> unit Async_kernel.Deferred.t) -> f:(unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
In addition to flushing its internal buffer prior to closing, a writer keeps track of producers that are feeding it data, so that when
Writer.close is called, it does the following:
- requests that the writer's producers flush their data to it
- flushes the writer's internal buffer
Unix.closeon the writer's underlying file descriptor
with_flushed_at_close t ~flushed ~f calls
f and adds
flushed to the set of producers that should be flushed-at-close, for the duration of
val bytes_to_write : t -> int
bytes_to_write t returns how many bytes have been requested to write but have not yet been written.
val bytes_written : t -> Core.Int63.t
bytes_written t returns how many bytes have been written.
val bytes_received : t -> Core.Int63.t
bytes_received t returns how many bytes have been received by the writer. As long as the writer is running,
bytes_received = bytes_written + bytes_to_write.
val with_file_atomic : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> ?time_source:[> Core.read ] Async_kernel.Time_source.T1.t -> string -> f:(t -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
with_file_atomic ?temp_file ?perm ?fsync ?replace_special file ~f creates a writer to a temp file, feeds that writer to
f, and when the result of
f becomes determined, atomically moves (using
Unix.rename) the temp file to
file currently exists and is a regular file (see below regarding
replace_special) it will be replaced, even if it is read-only.
The temp file will be
temp_file if supplied) suffixed by a unique random sequence of six characters. The temp file will be removed if an exception is raised to the monitor of
f before the result of
f becomes determined. However, if the program exits for some other reason, the temp file may not be cleaned up; so it may be prudent to choose a temp file that can be easily found by cleanup tools.
true, the temp file will be flushed to disk before it takes the place of the target file, thus guaranteeing that the target file will always be in a sound state, even after a machine crash. Since synchronization is extremely slow, this is not the default. Think carefully about the event of machine crashes and whether you may need this option!
false (the default) an existing special
file (block or character device, socket or FIFO) will not be replaced by a regular file, the temporary file is not created and an exception is raised. To explicitly replace an existing special
replace_special must be passed as
true. Note that if
file exists and is a directory, the rename will fail; if
file exists and is a symbolic link, the link will be replaced, not the target (as per
We intend for
with_file_atomic to mimic the behavior of the
open system call, so if
file does not exist, we will apply the current umask to
perm (the effective permissions become
perm land lnot umask, see
man 2 open). However, if
file does exist and
perm is specified, we do something different from
open system call: we override the permission with
perm, ignoring the umask. This means that if you create and then immediately overwrite the file with
with_file_atomic ~perm, then the umask will be honored the first time and ignored the second time. If
perm is not specified, then any existing file permissions are preserved.
f closes the writer passed to it,
with_file_atomic raises and does not create
val save : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> string -> contents:string -> unit Async_kernel.Deferred.t
save is a special case of
with_file_atomic that atomically writes the given string to the specified file.
val save_lines : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> string -> string list -> unit Async_kernel.Deferred.t
save_lines file lines writes all lines in
file, with each line followed by a newline.
val save_sexp : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> ?hum:bool -> string -> Core.Sexp.t -> unit Async_kernel.Deferred.t
save_sexp is a special case of
with_file_atomic that atomically writes the given sexp to the specified file.
save_sexp t sexp writes
t, followed by a newline. To read a file produced using
save_sexp, one would typically use
Reader.load_sexp, which deals with the additional whitespace and works nicely with converting the sexp to a value.
val save_sexps : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> ?hum:bool -> string -> Core.Sexp.t list -> unit Async_kernel.Deferred.t
save_sexps works similarly to
save_sexp, but saves a sequence of sexps instead, separated by newlines. There is a corresponding
Reader.load_sexps for reading back in.
val save_sexps_conv : ?temp_file:string -> ?perm:int -> ?fsync:bool -> ?replace_special:bool -> ?hum:bool -> string -> 'a list -> ('a -> Core.Sexp.t) -> unit Async_kernel.Deferred.t
save_sexps_conv is like
save_sexps, but converts to sexps internally, one at a time. This avoids allocating the list of sexps up front, which can be costly. The default values of the parameters are the same as
val save_bin_prot : ?temp_file:string -> ?perm:Core_unix.file_perm -> ?fsync:bool -> ?replace_special:bool -> string -> 'a Core.Bin_prot.Type_class.writer -> 'a -> unit Async_kernel.Deferred.t
save_bin_prot t bin_writer 'a is a special case of
with_file_atomic that writes
t using its bin_writer, in the size-prefixed format, like
write_bin_prot. To read a file produced using
save_bin_prot, one would typically use
val transfer' : ?stop:unit Async_kernel.Deferred.t -> ?max_num_values_per_read:int -> t -> 'a Async_kernel.Pipe.Reader.t -> ('a Core.Queue.t -> unit Async_kernel.Deferred.t) -> unit Async_kernel.Deferred.t
transfer' t pipe_r f repeatedly reads values from
pipe_r and feeds them to
f, which should in turn write them to
t. It provides pushback to
pipe_r by not reading when
t cannot keep up with the data being pushed in.
By default, each read from
pipe_r reads all the values in
pipe_r. One can supply
max_num_values_per_read to limit the number of values per read.
transfer' stops and the result becomes determined when
stop becomes determined, when
pipe_r reaches its EOF, when
t is closed, or when
t's consumer leaves. In the latter two cases,
pipe_r's writer to ensure that the bytes have been flushed to
t before returning. It also waits on
Pipe.upstream_flushed at shutdown.
transfer t pipe_r f is equivalent to:
transfer' t pipe_r (fun q -> Queue.iter q ~f; return ())
val transfer : ?stop:unit Async_kernel.Deferred.t -> ?max_num_values_per_read:int -> t -> 'a Async_kernel.Pipe.Reader.t -> ('a -> unit) -> unit Async_kernel.Deferred.t
val pipe : t -> string Async_kernel.Pipe.Writer.t
pipe t returns the writing end of a pipe attached to
t that pushes back when
t cannot keep up with the data being pushed in. Closing the pipe does not close
val of_pipe : ?time_source:[> Core.read ] Async_kernel.Time_source.T1.t -> Core.Info.t -> string Async_kernel.Pipe.Writer.t -> (t * [ `Closed_and_flushed_downstream of unit Async_kernel.Deferred.t ]) Async_kernel.Deferred.t
of_pipe info pipe_w returns a writer
t such that data written to
t will appear on
pipe_w. If either
pipe_w are closed, the other is closed as well.
of_pipe is implemented by attaching
t to the write-end of a Unix pipe, and shuttling bytes from the read-end of the Unix pipe to
val behave_nicely_in_pipeline : ?writers:t list -> unit -> unit
behave_nicely_in_pipeline ~writers () causes the program to silently exit with status 0 if any of the consumers of
writers go away. It also sets the buffer age to unlimited, in case there is a human (e.g., using
less) on the other side of the pipeline.
This can be called at the toplevel of a program, before
Command.run for instance. (this function doesn't start the async scheduler).
val set_synchronous_out_channel : t -> Core.Out_channel.t -> unit Async_kernel.Deferred.t
set_synchronous_out_channel t out_channel waits until
byte_to_write t = 0, and then mutates
t so that all future writes to
t synchronously call
Out_channel.output* functions to send data to the OS immediately.
set_synchronous_out_channel is used by expect tests to ensure that the interleaving between calls to
Core.printf (and similar IO functions) and
Async.printf generates output with the same interleaving.
set_synchronous_out_channel is idempotent.
val using_synchronous_backing_out_channel : t -> bool
using_synchronous_backing_out_channel t = true if writes to
t are being done synchronously, e.g., due to
val clear_synchronous_out_channel : t -> unit
clear_synchronous_out_channel t restores
t to its normal state, with the background writer asynchronously feeding data to the OS.
clear_synchronous_out_channel is idempotent.
val with_synchronous_out_channel : t -> Core.Out_channel.t -> f:(unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t
val use_synchronous_stdout_and_stderr : unit -> unit Async_kernel.Deferred.t
use_synchronous_stdout_and_stderr () causes all subsequent writes to stdout and stderr to occur synchronously (after any pending writes have flushed).
printf-family writes happen immediately, which avoids two common sources of confusion:
- unexpected interleaving of
Async.printfcalls that don't get flushed before an application exits
The disadvantages are:
- this makes writes blocking, which can delay unrelated asynchronous jobs until the consumer stops pushing back; and
- the errors raised by write are different and it won't respect
module Backing_out_channel : sig ... end
Out_channel to a narrow interface that can be used to collect strings, etc.
val set_synchronous_backing_out_channel : t -> Backing_out_channel.t -> unit Async_kernel.Deferred.t
val with_synchronous_backing_out_channel : t -> Backing_out_channel.t -> f:(unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t