Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(draft) Eio.Net.connect: optionally bind before connect #713

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib_eio/mock/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module Impl = struct
Switch.on_release sw (fun () -> Eio.Resource.close socket);
socket

let connect t ~sw addr =
let connect t ~sw ~options:_ addr =
traceln "%s: connect to %a" t.label Eio.Net.Sockaddr.pp addr;
let socket = Handler.run t.on_connect in
Switch.on_release sw (fun () -> Eio.Flow.close socket);
Expand Down
12 changes: 9 additions & 3 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ type 'tag ty = [`Network | `Platform of 'tag]
type 'a t = 'a r
constraint 'a = [> [> `Generic] ty]

type option = ..
type option +=
| Source_addr of Sockaddr.stream
| Reuse_addr
| Reuse_port

module Pi = struct
module type STREAM_SOCKET = sig
type tag
Expand Down Expand Up @@ -235,7 +241,7 @@ module Pi = struct
type tag

val listen : t -> reuse_addr:bool -> reuse_port:bool -> backlog:int -> sw:Switch.t -> Sockaddr.stream -> tag listening_socket_ty r
val connect : t -> sw:Switch.t -> Sockaddr.stream -> tag stream_socket_ty r
val connect : t -> sw:Switch.t -> options:option list -> Sockaddr.stream -> tag stream_socket_ty r
val datagram_socket :
t
-> reuse_addr:bool
Expand Down Expand Up @@ -295,10 +301,10 @@ let listen (type tag) ?(reuse_addr=false) ?(reuse_port=false) ~backlog ~sw (t:[>
let module X = (val (Resource.get ops Pi.Network)) in
X.listen t ~reuse_addr ~reuse_port ~backlog ~sw

let connect (type tag) ~sw (t:[> tag ty] r) addr =
let connect (type tag) ~sw ?(options = []) (t:[> tag ty] r) addr =
let (Resource.T (t, ops)) = t in
let module X = (val (Resource.get ops Pi.Network)) in
try X.connect t ~sw addr
try X.connect t ~sw ~options addr
with Exn.Io _ as ex ->
let bt = Printexc.get_raw_backtrace () in
Exn.reraise_with_context ex bt "connecting to %a" Sockaddr.pp addr
Expand Down
10 changes: 8 additions & 2 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ type 'a t = 'a r

(** {2 Out-bound Connections} *)

val connect : sw:Switch.t -> [> 'tag ty] t -> Sockaddr.stream -> 'tag stream_socket_ty r
type option = ..
type option +=
| Source_addr of Sockaddr.stream
| Reuse_addr
| Reuse_port

val connect : sw:Switch.t -> ?options:option list -> [> 'tag ty] t -> Sockaddr.stream -> 'tag stream_socket_ty r
(** [connect ~sw t addr] is a new socket connected to remote address [addr].

The new socket will be closed when [sw] finishes, unless closed manually first. *)
Expand Down Expand Up @@ -346,7 +352,7 @@ module Pi : sig
t -> reuse_addr:bool -> reuse_port:bool -> backlog:int -> sw:Switch.t ->
Sockaddr.stream -> tag listening_socket_ty r

val connect : t -> sw:Switch.t -> Sockaddr.stream -> tag stream_socket_ty r
val connect : t -> sw:Switch.t -> options:option list -> Sockaddr.stream -> tag stream_socket_ty r

val datagram_socket :
t
Expand Down
12 changes: 12 additions & 0 deletions lib_eio/unix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,15 @@ let socketpair_datagram ~sw ?(domain=Unix.PF_UNIX) ?(protocol=0) () =

let fd socket =
Option.get (Resource.fd_opt socket)

let apply_option fd = function
| Eio.Net.Source_addr addr ->
Unix.bind fd (sockaddr_to_unix addr)
| Eio.Net.Reuse_addr ->
Unix.setsockopt fd Unix.SO_REUSEADDR true
| Eio.Net.Reuse_port ->
Unix.setsockopt fd Unix.SO_REUSEPORT true
| _ ->
invalid_arg "Unknown Eio.Net.option"

let configure options fd = List.iter (apply_option fd) options
3 changes: 3 additions & 0 deletions lib_eio/unix/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ val socketpair_datagram :
val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
(** [getnameinfo sockaddr] returns domain name and service for [sockaddr]. *)

val configure : Eio.Net.option list -> Unix.file_descr -> unit
(** [configure options fd] prepare the socket with the chosen options. *)

type _ Effect.t +=
| Import_socket_stream :
Switch.t * bool * Unix.file_descr -> [`Unix_fd | stream_socket_ty] r Effect.t (** See {!import_socket_stream} *)
Expand Down
7 changes: 4 additions & 3 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,11 @@ let socket_domain_of = function
~v4:(fun _ -> Unix.PF_INET)
~v6:(fun _ -> Unix.PF_INET6)

let connect ~sw connect_addr =
let connect ~sw ~options connect_addr =
let addr = Eio_unix.Net.sockaddr_to_unix connect_addr in
let sock_unix = Unix.socket ~cloexec:true (socket_domain_of connect_addr) Unix.SOCK_STREAM 0 in
let sock = Fd.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
Low_level.connect sock addr;
Low_level.connect sock ~options addr;
(flow sock :> _ Eio_unix.Net.stream_socket)

module Impl = struct
Expand Down Expand Up @@ -289,7 +289,8 @@ module Impl = struct
Unix.listen sock_unix backlog;
(listening_socket sock :> _ Eio.Net.listening_socket_ty r)

let connect () ~sw addr = (connect ~sw addr :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)
let connect () ~sw ~options addr =
(connect ~sw ~options addr :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)

let datagram_socket () ~reuse_addr ~reuse_port ~sw saddr =
if reuse_addr then (
Expand Down
3 changes: 2 additions & 1 deletion lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ let splice src ~dst ~len =
else if res = 0 then raise End_of_file
else raise @@ Err.wrap (Uring.error_of_errno res) "splice" ""

let connect fd addr =
let connect fd ~options addr =
Fd.use_exn "connect" fd @@ fun fd ->
Eio_unix.Net.configure options fd ;
let res = Sched.enter "connect" (enqueue_connect fd addr) in
if res < 0 then (
let ex =
Expand Down
5 changes: 3 additions & 2 deletions lib_eio_linux/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ val splice : fd -> dst:fd -> len:int -> int
@raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)

val connect : fd -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *)
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
(** [connect fd ~options addr] attempts to connect socket [fd] to [addr]
after configuring the socket with [options]. *)

val await_readable : fd -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)
Expand Down
11 changes: 7 additions & 4 deletions lib_eio_posix/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,18 @@ let socket ~sw socket_domain socket_type protocol =
Unix.set_nonblock sock_unix;
Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix

let connect fd addr =
let connect fd ~options addr =
try
Fd.use_exn "connect" fd (fun fd -> Unix.connect fd addr)
Fd.use_exn "connect" fd @@ fun fd ->
Eio_unix.Net.configure options fd ;
Unix.connect fd addr
with
| Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) ->
await_writable "connect" fd;
match Fd.use_exn "connect" fd Unix.getsockopt_error with
(match Fd.use_exn "connect" fd Unix.getsockopt_error with
| None -> ()
| Some code -> raise (Err.wrap code "connect-in-progress" "")
| Some code -> raise (Err.wrap code "connect-in-progress" ""))
| Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

let accept ~sw sock =
Fd.use_exn "accept" sock @@ fun sock ->
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_posix/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ val read : fd -> bytes -> int -> int -> int
val write : fd -> bytes -> int -> int -> int

val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd
val connect : fd -> Unix.sockaddr -> unit
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr

val shutdown : fd -> Unix.shutdown_command -> unit
Expand Down
8 changes: 4 additions & 4 deletions lib_eio_posix/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.
);
(listening_socket ~hook sock :> _ Eio.Net.listening_socket_ty r)

let connect ~sw connect_addr =
let connect ~sw ~options connect_addr =
let socket_type, addr =
match connect_addr with
| `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path
Expand All @@ -148,7 +148,7 @@ let connect ~sw connect_addr =
in
let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in
try
Low_level.connect sock addr;
Low_level.connect sock ~options addr;
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

Expand All @@ -174,8 +174,8 @@ module Impl = struct

let listen () = listen

let connect () ~sw addr =
let socket = connect ~sw addr in
let connect () ~sw ~options addr =
let socket = connect ~sw ~options addr in
(socket :> [`Generic | `Unix] Eio.Net.stream_socket_ty r)

let datagram_socket () ~reuse_addr ~reuse_port ~sw saddr =
Expand Down
7 changes: 5 additions & 2 deletions lib_eio_windows/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,18 @@ let socket ~sw socket_domain socket_type protocol =
Unix.set_nonblock sock_unix;
Fd.of_unix ~sw ~blocking:false ~close_unix:true sock_unix

let connect fd addr =
let connect fd ~options addr =
try
Fd.use_exn "connect" fd (fun fd -> Unix.connect fd addr)
Fd.use_exn "connect" fd @@ fun fd ->
Eio_unix.Net.configure options fd ;
Unix.connect fd addr
with
| Unix.Unix_error ((EINTR | EAGAIN | EWOULDBLOCK | EINPROGRESS), _, _) ->
await_writable fd;
match Fd.use_exn "connect" fd Unix.getsockopt_error with
| None -> ()
| Some code -> raise (Err.wrap code "connect-in-progress" "")
| Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

let accept ~sw sock =
Fd.use_exn "accept" sock @@ fun sock ->
Expand Down
2 changes: 1 addition & 1 deletion lib_eio_windows/low_level.mli
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ val read_cstruct : fd -> Cstruct.t -> int
val write : fd -> bytes -> int -> int -> int

val socket : sw:Switch.t -> Unix.socket_domain -> Unix.socket_type -> int -> fd
val connect : fd -> Unix.sockaddr -> unit
val connect : fd -> options:Eio.Net.option list -> Unix.sockaddr -> unit
val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr

val shutdown : fd -> Unix.shutdown_command -> unit
Expand Down
4 changes: 2 additions & 2 deletions lib_eio_windows/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ let listen ~reuse_addr ~reuse_port ~backlog ~sw (listen_addr : Eio.Net.Sockaddr.
);
(listening_socket ~hook sock :> _ Eio.Net.listening_socket_ty r)

let connect ~sw connect_addr =
let connect ~sw ~options connect_addr =
let socket_type, addr =
match connect_addr with
| `Unix path -> Unix.SOCK_STREAM, Unix.ADDR_UNIX path
Expand All @@ -152,7 +152,7 @@ let connect ~sw connect_addr =
in
let sock = Low_level.socket ~sw (socket_domain_of connect_addr) socket_type 0 in
try
Low_level.connect sock addr;
Low_level.connect sock ~options addr;
(Flow.of_fd sock :> _ Eio_unix.Net.stream_socket)
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)

Expand Down