update Eio httpstore
zoj613 committed Nov 30, 2024
commit ca93445
Expand Up @@ -136,14 +136,13 @@ end

module HttpStore = struct
exception Not_implemented
exception Request_failed of string
exception Request_failed of int * string

open Cohttp_eio

let raise_status_error s =
let c = Cohttp.Code.code_of_status s in
let msg = Printf.sprintf "%d: %s" c (Cohttp.Code.reason_phrase_of_code c) in
raise (Request_failed msg)
raise (Request_failed (c, Cohttp.Code.reason_phrase_of_code c))

module IO = struct
module Deferred = Deferred
Expand All @@ -160,18 +159,23 @@ module HttpStore = struct
raise (Zarr.Storage.Key_not_found key)
| e -> raise_status_error e

let size t key =
let size t key = try String.length (get t key) with
| Zarr.Storage.Key_not_found _ -> 0

(*let size t key = @@ fun sw ->
let url = Uri.with_path t.base_url key in
let resp = Client.head ~sw t.client url in
match Http.Response.status resp with
| #Http.Status.success ->
begin match Http.Response.content_length resp with
| Some l -> l
| None -> String.length (get t key)
| None ->
try String.length (get t key) with
| Zarr.Storage.Key_not_found _ -> 0
| #Http.Status.client_error as e when e = `Not_found -> Deferred.return 0
| e -> raise_status_error e
| e -> raise_status_error e *)

let is_member t key = if (size t key) = 0 then false else true

Expand All @@ -184,19 +188,19 @@ module HttpStore = struct
let size = String.length data in (read_range ~data ~size) ranges

(*let set t key data =
let set t key data = @@ fun sw ->
let url = Uri.with_path t.base_url key in
let headers = Http.Header.of_list [("Content-Length", string_of_int (String.length data))] in
let body = Body.of_string data in
let resp, _ = Client.put ~sw ~headers ~body t.client url in
let resp, _ = ~sw ~headers ~body t.client url in
match Http.Response.status resp with
| #Http.Status.success -> Deferred.return_unit
| e -> raise_status_error e

let set_partial_values t key ?(append=false) rsv =
let* size = size t key in
let* ov = match size with
let size = size t key in
let ov = match size with
| 0 -> Deferred.return String.empty
| _ -> get t key
Expand All @@ -208,11 +212,15 @@ module HttpStore = struct
Bytes.unsafe_to_string s
set t key (List.fold_left f ov rsv)

let erase t key = @@ fun sw ->
let url = Uri.with_path t.base_url key in
let resp, _ = Client.delete ~sw t.client url in
match Http.Response.status resp with
| #Http.Status.success -> Deferred.return_unit
| e -> raise_status_error e

let set _ = raise Not_implemented
let set_partial_values _ = raise Not_implemented
let erase _ = raise Not_implemented
let erase_prefix _ = raise Not_implemented
let list _ = raise Not_implemented
let list_dir _ = raise Not_implemented
Expand Up @@ -23,7 +23,7 @@ end

module HttpStore : sig
exception Not_implemented
exception Request_failed of string
exception Request_failed of int * string
include Zarr.Storage.STORE with module Deferred = Deferred
val with_open :
net:_ Eio.Net.t ->
Expand Up @@ -152,31 +152,92 @@ let _ =
test_storage (module FilesystemStore) s;
HttpStore.with_open ~net:env#net (Uri.of_string "") (fun store ->
let module S = Tiny_httpd in
let dir_behavior = S.Dir.Forbidden and download = true and delete = true and upload = true in
let config = S.Dir.config ~dir_behavior ~delete ~upload ~download ()
and addr = "" and port = 8080 in
let server = S.create ~max_connections:4 ~addr ~port () in
(*let dir = "/home/zoj/dev/zarr-ml/testdata.zarr" in *)
let dir = Sys.getenv "HTTPSTORE_DIR" in
S.Dir.add_dir_path ~config ~dir ~prefix:"" server;
let server = S.create ~max_connections:1000 ~addr:"" ~port:8080 () in
let dir = tmp_dir in
S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath length) with
| exception Sys_error e -> S.Response.make_raw ~code:404 e
| s ->
let headers =
[("Content-Length", Int64.to_string s)
;("Content-Type", "application/octet-stream")]
let r = S.Response.make_raw ~code:200 "" in
S.Response.update_headers (List.append headers) r
S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with
| exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path)
| s -> S.Response.make_raw ~code:200 s
S.add_route_handler server ~meth:`POST S.Route.rest_of_path_urlencoded (fun path req ->
let write oc = Out_channel.(output_string oc req.body; flush oc) in
let fspath = Filename.concat dir path in
Zarr.Util.create_parent_dir fspath 0o700;
let f = [Open_wronly; Open_trunc; Open_creat] in
match Out_channel.(with_open_gen f 0o700 fspath write) with
| exception Sys_error e -> S.Response.make_raw ~code:500 e
| () -> S.Response.make_raw ~code:201 req.body
S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match Sys.remove fspath with
| exception Sys_error e -> S.Response.make_raw ~code:404 e
| () -> S.Response.make_raw ~code:200 (Printf.sprintf "%s deleted successfully" path)
let _ = Thread.create server in
let gnode = Node.Group.of_path "/some/group" in
let gnode = Node.Group.root in
let attrs = `Assoc [("questions", `String "answer")] in
HttpStore.Group.create ~attrs store gnode;
let exists = HttpStore.Group.exists store gnode in
assert_equal ~printer:string_of_bool true exists;
let meta = HttpStore.Group.metadata store gnode in
assert_equal Metadata.Group.default meta;
let anode = Node.Array.of_path "/some/group/another" in
assert_equal attrs (Metadata.Group.attributes meta);
let exists = HttpStore.Array.exists store Node.Array.(gnode / "non-member") in
assert_equal ~printer:string_of_bool false exists;

let cfg =
{chunk_shape = [|2; 5; 5|]
;index_location = End
;index_codecs = [`Bytes LE]
;codecs = [`Transpose [|2; 0; 1|]; `Bytes BE]} in
let anode = Node.Array.(gnode / "arrnode") in
let slice = [|R [|0; 5|]; I 10; R [|0; 10|]|] in
let _ = store anode slice Complex32 in
assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.hierarchy store);
assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.Group.create store (Node.Group.of_path "/blah"));
(*assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.Group.children store gnode);
assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.Array.rename store anode "newname"); *)
(*assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.Array.reshape store anode [|1;1;1|]); *)
assert_raises (HttpStore.Not_implemented) (fun () -> HttpStore.clear store);
let bigger_slice = [|R [|0; 6|]; L [|9; 10|] ; R [|0; 11|]|] in
~codecs:[`ShardingIndexed cfg] ~shape:[|100; 100; 50|] ~chunks:[|10; 15; 20|]
Complex32 anode store;
let exp = Ndarray.init Complex32 [|6; 1; 11|] (Fun.const in
let got = store anode slice Complex32 in
assert_equal exp got;
Ndarray.fill exp Complex.{re=2.0; im=0.};
HttpStore.Array.write store anode slice exp;
let got = store anode slice Complex32 in
(* test if a bigger slice containing new elements can be read from store *)
let _ = store anode bigger_slice Complex32 in
assert_equal exp got;
(* test writing a bigger slice to store *)
HttpStore.Array.write store anode bigger_slice @@ Ndarray.init Complex32 [|7; 2; 12|] (Fun.const Complex.{re=0.; im=3.0});
let got = store anode slice Complex32 in
Ndarray.fill exp Complex.{re=0.; im=3.0};
assert_equal exp got;
let nshape = [|25; 28; 10|] in
HttpStore.Array.reshape store anode nshape;
let meta = HttpStore.Array.metadata store anode in
assert_equal ~printer:print_int_array nshape (Metadata.Array.shape meta);
(fun () ->
let exp = Ndarray.init Complex32 [|6; 1; 11|] (Fun.const in
HttpStore.Array.write store anode slice exp);
(fun () -> HttpStore.Array.reshape store anode [|25; 10|]);
(Zarr.Storage.Key_not_found "fakegroup/zarr.json")
(fun () -> HttpStore.Array.metadata store Node.Array.(gnode / "fakegroup"));
assert_raises HttpStore.Not_implemented (fun () -> HttpStore.Array.rename store anode "newname");
assert_raises HttpStore.Not_implemented (fun () -> HttpStore.Group.children store gnode);
assert_raises HttpStore.Not_implemented (fun () -> HttpStore.hierarchy store);
assert_raises HttpStore.Not_implemented (fun () -> HttpStore.Group.delete store gnode);
assert_raises HttpStore.Not_implemented (fun () -> HttpStore.clear store);
Tiny_httpd.stop server)

