package rpc_parallel

type worker
type worker_state = Worker_state.t
type connection_state = Connection_state.t
val create_rpc : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) Function.t

create_rpc ?name ~f ~bin_input ~bin_output () will create an Rpc.Rpc.t with name if specified and use f as the implementation for this Rpc. It returns a Function.t, a type-safe Rpc protocol.
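As a rough illustration of where create_rpc is called, the sketch below defines a worker whose only function sums the integers below its argument. The names Sum_worker, sum_below, and sum_impl are hypothetical, and the surrounding boilerplate (the functions record, Worker_state, Connection_state, Rpc_parallel.Make) follows the worker-spec layout as I understand it; it assumes ppx_jane preprocessing for [@@deriving bin_io], and details may differ between releases.

```ocaml
open Core
open Async

(* Hypothetical pure helper: 0 + 1 + ... + (n - 1). *)
let sum_below n = List.fold (List.init n ~f:Fn.id) ~init:0 ~f:( + )

module Sum_worker = struct
  module T = struct
    (* One record field per function the worker exposes. *)
    type 'worker functions = { sum : ('worker, int, int) Rpc_parallel.Function.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      (* The implementation receives both states plus the query. *)
      let sum_impl ~worker_state:() ~conn_state:() n = return (sum_below n)

      let sum =
        C.create_rpc ~name:"sum" ~f:sum_impl ~bin_input:Int.bin_t ~bin_output:Int.bin_t ()
      ;;

      let functions = { sum }
      let init_worker_state () = Deferred.unit
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```

The resulting functions.sum value is what a client passes when running a function against a spawned worker; spawning and application start-up are outside the scope of this sketch.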

val create_pipe : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response Async.Pipe.Reader.t) Function.t

create_pipe ?name ~f ~bin_input ~bin_output () will create an Rpc.Pipe_rpc.t with name if specified. The implementation for this Rpc calls f with the query and returns the resulting Pipe.Reader.t to the caller.

Note that aborted is not exposed. The pipe is closed when aborted becomes determined.
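A minimal sketch of create_pipe, assuming the same worker-spec boilerplate as any other worker and ppx_jane preprocessing. Range_worker, range_values, and range_impl are hypothetical names; the implementation simply wraps a list in a pipe with Pipe.of_list and returns the reader.

```ocaml
open Core
open Async

(* Hypothetical pure helper: the values the worker streams back. *)
let range_values n = List.init n ~f:Fn.id

module Range_worker = struct
  module T = struct
    (* The response type of a pipe function is a Pipe.Reader.t. *)
    type 'worker functions =
      { range : ('worker, int, int Pipe.Reader.t) Rpc_parallel.Function.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      (* f returns the reader end; the library forwards its contents. *)
      let range_impl ~worker_state:() ~conn_state:() n =
        return (Pipe.of_list (range_values n))
      ;;

      let range =
        C.create_pipe ~name:"range" ~f:range_impl ~bin_input:Int.bin_t ~bin_output:Int.bin_t ()
      ;;

      let functions = { range }
      let init_worker_state () = Deferred.unit
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```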

val create_direct_pipe : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> bin_input:'query Core.Bin_prot.Type_class.t -> bin_output:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, 'response) Function.Direct_pipe.t

create_direct_pipe ?name ~f ~bin_input ~bin_output () will create an Rpc.Pipe_rpc.t with name if specified.
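The following sketch shows one way create_direct_pipe might be used. Instead of returning a pipe, f is handed a Direct_stream_writer.t and pushes values into it. Countdown_worker, countdown_values, and countdown_impl are hypothetical names, and the example assumes ppx_jane preprocessing; it writes without pushback and then closes the writer, which is only one of several reasonable strategies.

```ocaml
open Core
open Async

(* Hypothetical pure helper: n, n - 1, ..., 1. *)
let countdown_values n = List.rev (List.init n ~f:(fun i -> i + 1))

module Countdown_worker = struct
  module T = struct
    (* Direct pipe functions use Function.Direct_pipe.t, not Function.t. *)
    type 'worker functions =
      { countdown : ('worker, int, int) Rpc_parallel.Function.Direct_pipe.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      let countdown_impl ~worker_state:() ~conn_state:() n writer =
        List.iter (countdown_values n) ~f:(fun i ->
          (* Ignore `Closed: the client may have gone away already. *)
          match Rpc.Pipe_rpc.Direct_stream_writer.write_without_pushback writer i with
          | `Ok | `Closed -> ());
        Rpc.Pipe_rpc.Direct_stream_writer.close writer;
        Deferred.unit
      ;;

      let countdown =
        C.create_direct_pipe
          ~name:"countdown"
          ~f:countdown_impl
          ~bin_input:Int.bin_t
          ~bin_output:Int.bin_t
          ()
      ;;

      let functions = { countdown }
      let init_worker_state () = Deferred.unit
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```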

val create_one_way : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> bin_input:'query Core.Bin_prot.Type_class.t -> unit -> (worker, 'query, unit) Function.t

create_one_way ?name ~f ~bin_input () will create an Rpc.One_way.t with name if specified and use f as the implementation.
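A small sketch of create_one_way that also illustrates worker_state. Here the worker state is assumed to be a mutable counter, and the one-way function adds to it without sending anything back. Counter_worker, bump, and add_impl are hypothetical names; ppx_jane preprocessing is assumed.

```ocaml
open Core
open Async

(* Hypothetical pure helper: add n to a mutable counter. *)
let bump counter n = counter := !counter + n

module Counter_worker = struct
  module T = struct
    (* One-way functions have a unit response type. *)
    type 'worker functions = { add : ('worker, int, unit) Rpc_parallel.Function.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = int ref
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      (* f returns plain unit, not a Deferred.t: there is no reply to wait for. *)
      let add_impl ~worker_state ~conn_state:() n = bump worker_state n
      let add = C.create_one_way ~name:"add" ~f:add_impl ~bin_input:Int.bin_t ()
      let functions = { add }
      let init_worker_state () = return (ref 0)
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```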

val create_reverse_pipe : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_update:'update Core.Bin_prot.Type_class.t -> bin_response:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query * 'update Async.Pipe.Reader.t, 'response) Function.t

create_reverse_pipe ?name ~f ~bin_query ~bin_update ~bin_response () generates a function allowing you to send a query and a pipe of updates to a worker. The worker will send back a response. It is up to you whether to send a response before or after finishing with the pipe; Rpc_parallel doesn't care.
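To make the shape of a reverse pipe function concrete, here is a sketch in which the query is a starting value and the updates are integers to add to it. The worker responds only after the update pipe is exhausted, which is one of the two orderings the text allows. Accumulator_worker, add_update, and sum_from_impl are hypothetical names; ppx_jane preprocessing is assumed.

```ocaml
open Core
open Async

(* Hypothetical pure helper: fold step applied to each update. *)
let add_update acc x = acc + x

module Accumulator_worker = struct
  module T = struct
    (* The query side pairs the initial query with a pipe of updates. *)
    type 'worker functions =
      { sum_from : ('worker, int * int Pipe.Reader.t, int) Rpc_parallel.Function.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      (* Becomes determined once the update pipe is closed and drained. *)
      let sum_from_impl ~worker_state:() ~conn_state:() init updates =
        Pipe.fold_without_pushback updates ~init ~f:add_update
      ;;

      let sum_from =
        C.create_reverse_pipe
          ~name:"sum_from"
          ~f:sum_from_impl
          ~bin_query:Int.bin_t
          ~bin_update:Int.bin_t
          ~bin_response:Int.bin_t
          ()
      ;;

      let functions = { sum_from }
      let init_worker_state () = Deferred.unit
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```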

val create_reverse_direct_pipe : ?name:string -> f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'update Async.Pipe.Reader.t -> 'response Async.Deferred.t) -> bin_query:'query Core.Bin_prot.Type_class.t -> bin_update:'update Core.Bin_prot.Type_class.t -> bin_response:'response Core.Bin_prot.Type_class.t -> unit -> (worker, 'query * ('update Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Core.Or_error.t Async.Deferred.t), 'response) Function.t

create_reverse_direct_pipe ?name ~f ~bin_query ~bin_update ~bin_response () generates a function allowing you to send a query and a direct stream of updates to a worker. The worker will send back a response. It is up to you whether to send a response before or after finishing with the pipe; Rpc_parallel doesn't care.

val of_async_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Deferred.t) -> ('query, 'response) Async.Rpc.Rpc.t -> (worker, 'query, 'response) Function.t

of_async_rpc ~f rpc is the analog of create_rpc, but instead of creating an Rpc protocol, it uses the supplied one.

val of_async_pipe_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Pipe.Reader.t Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response Async.Pipe.Reader.t) Function.t

of_async_pipe_rpc ~f rpc is the analog of create_pipe, but instead of creating a Pipe rpc protocol, it uses the supplied one.

Note that aborted is not exposed. The pipe is closed when aborted becomes determined.

val of_async_direct_pipe_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> 'response Async.Rpc.Pipe_rpc.Direct_stream_writer.t -> unit Async.Deferred.t) -> ('query, 'response, Core.Error.t) Async.Rpc.Pipe_rpc.t -> (worker, 'query, 'response) Function.Direct_pipe.t

of_async_direct_pipe_rpc ~f rpc is the analog of create_direct_pipe, but instead of creating a Pipe rpc protocol, it uses the supplied one.

val of_async_one_way_rpc : f: (worker_state:worker_state -> conn_state:connection_state -> 'query -> unit) -> 'query Async.Rpc.One_way.t -> (worker, 'query, unit) Function.t

of_async_one_way_rpc ~f rpc is the analog of create_one_way, but instead of creating a One_way rpc protocol, it uses the supplied one.
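The of_async_* variants are useful when a protocol is already defined elsewhere, for example in a shared library. The sketch below builds a One_way protocol with Rpc.One_way.create and hands it to of_async_one_way_rpc. The names note_rpc, Note_worker, and note_impl are hypothetical, as are the protocol name and version; ppx_jane preprocessing is assumed.

```ocaml
open Core
open Async

(* A pre-existing protocol, here defined inline for illustration. *)
let note_rpc = Rpc.One_way.create ~name:"note" ~version:1 ~bin_msg:String.bin_t

module Note_worker = struct
  module T = struct
    type 'worker functions = { note : ('worker, string, unit) Rpc_parallel.Function.t }

    module Worker_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Connection_state = struct
      type init_arg = unit [@@deriving bin_io]
      type t = unit
    end

    module Functions
        (C : Rpc_parallel.Creator
               with type worker_state := Worker_state.t
                and type connection_state := Connection_state.t) =
    struct
      (* Log the message on the worker; nothing is sent back. *)
      let note_impl ~worker_state:() ~conn_state:() msg = Log.Global.info "%s" msg

      (* Reuse the supplied protocol rather than creating a fresh one. *)
      let note = C.of_async_one_way_rpc ~f:note_impl note_rpc
      let functions = { note }
      let init_worker_state () = Deferred.unit
      let init_connection_state ~connection:_ ~worker_state:_ = return
    end
  end

  include Rpc_parallel.Make (T)
end
```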