Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for an Http server storage backend #85

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
os:
- ubuntu-latest
ocaml-compiler:
- "5.2.0"
- "5.2.1"
- "4.14.2"
local-packages:
- zarr.opam
Expand Down Expand Up @@ -72,14 +72,14 @@ jobs:
- name: setup
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt aws-s3-lwt --yes
opam install bytesrw conf-zlib conf-zstd ezcurl tiny_httpd --yes
opam install lwt aws-s3-lwt ezcurl-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt

- name: setup ocaml-5-specific
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
run: |
opam install eio_main --yes
opam install eio_main cohttp-eio --yes
opam exec -- dune build zarr-eio

- name: test
Expand All @@ -89,29 +89,29 @@ jobs:
opam exec -- dune exec --instrument-with bisect_ppx --force -- _build/default/zarr-lwt/test/test_lwt.exe -runner sequential -ci true

- name: test ocaml-5-specific libs
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
run: |
opam exec -- dune exec --instrument-with bisect_ppx --force -- _build/default/zarr-eio/test/test_eio.exe -runner sequential -ci true

- name: Upload code coverage report
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
run: opam exec -- bisect-ppx-report send-to Codecov
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

- name: Build Docs
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
run: opam exec -- dune build @doc

- name: Upload API Docs artifact
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
uses: actions/[email protected]
with:
name: docs
path: ./_build/default/_doc/_html

- name: Deploy API Docs
if: ${{ matrix.ocaml-compiler == '5.2.0' }}
if: ${{ matrix.ocaml-compiler == '5.2.1' }}
uses: peaceiris/actions-gh-pages@v4
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
7 changes: 7 additions & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
(stdint (>= 0.7.2))
(zipc (>= 0.2.0))
(checkseum (>= 0.4.0))
(bytesrw (>= 0.1.0))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand All @@ -48,9 +49,11 @@
(ocaml
(and (>= 4.14.0)))
(zarr (= :version))
(ezcurl (>= 0.2.4))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
(tiny_httpd :with-test)
(bisect_ppx
(and :dev (>= 2.5.0) :with-test))))

Expand All @@ -64,8 +67,10 @@
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(ezcurl-lwt (>= 0.2.4))
(odoc :with-doc)
(ounit2 :with-test)
(tiny_httpd :with-test)
(ppx_deriving :with-test)
(bisect_ppx
(and :dev (>= 2.5.0) :with-test))))
Expand All @@ -79,8 +84,10 @@
(and (>= 5.1.0)))
(zarr (= :version))
(eio_main (>= 1.0))
(cohttp-eio (>= 6.0.0))
(odoc :with-doc)
(ounit2 :with-test)
(tiny_httpd :with-test)
(ppx_deriving :with-test)
(bisect_ppx
(and :dev (>= 2.5.0) :with-test))))
2 changes: 2 additions & 0 deletions zarr-eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ depends: [
"ocaml" {>= "5.1.0"}
"zarr" {= version}
"eio_main" {>= "1.0"}
"cohttp-eio" {>= "6.0.0"}
"odoc" {with-doc}
"ounit2" {with-test}
"tiny_httpd" {with-test}
"ppx_deriving" {with-test}
"bisect_ppx" {dev & >= "2.5.0" & with-test}
]
Expand Down
1 change: 1 addition & 0 deletions zarr-eio/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-eio)
(libraries
zarr
cohttp-eio
eio_main)
(ocamlopt_flags
(:standard -O3))
Expand Down
95 changes: 95 additions & 0 deletions zarr-eio/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,98 @@

include Zarr.Storage.Make(IO)
end

module HttpStore = struct
exception Not_implemented
exception Request_failed of int * string

open Cohttp_eio

let raise_status_error e =
let c = Cohttp.Code.code_of_status e in
raise (Request_failed (c, Cohttp.Code.reason_phrase_of_code c))

Check warning on line 145 in zarr-eio/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-eio/src/storage.ml#L144-L145

Added lines #L144 - L145 were not covered by tests

let fold_response ~success resp key = match Http.Response.status resp with
| #Http.Status.success -> success ()
| #Http.Status.client_error as e when e = `Not_found ->
raise (Zarr.Storage.Key_not_found key)
| e -> raise_status_error e

Check warning on line 151 in zarr-eio/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-eio/src/storage.ml#L151

Added line #L151 was not covered by tests

module IO = struct
module Deferred = Deferred

type t = {base_url : Uri.t; client : Client.t}

let get t key =
Eio.Switch.run @@ fun sw ->
let url = Uri.with_path t.base_url key in
let resp, body = Client.get ~sw t.client url in
fold_response ~success:(fun () -> Eio.Flow.read_all body) resp key

let size t key = try String.length (get t key) with
| Zarr.Storage.Key_not_found _ -> 0

(*let size t key =
let content_length resp () = match Http.Response.content_length resp with
| Some l -> l
| None -> String.length (get t key)
in
Eio.Switch.run @@ fun sw ->
let url = Uri.with_path t.base_url key in
let resp = Client.head ~sw t.client url in
fold_response ~success:(content_length resp) resp key *)

let is_member t key = if (size t key) > 0 then true else false

let get_partial_values t key ranges =
let read_range ~data ~size (ofs, len) = match len with
| None -> String.sub data ofs (size - ofs)
| Some l -> String.sub data ofs l
in
let data = get t key in
let size = String.length data in
List.map (read_range ~data ~size) ranges

let set t key data =
Eio.Switch.run @@ fun sw ->
let url = Uri.with_path t.base_url key in
let headers = Http.Header.of_list [("Content-Length", string_of_int (String.length data))] in
let body = Body.of_string data in
let resp, _ = Client.put ~sw ~headers ~body t.client url in
fold_response ~success:(fun () -> ()) resp key

let set_partial_values t key ?(append=false) rsv =
let ov = try get t key with
| Zarr.Storage.Key_not_found _ -> String.empty
in
let f = if append || ov = String.empty then
fun acc (_, v) -> acc ^ v else
fun acc (rs, v) ->
let s = Bytes.unsafe_of_string acc in
Bytes.blit_string v 0 s rs String.(length v);
Bytes.unsafe_to_string s
in
set t key (List.fold_left f ov rsv)

(*let erase t key =
Eio.Switch.run @@ fun sw ->
let url = Uri.with_path t.base_url key in
let resp, _ = Client.delete ~sw t.client url in
match Http.Response.status resp with
| #Http.Status.success -> Deferred.return_unit
| #Http.Status.client_error as e when e = `Not_found -> Deferred.return_unit
| e -> raise_status_error e *)

let erase _ = raise Not_implemented

Check warning on line 218 in zarr-eio/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-eio/src/storage.ml#L218

Added line #L218 was not covered by tests
let erase_prefix _ = raise Not_implemented
let list _ = raise Not_implemented
let list_dir _ = raise Not_implemented
let rename _ = raise Not_implemented
end

let with_open ~net uri f =
let client = Client.make ~https:None net in
f IO.{client; base_url = uri}

include Zarr.Storage.Make(IO)
end
11 changes: 11 additions & 0 deletions zarr-eio/src/storage.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ module FilesystemStore : sig

@raise Failure if [dir] is a file and not a Zarr store path. *)
end

module HttpStore : sig
exception Not_implemented
exception Request_failed of int * string
include Zarr.Storage.STORE with module Deferred = Deferred
val with_open :
net:_ Eio.Net.t ->
Uri.t ->
(t -> 'a) ->
'a
end
1 change: 1 addition & 0 deletions zarr-eio/test/dune
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
(name test_eio)
(libraries
zarr-eio
tiny_httpd
ounit2)
(package zarr-eio)
(preprocess
Expand Down
136 changes: 135 additions & 1 deletion zarr-eio/test/test_eio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,131 @@ let test_storage
let got = hierarchy store in
assert_equal ~printer:print_node_pair ([], []) got

module type SYNC_PARTIAL_STORE = sig
exception Not_implemented
include Zarr.Storage.STORE with type 'a Deferred.t = 'a
end

let test_readable_writable_only
(type a) (module M : SYNC_PARTIAL_STORE with type t = a) (store : a) =
let open M in
let gnode = Node.Group.root in
let attrs = `Assoc [("questions", `String "answer")] in
Group.create ~attrs store gnode;
let exists = Group.exists store gnode in
assert_equal ~printer:string_of_bool true exists;
let meta = Group.metadata store gnode in
assert_equal ~printer:Yojson.Safe.show attrs (Metadata.Group.attributes meta);
let exists = Array.exists store Node.Array.(gnode / "non-member") in
assert_equal ~printer:string_of_bool false exists;

let cfg =
{chunk_shape = [|2; 5; 5|]
;index_location = End
;index_codecs = [`Bytes LE]
;codecs = [`Transpose [|2; 0; 1|]; `Bytes BE]} in
let anode = Node.Array.(gnode / "arrnode") in
let slice = [|R [|0; 5|]; I 10; R [|0; 10|]|] in
let bigger_slice = [|R [|0; 6|]; L [|9; 10|] ; R [|0; 11|]|] in
Array.create
~codecs:[`ShardingIndexed cfg] ~shape:[|100; 100; 50|] ~chunks:[|10; 15; 20|]
Complex32 Complex.one anode store;
let exp = Ndarray.init Complex32 [|6; 1; 11|] (Fun.const Complex.one) in
let got = Array.read store anode slice Complex32 in
assert_equal exp got;
Ndarray.fill exp Complex.{re=2.0; im=0.};
Array.write store anode slice exp;
let got = Array.read store anode slice Complex32 in
(* test if a bigger slice containing new elements can be read from store *)
let _ = Array.read store anode bigger_slice Complex32 in
assert_equal exp got;
(* test writing a bigger slice to store *)
Array.write store anode bigger_slice @@ Ndarray.init Complex32 [|7; 2; 12|] (Fun.const Complex.{re=0.; im=3.0});
let got = Array.read store anode slice Complex32 in
Ndarray.fill exp Complex.{re=0.; im=3.0};
assert_equal exp got;
let nshape = [|25; 28; 10|] in
Array.reshape store anode nshape;
let meta = Array.metadata store anode in
assert_equal ~printer:print_int_array nshape (Metadata.Array.shape meta);
assert_raises
(Zarr.Storage.Invalid_resize_shape)
(fun () -> Array.reshape store anode [|25; 10|]);
assert_raises
(Zarr.Storage.Key_not_found "fakegroup/zarr.json")
(fun () -> Array.metadata store Node.Array.(gnode / "fakegroup"));
assert_raises Not_implemented (fun () -> Array.rename store anode "newname");
assert_raises Not_implemented (fun () -> Group.children store gnode);
assert_raises Not_implemented (fun () -> hierarchy store);
assert_raises Not_implemented (fun () -> Group.delete store gnode);
assert_raises Not_implemented (fun () -> clear store)

module Dir_http_server = struct
module S = Tiny_httpd

let make ~max_connections ~dir () =
let server = S.create ~max_connections ~addr:"127.0.0.1" ~port:8080 () in
(* HEAD request handler *)
S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ ->
let headers = [("Content-Type", if String.ends_with ~suffix:".json" path then "application/json" else "application/octet-stream")] in
let fspath = Filename.concat dir path in
let headers = match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath length) with
| exception Sys_error _ -> ("Content-Length", "0") :: headers
| l -> ("Content-Length", Int64.to_string l) :: headers
in
let r = S.Response.make_raw ~code:200 "" in
S.Response.update_headers (List.append headers) r
);
(* GET request handler *)
S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with
| exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path)
| s ->
let headers =
[("Content-Length", Int.to_string (String.length s))
;("Content-Type",
if String.ends_with ~suffix:".json" path
then "application/json"
else "application/octet-stream")]
in
S.Response.make_raw ~headers ~code:200 s
);
(* POST request handler *)
S.add_route_handler_stream server ~meth:`PUT S.Route.rest_of_path_urlencoded (fun path req ->
let write oc =
let max_size = 1024 * 10 * 1024 in
let req' = S.Request.limit_body_size ~bytes:(Bytes.create 4096) ~max_size req in
S.IO.Input.iter (Out_channel.output oc) req'.body;
Out_channel.flush oc
in
let fspath = Filename.concat dir path in
Zarr.Util.create_parent_dir fspath 0o700;
let f = [Open_wronly; Open_trunc; Open_creat] in
match Out_channel.(with_open_gen f 0o700 fspath write) with
| exception Sys_error e -> S.Response.make_raw ~code:500 e
| () ->
let opt = List.assoc_opt "content-type" req.headers in
let content_type = Option.fold ~none:"application/octet-stream" ~some:Fun.id opt in
let headers = [("content-type", content_type); ("Connection", "close")] in
S.Response.make_raw ~headers ~code:201 (Printf.sprintf "%s created" path)
);
(* DELETE request handler *)
S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ ->
let fspath = Filename.concat dir path in
match Sys.remove fspath with
| exception Sys_error e -> S.Response.make_raw ~code:404 e
| () ->
let headers = [("Connection", "close")] in
S.Response.make_raw ~headers ~code:200 (Printf.sprintf "%s deleted successfully" path)
);
server

let run_with t after_init =
let perform () = let _ = Thread.create S.run_exn t in after_init () in
Fun.protect ~finally:(fun () -> S.stop t) perform
end

let _ =
run_test_tt_main @@ ("Run Zarr Eio API tests" >::: [
"test eio-based stores" >::
Expand Down Expand Up @@ -149,5 +274,14 @@ let _ =
(* test just opening the now exisitant archive created by the previous test. *)
ZipStore.with_open `Read_only zpath (fun _ -> ZipStore.Deferred.return_unit);
test_storage (module MemoryStore) @@ MemoryStore.create ();
test_storage (module FilesystemStore) s)
test_storage (module FilesystemStore) s;

let server = Dir_http_server.make ~max_connections:100 ~dir:tmp_dir () in
Dir_http_server.run_with server (fun () ->
HttpStore.with_open
~net:env#net
(Uri.of_string "http://127.0.0.1:8080")
(test_readable_writable_only (module HttpStore))
)
)
])
Loading