Skip to content

Commit

Permalink
update lwt httpstore
Browse files Browse the repository at this point in the history
  • Loading branch information
zoj613 committed Nov 30, 2024
1 parent 7932b3d commit 5f67805
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 36 deletions.
30 changes: 19 additions & 11 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -340,16 +340,16 @@ end

module HttpStore = struct
exception Not_implemented
exception Request_failed of string
exception Request_failed of int * string

let raise_status_error s =
let c = Cohttp.Code.code_of_status s in
let msg = Printf.sprintf "%d: %s" c (Cohttp.Code.reason_phrase_of_code c) in
raise (Request_failed msg)
raise (Request_failed (c, Cohttp.Code.reason_phrase_of_code c))

module IO = struct
module Deferred = Deferred
open Deferred.Syntax
open Deferred.Infix
open Cohttp_lwt_unix

type t = {base_url : Uri.t}
Expand All @@ -363,7 +363,12 @@ module HttpStore = struct
raise (Zarr.Storage.Key_not_found key)
| e -> raise_status_error e

let size t key =
let size t key = Lwt.catch
(fun () -> get t key >>| String.length)
(function
| Zarr.Storage.Key_not_found _ -> Deferred.return 0
| exn -> raise exn)
(*let size t key =
let url = Uri.with_path t.base_url key in
let* resp = Client.head url in
match Response.status resp with
Expand All @@ -375,7 +380,7 @@ module HttpStore = struct
String.length data
end
| #Cohttp.Code.client_error_status as e when e = `Not_found -> Deferred.return 0
| e -> raise_status_error e
| e -> raise_status_error e *)

let is_member t key =
let+ s = size t key in
Expand All @@ -390,11 +395,11 @@ module HttpStore = struct
let size = String.length data in
List.map (read_range ~data ~size) ranges

(*let set t key data =
let set t key data =
let url = Uri.with_path t.base_url key in
let body = Cohttp_lwt.Body.of_string data in
let headers = Cohttp.Header.of_list [("Content-Length", string_of_int (String.length data))] in
let* resp, _ = Client.put ~body ~headers url in
let* resp, _ = Client.post ~body ~headers url in
match Response.status resp with
| #Cohttp.Code.success_status -> Deferred.return_unit
| e -> raise_status_error e
Expand All @@ -413,11 +418,14 @@ module HttpStore = struct
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)
*)

let set _ = raise Not_implemented
let set_partial_values _ = raise Not_implemented
let erase _ = raise Not_implemented
let erase t key =
let url = Uri.with_path t.base_url key in
let* resp, _ = Client.delete url in
match Response.status resp with
| #Cohttp.Code.success_status -> Deferred.return_unit
| e -> raise_status_error e

let erase_prefix _ = raise Not_implemented
let list _ = raise Not_implemented
let list_dir _ = raise Not_implemented
Expand Down
2 changes: 1 addition & 1 deletion zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ end

module HttpStore : sig
exception Not_implemented
exception Request_failed of string
exception Request_failed of int * string
include Zarr.Storage.STORE with module Deferred = Deferred
val with_open : string -> (t -> 'a Lwt.t) -> 'a Lwt.t
end
Expand Down
106 changes: 82 additions & 24 deletions zarr-lwt/test/test_lwt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,52 @@ let _ =
and profile = "default" in

let promises =
[ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z)
[ZipStore.with_open `Read_write zpath (test_storage (module ZipStore))
(* test just opening the now exisitant archive created by the previous test. *)
;ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit)
;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store))
;test_storage (module MemoryStore) @@ MemoryStore.create ()
;test_storage (module MemoryStore) (MemoryStore.create ())
;test_storage (module FilesystemStore) s
;HttpStore.with_open "http://127.0.0.1:8080" (fun store ->
let module S = Tiny_httpd in
let dir_behavior = S.Dir.Lists and download = true and delete = true and upload = true in
let config = S.Dir.config ~dir_behavior ~delete ~upload ~download ()
and addr = "127.0.0.1" and port = 8080 in
let server = S.create ~max_connections:4 ~addr ~port () in
(*let dir = "/home/zoj/dev/zarr-ml/testdata.zarr" in *)
let dir = Sys.getenv "HTTPSTORE_DIR" in
S.Dir.add_dir_path ~config ~dir ~prefix:"" server;
let server = S.create ~max_connections:1000 ~addr:"127.0.0.1" ~port:8080 () in
let dir = tmp_dir in
S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath length) with
| exception Sys_error e -> S.Response.make_raw ~code:404 e
| s ->
let headers =
[("Content-Length", Int64.to_string s)
;("Content-Type",
if String.ends_with ~suffix:".json" path
then "application/json"
else "application/octet-stream")]
in
let r = S.Response.make_raw ~code:200 "" in
S.Response.update_headers (List.append headers) r
);
S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with
| exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path)
| s -> S.Response.make_raw ~code:200 s
);
S.add_route_handler server ~meth:`POST S.Route.rest_of_path_urlencoded (fun path req ->
let write oc = Out_channel.(output_string oc req.body; flush oc) in
let fspath = Filename.concat dir path in
Zarr.Util.create_parent_dir fspath 0o700;
let f = [Open_wronly; Open_trunc; Open_creat] in
match Out_channel.(with_open_gen f 0o700 fspath write) with
| exception Sys_error e -> S.Response.make_raw ~code:500 e
| () -> S.Response.make_raw ~code:201 req.body
);
S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match Sys.remove fspath with
| exception Sys_error e -> S.Response.make_raw ~code:404 e
| () -> S.Response.make_raw ~code:200 (Printf.sprintf "%s deleted successfully" path)
);
let _ = Thread.create S.run server in

let open Deferred.Syntax in
Expand All @@ -172,25 +203,52 @@ let _ =
(fun () -> let* _ = f () in Deferred.return_unit)
(function
| HttpStore.Not_implemented -> Deferred.return_unit
| exn -> raise exn)
| _ -> failwith "Supposed to raise Not_implemented")
in
let gnode = Node.Group.of_path "/some/group" in
let gnode = Node.Group.root in
let attrs = `Assoc [("questions", `String "answer")] in
let* () = HttpStore.Group.create ~attrs store gnode in
let* exists = HttpStore.Group.exists store gnode in
assert_equal ~printer:string_of_bool true exists;
let* meta = HttpStore.Group.metadata store gnode in
assert_equal ~printer:Metadata.Group.show Metadata.Group.default meta;
let anode = Node.Array.of_path "/some/group/another" in
let slice = [|R [|0; 5|]; I 10; R [|0; 10|]|] in
let* _ = HttpStore.Array.read store anode slice Complex32 in
let* () = assert_not_implemented (fun () -> HttpStore.hierarchy store) in
let* () = assert_not_implemented (fun () -> HttpStore.Group.create store (Node.Group.of_path "/blah")) in
assert_equal ~printer:Yojson.Safe.show attrs (Metadata.Group.attributes meta);
let* exists = HttpStore.Array.exists store Node.Array.(gnode / "non-member") in
assert_equal ~printer:string_of_bool false exists;
let cfg =
{chunk_shape = [|2; 5; 5|]
;index_location = End
;index_codecs = [`Bytes LE]
;codecs = [`Transpose [|2; 0; 1|]; `Bytes BE]} in
let anode = Node.Array.(gnode / "arrnode")
and slice = [|R [|0; 5|]; I 10; R [|0; 10|]|]
and bigger_slice = [|R [|0; 6|]; L [|9; 10|] ; R [|0; 11|]|]
and codecs = [`ShardingIndexed cfg] and shape = [|100; 100; 50|] and chunks = [|10; 15; 20|] in
let* () = HttpStore.Array.create ~codecs ~shape ~chunks Complex32 Complex.one anode store in
let exp = Ndarray.init Complex32 [|6; 1; 11|] (Fun.const Complex.one) in
let* got = HttpStore.Array.read store anode slice Complex32 in
assert_equal exp got;
Ndarray.fill exp Complex.{re=2.0; im=0.};
let* () = HttpStore.Array.write store anode slice exp in
let* got = HttpStore.Array.read store anode slice Complex32 in
(* test if a bigger slice containing new elements can be read from store *)
let* _ = HttpStore.Array.read store anode bigger_slice Complex32 in
assert_equal exp got;
(* test writing a bigger slice to store *)
let* () = HttpStore.Array.write store anode bigger_slice @@ Ndarray.init Complex32 [|7; 2; 12|] (Fun.const Complex.{re=0.; im=3.0}) in
let* got = HttpStore.Array.read store anode slice Complex32 in
Ndarray.fill exp Complex.{re=0.; im=3.0};
assert_equal exp got;
let nshape = [|25; 28; 10|] in
let* () = HttpStore.Array.reshape store anode nshape in
let* meta = HttpStore.Array.metadata store anode in
assert_equal ~printer:print_int_array nshape (Metadata.Array.shape meta);
let* () = assert_not_implemented (fun () -> HttpStore.Array.rename store anode "newname") in
let* () = assert_not_implemented (fun () -> HttpStore.Group.children store gnode) in
(*let* () = assert_not_implemented (fun () -> HttpStore.Array.rename store anode "newname") in *)
let* () = assert_not_implemented (fun () -> HttpStore.Array.reshape store anode [|1;1;1|]) in
let* () = assert_not_implemented (fun () -> HttpStore.clear store) in
let+ () = assert_not_implemented (fun () ->
let exp = Ndarray.init Complex32 [|6; 1; 11|] (Fun.const Complex.one) in
HttpStore.Array.write store anode slice exp) in
let* () = assert_not_implemented (fun () -> HttpStore.hierarchy store) in
let* () = assert_not_implemented (fun () -> HttpStore.Group.delete store gnode) in
let+ () = assert_not_implemented (fun () -> HttpStore.clear store) in
Tiny_httpd.stop server)
]
in
ignore (Lwt_main.run @@ Lwt.join promises))
Lwt_main.run @@ Lwt.join promises)
])

0 comments on commit 5f67805

Please sign in to comment.