diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 5f01af23..e065e83a 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -24,7 +24,7 @@ jobs: os: - ubuntu-latest ocaml-compiler: - - "5.2.0" + - "5.2.1" - "4.14.2" local-packages: - zarr.opam @@ -72,14 +72,14 @@ jobs: - name: setup run: | opam install --deps-only --with-test --with-doc --yes zarr - opam install bytesrw conf-zlib conf-zstd --yes - opam install lwt aws-s3-lwt --yes + opam install bytesrw conf-zlib conf-zstd ezcurl tiny_httpd --yes + opam install lwt aws-s3-lwt ezcurl-lwt --yes opam exec -- dune build zarr zarr-sync zarr-lwt - name: setup ocaml-5-specific - if: ${{ matrix.ocaml-compiler == '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} run: | - opam install eio_main --yes + opam install eio_main cohttp-eio --yes opam exec -- dune build zarr-eio - name: test @@ -89,29 +89,29 @@ jobs: opam exec -- dune exec --instrument-with bisect_ppx --force -- _build/default/zarr-lwt/test/test_lwt.exe -runner sequential -ci true - name: test ocaml-5-specific libs - if: ${{ matrix.ocaml-compiler == '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} run: | opam exec -- dune exec --instrument-with bisect_ppx --force -- _build/default/zarr-eio/test/test_eio.exe -runner sequential -ci true - name: Upload code coverage report - if: ${{ matrix.ocaml-compiler == '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} run: opam exec -- bisect-ppx-report send-to Codecov env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} - name: Build Docs - if: ${{ matrix.ocaml-compiler == '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} run: opam exec -- dune build @doc - name: Upload API Docs artifact - if: ${{ matrix.ocaml-compiler == '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} uses: actions/upload-artifact@v3.1.3 with: name: docs path: ./_build/default/_doc/_html - name: Deploy API Docs - if: ${{ matrix.ocaml-compiler 
== '5.2.0' }} + if: ${{ matrix.ocaml-compiler == '5.2.1' }} uses: peaceiris/actions-gh-pages@v4 with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/README.md b/README.md index ebc824ad..1a9e91d2 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,10 @@ arrays, designed for use in parallel computing. - Supports creating n-dimensional Zarr arrays and chunking them along any dimension. - Compresses chunks using a variety of supported compression codecs. - Supports indexing operations to read/write views of a Zarr array. -- Supports storing arrays in-memory or the local filesystem. It is also - extensible, allowing users to easily create and use their own custom storage - backends. See the example implementing a [Zip file store][9] for more details. +- Supports many storage backends, including an in-memory store, the local filesystem, + Amazon S3 and others. It is also extensible, allowing users to easily create and + use their own custom storage backends. See the example implementing a + [Zip archive store][9] for more details. - Supports both synchronous and asynchronous I/O via [Lwt][4] and [Eio][8]. The user can easily use their own scheduler of choice. See the [example][10] implementing a filesystem store that uses the [Picos][11] concurrency library for non-blocking I/O. 
diff --git a/dune-project b/dune-project index 7232da60..461fd6fb 100644 --- a/dune-project +++ b/dune-project @@ -32,6 +32,7 @@ (stdint (>= 0.7.2)) (zipc (>= 0.2.0)) (checkseum (>= 0.4.0)) + (bytesrw (>= 0.1.0)) (odoc :with-doc) (ounit2 :with-test) (ppx_deriving :with-test) @@ -48,9 +49,11 @@ (ocaml (and (>= 4.14.0))) (zarr (= :version)) + (ezcurl (>= 0.2.4)) (odoc :with-doc) (ounit2 :with-test) (ppx_deriving :with-test) + (tiny_httpd :with-test) (bisect_ppx (and :dev (>= 2.5.0) :with-test)))) @@ -64,8 +67,10 @@ (zarr (= :version)) (lwt (>= 2.5.1)) (aws-s3-lwt (>= 4.8.1)) + (ezcurl-lwt (>= 0.2.4)) (odoc :with-doc) (ounit2 :with-test) + (tiny_httpd :with-test) (ppx_deriving :with-test) (bisect_ppx (and :dev (>= 2.5.0) :with-test)))) @@ -79,8 +84,10 @@ (and (>= 5.1.0))) (zarr (= :version)) (eio_main (>= 1.0)) + (cohttp-eio (>= 6.0.0)) (odoc :with-doc) (ounit2 :with-test) + (tiny_httpd :with-test) (ppx_deriving :with-test) (bisect_ppx (and :dev (>= 2.5.0) :with-test)))) diff --git a/zarr-eio.opam b/zarr-eio.opam index 9364a9ad..d311a523 100644 --- a/zarr-eio.opam +++ b/zarr-eio.opam @@ -13,8 +13,10 @@ depends: [ "ocaml" {>= "5.1.0"} "zarr" {= version} "eio_main" {>= "1.0"} + "cohttp-eio" {>= "6.0.0"} "odoc" {with-doc} "ounit2" {with-test} + "tiny_httpd" {with-test} "ppx_deriving" {with-test} "bisect_ppx" {dev & >= "2.5.0" & with-test} ] diff --git a/zarr-eio/src/dune b/zarr-eio/src/dune index b380aab7..03ba51f9 100644 --- a/zarr-eio/src/dune +++ b/zarr-eio/src/dune @@ -3,6 +3,7 @@ (public_name zarr-eio) (libraries zarr + cohttp-eio eio_main) (ocamlopt_flags (:standard -O3)) diff --git a/zarr-eio/src/storage.ml b/zarr-eio/src/storage.ml index 526a84c0..0922d61c 100644 --- a/zarr-eio/src/storage.ml +++ b/zarr-eio/src/storage.ml @@ -129,3 +129,97 @@ module FilesystemStore = struct include Zarr.Storage.Make(IO)(S) end + +module HttpStore = struct + exception Not_implemented + exception Request_failed of int * string + + open Cohttp_eio + + let raise_status_error e = + 
let c = Cohttp.Code.code_of_status e in + raise (Request_failed (c, Cohttp.Code.reason_phrase_of_code c)) + + (* Runs [success ()] for a 2xx response, maps 404 to [Key_not_found key] and every other status to [Request_failed]. *) + let fold_response ~success resp key = match Http.Response.status resp with + | #Http.Status.success -> success () + | `Not_found -> raise (Zarr.Storage.Key_not_found key) + | e -> raise_status_error e + + module S = struct + type t = {base_url : Uri.t; client : Client.t} + type 'a io = 'a IO.t + + (* URL of [key] relative to the store root. The key is appended to the path of [base_url] instead of replacing it, so a store rooted at e.g. http://host/data.zarr keeps its path prefix. *) + let url_of t key = + let base = Uri.path t.base_url in + let sep = if String.ends_with ~suffix:"/" base then "" else "/" in + Uri.with_path t.base_url (base ^ sep ^ key) + + (* Fetches the whole value stored under [key] with a GET request. *) + let get t key = + Eio.Switch.run @@ fun sw -> + let url = url_of t key in + let resp, body = Client.get ~sw t.client url in + fold_response ~success:(fun () -> Eio.Flow.read_all body) resp key + + (* Length of the value under [key], obtained by downloading it; a missing key has size 0. *) + let size t key = try String.length (get t key) with + | Zarr.Storage.Key_not_found _ -> 0 + + (*let size t key = + let content_length resp () = match Http.Response.content_length resp with + | Some l -> l + | None -> String.length (get t key) + in + Eio.Switch.run @@ fun sw -> + let url = Uri.with_path t.base_url key in + let resp = Client.head ~sw t.client url in + fold_response ~success:(content_length resp) resp key *) + + (* A key counts as a member only when its value is non-empty. *) + let is_member t key = size t key > 0 + + (* Downloads the full value once and slices every requested (offset, optional length) range out of it. *) + let get_partial_values t key ranges = + let read_range ~data ~size (ofs, len) = match len with + | None -> String.sub data ofs (size - ofs) + | Some l -> String.sub data ofs l + in + let data = get t key in + let size = String.length data in + List.map (read_range ~data ~size) ranges + + (* Uploads [data] as the full value of [key] with a PUT request. *) + let set t key data = + Eio.Switch.run @@ fun sw -> + let url = url_of t key in + let headers = Http.Header.of_list [("Content-Length", string_of_int (String.length data))] in + let body = Body.of_string data in + let resp, _ = Client.put ~sw ~headers ~body t.client url in + fold_response ~success:(fun () -> ()) resp key + + (* Read-modify-write: fetches the current value (empty when the key is missing), applies [rsv] to it and uploads the result. *) + let set_partial_values t key ?(append=false) rsv = + let ov = try get t key with + | Zarr.Storage.Key_not_found _ -> String.empty + in + let f = if append || ov = String.empty then + fun acc (_, v) -> acc ^ 
v else + fun acc (rs, v) -> + let s = Bytes.unsafe_of_string acc in + Bytes.blit_string v 0 s rs String.(length v); + Bytes.unsafe_to_string s + in + set t key (List.fold_left f ov rsv) + + (*let erase t key = + Eio.Switch.run @@ fun sw -> + let url = Uri.with_path t.base_url key in + let resp, _ = Client.delete ~sw t.client url in + match Http.Response.status resp with + | #Http.Status.success -> Deferred.return_unit + | #Http.Status.client_error as e when e = `Not_found -> Deferred.return_unit + | e -> raise_status_error e *) + + let erase _ = raise Not_implemented + let erase_prefix _ = raise Not_implemented + let list _ = raise Not_implemented + let list_dir _ = raise Not_implemented + let rename _ = raise Not_implemented + end + + let with_open ?https ~net uri f = + let client = Client.make ~https net in + f S.{client; base_url = uri} + + include Zarr.Storage.Make(IO)(S) +end diff --git a/zarr-eio/src/storage.mli b/zarr-eio/src/storage.mli index 19678c0b..d36a8afa 100644 --- a/zarr-eio/src/storage.mli +++ b/zarr-eio/src/storage.mli @@ -20,3 +20,15 @@ module FilesystemStore : sig @raise Failure if [dir] is a file and not a Zarr store path. 
*) end + +module HttpStore : sig + exception Not_implemented + exception Request_failed of int * string + include Zarr.Storage.S with type 'a io := 'a + val with_open : + ?https:(Uri.t -> [ `Generic ] Eio.Net.stream_socket_ty Eio.Std.r -> _ Eio.Flow.two_way) -> + net:_ Eio.Net.t -> + Uri.t -> + (t -> 'a) -> + 'a +end diff --git a/zarr-eio/test/dune b/zarr-eio/test/dune index 1d337e06..f9ad930f 100644 --- a/zarr-eio/test/dune +++ b/zarr-eio/test/dune @@ -2,6 +2,7 @@ (name test_eio) (libraries zarr-eio + tiny_httpd ounit2) (package zarr-eio) (preprocess diff --git a/zarr-eio/test/test_eio.ml b/zarr-eio/test/test_eio.ml index 7bf8b8ec..542f6538 100644 --- a/zarr-eio/test/test_eio.ml +++ b/zarr-eio/test/test_eio.ml @@ -97,6 +97,131 @@ let test_storage let got = hierarchy store in assert_equal ~printer:print_node_pair ([], []) got +module type SYNC_PARTIAL_STORE = sig + exception Not_implemented + include Zarr.Storage.S with type 'a io := 'a +end + +let test_readable_writable_only + (type a) (module M : SYNC_PARTIAL_STORE with type t = a) (store : a) = + let open M in + let gnode = Node.Group.root in + let attrs = `Assoc [("questions", `String "answer")] in + Group.create ~attrs store gnode; + let exists = Group.exists store gnode in + assert_equal ~printer:string_of_bool true exists; + let meta = Group.metadata store gnode in + assert_equal ~printer:Yojson.Safe.show attrs (Metadata.Group.attributes meta); + let exists = Array.exists store Node.Array.(gnode / "non-member") in + assert_equal ~printer:string_of_bool false exists; + + let cfg = + {chunk_shape = [2; 5; 5] + ;index_location = End + ;index_codecs = [`Bytes LE] + ;codecs = [`Transpose [2; 0; 1]; `Bytes BE]} in + let anode = Node.Array.(gnode / "arrnode") in + let slice = [R (0, 5); I 10; R (0, 10)] in + let bigger_slice = [R (0, 6); L [9; 10] ; R (0, 11)] in + Array.create + ~codecs:[`ShardingIndexed cfg] ~shape:[100; 100; 50] ~chunks:[10; 15; 20] + Complex32 Complex.one anode store; + let exp = Ndarray.init 
Complex32 [6; 1; 11] (Fun.const Complex.one) in + let got = Array.read store anode slice Complex32 in + assert_equal exp got; + Ndarray.fill exp Complex.{re=2.0; im=0.}; + Array.write store anode slice exp; + let got = Array.read store anode slice Complex32 in + (* test if a bigger slice containing new elements can be read from store *) + let _ = Array.read store anode bigger_slice Complex32 in + assert_equal exp got; + (* test writing a bigger slice to store *) + Array.write store anode bigger_slice @@ Ndarray.init Complex32 [7; 2; 12] (Fun.const Complex.{re=0.; im=3.0}); + let got = Array.read store anode slice Complex32 in + Ndarray.fill exp Complex.{re=0.; im=3.0}; + assert_equal exp got; + let nshape = [25; 28; 10] in + Array.reshape store anode nshape; + let meta = Array.metadata store anode in + assert_equal ~printer:[%show: int list] nshape (Metadata.Array.shape meta); + assert_raises + (Zarr.Storage.Invalid_resize_shape) + (fun () -> Array.reshape store anode [25; 10]); + assert_raises + (Zarr.Storage.Key_not_found "fakegroup/zarr.json") + (fun () -> Array.metadata store Node.Array.(gnode / "fakegroup")); + assert_raises Not_implemented (fun () -> Array.rename store anode "newname"); + assert_raises Not_implemented (fun () -> Group.children store gnode); + assert_raises Not_implemented (fun () -> hierarchy store); + assert_raises Not_implemented (fun () -> Group.delete store gnode); + assert_raises Not_implemented (fun () -> clear store) + +module Dir_http_server = struct + module S = Tiny_httpd + + let make ~max_connections ~dir () = + let server = S.create ~max_connections ~addr:"127.0.0.1" ~port:8080 () in + (* HEAD request handler *) + S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ -> + let headers = [("Content-Type", if String.ends_with ~suffix:".json" path then "application/json" else "application/octet-stream")] in + let fspath = Filename.concat dir path in + let headers = match In_channel.(with_open_gen 
[Open_rdonly] 0o700 fspath length) with + | exception Sys_error _ -> ("Content-Length", "0") :: headers + | l -> ("Content-Length", Int64.to_string l) :: headers + in + let r = S.Response.make_raw ~code:200 "" in + S.Response.update_headers (List.append headers) r + ); + (* GET request handler *) + S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with + | exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path) + | s -> + let headers = + [("Content-Length", Int.to_string (String.length s)) + ;("Content-Type", + if String.ends_with ~suffix:".json" path + then "application/json" + else "application/octet-stream")] + in + S.Response.make_raw ~headers ~code:200 s + ); + (* PUT request handler *) + S.add_route_handler_stream server ~meth:`PUT S.Route.rest_of_path_urlencoded (fun path req -> + let write oc = + let max_size = 1024 * 10 * 1024 in + let req' = S.Request.limit_body_size ~bytes:(Bytes.create 4096) ~max_size req in + S.IO.Input.iter (Out_channel.output oc) req'.body; + Out_channel.flush oc + in + let fspath = Filename.concat dir path in + Zarr.Util.create_parent_dir fspath 0o700; + let f = [Open_wronly; Open_trunc; Open_creat] in + match Out_channel.(with_open_gen f 0o700 fspath write) with + | exception Sys_error e -> S.Response.make_raw ~code:500 e + | () -> + let opt = List.assoc_opt "content-type" req.headers in + let content_type = Option.fold ~none:"application/octet-stream" ~some:Fun.id opt in + let headers = [("content-type", content_type); ("Connection", "close")] in + S.Response.make_raw ~headers ~code:201 (Printf.sprintf "%s created" path) + ); + (* DELETE request handler *) + S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match Sys.remove fspath with + | exception Sys_error e -> 
S.Response.make_raw ~code:404 e + | () -> + let headers = [("Connection", "close")] in + S.Response.make_raw ~headers ~code:200 (Printf.sprintf "%s deleted successfully" path) + ); + server + + let run_with t after_init = + let perform () = let _ = Thread.create S.run_exn t in after_init () in + Fun.protect ~finally:(fun () -> S.stop t) perform +end + let _ = run_test_tt_main @@ ("Run Zarr Eio API tests" >::: [ "test eio-based stores" >:: @@ -126,5 +251,14 @@ let _ = (* test just opening the now exisitant archive created by the previous test. *) ZipStore.with_open `Read_only zpath (fun _ -> ()); test_storage (module MemoryStore) (MemoryStore.create ()); - test_storage (module FilesystemStore) s) + test_storage (module FilesystemStore) s; + + let server = Dir_http_server.make ~max_connections:100 ~dir:tmp_dir () in + Dir_http_server.run_with server (fun () -> + HttpStore.with_open + ~net:env#net + (Uri.of_string "http://127.0.0.1:8080") + (test_readable_writable_only (module HttpStore)) + ) + ) ]) diff --git a/zarr-lwt.opam b/zarr-lwt.opam index f5e43a41..6a13831d 100644 --- a/zarr-lwt.opam +++ b/zarr-lwt.opam @@ -14,8 +14,10 @@ depends: [ "zarr" {= version} "lwt" {>= "2.5.1"} "aws-s3-lwt" {>= "4.8.1"} + "ezcurl-lwt" {>= "0.2.4"} "odoc" {with-doc} "ounit2" {with-test} + "tiny_httpd" {with-test} "ppx_deriving" {with-test} "bisect_ppx" {dev & >= "2.5.0" & with-test} ] diff --git a/zarr-lwt/src/dune b/zarr-lwt/src/dune index 9df95697..a548a0ab 100644 --- a/zarr-lwt/src/dune +++ b/zarr-lwt/src/dune @@ -4,6 +4,7 @@ (libraries zarr aws-s3-lwt + ezcurl-lwt lwt lwt.unix) (ocamlopt_flags diff --git a/zarr-lwt/src/storage.ml b/zarr-lwt/src/storage.ml index 52e8cf92..adfa18c0 100644 --- a/zarr-lwt/src/storage.ml +++ b/zarr-lwt/src/storage.ml @@ -21,6 +21,7 @@ end module ZipStore = Zarr.Zip.Make(IO) module MemoryStore = Zarr.Memory.Make(IO) +module HttpStore = Zarr.Http.Make(IO)(Ezcurl_lwt) module FilesystemStore = struct module S = struct diff --git a/zarr-lwt/src/storage.mli 
b/zarr-lwt/src/storage.mli index 774a4e14..34a7cedc 100644 --- a/zarr-lwt/src/storage.mli +++ b/zarr-lwt/src/storage.mli @@ -6,6 +6,9 @@ module MemoryStore : Zarr.Memory.S with type 'a io := 'a Lwt.t (** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *) module ZipStore : Zarr.Zip.S with type 'a io := 'a Lwt.t +(** An Lwt-aware Http storage backend for a Zarr v3 hierarchy. *) +module HttpStore : Zarr.Http.S with type 'a io := 'a Lwt.t + (** An Lwt-aware local filesystem storage backend for a Zarr V3 hierarchy. *) module FilesystemStore : sig include Zarr.Storage.S with type 'a io := 'a Lwt.t diff --git a/zarr-lwt/test/dune b/zarr-lwt/test/dune index 8911ba9f..2c27db5b 100644 --- a/zarr-lwt/test/dune +++ b/zarr-lwt/test/dune @@ -2,6 +2,7 @@ (name test_lwt) (libraries zarr-lwt + tiny_httpd ounit2) (package zarr-lwt) (preprocess diff --git a/zarr-lwt/test/test_lwt.ml b/zarr-lwt/test/test_lwt.ml index d3fadd27..be4c47da 100644 --- a/zarr-lwt/test/test_lwt.ml +++ b/zarr-lwt/test/test_lwt.ml @@ -113,6 +113,141 @@ let test_storage assert_equal ~printer:print_node_pair ([], []) got; IO.return_unit +module type SYNC_PARTIAL_STORE = sig + exception Not_implemented + include Zarr.Storage.S with type 'a io := 'a Lwt.t +end + +let test_readable_writable_only + (type a) (module M : SYNC_PARTIAL_STORE with type t = a) (store : a) = + let open M in + let open IO.Syntax in + (* Passes only when [f ()] fails with [Not_implemented], whether raised directly or through a rejected promise. Completing normally or failing with any other exception is a test failure. *) + let assert_not_implemented f = + Lwt.try_bind f + (fun _ -> failwith "Supposed to raise Not_implemented") + (function + | Not_implemented -> IO.return_unit + | _ -> failwith "Supposed to raise Not_implemented") + in + let gnode = Node.Group.root in + let attrs = `Assoc [("questions", `String "answer")] in + let* () = Group.create ~attrs store gnode in + let* exists = Group.exists store gnode in + assert_equal ~printer:string_of_bool true exists; + let* meta = Group.metadata store gnode in + assert_equal ~printer:Yojson.Safe.show attrs (Metadata.Group.attributes meta); + let* exists = Array.exists store 
Node.Array.(gnode / "non-member") in + assert_equal ~printer:string_of_bool false exists; + let cfg = + {chunk_shape = [2; 5; 5] + ;index_location = End + ;index_codecs = [`Bytes LE] + ;codecs = [`Transpose [2; 0; 1]; `Bytes BE]} in + let anode = Node.Array.(gnode / "arrnode") + and slice = [R (0, 5); I 10; R (0, 10)] + and bigger_slice = [R (0, 6); L [9; 10] ; R (0, 11)] + and codecs = [`ShardingIndexed cfg] and shape = [100; 100; 50] and chunks = [10; 15; 20] in + let* () = Array.create ~codecs ~shape ~chunks Complex32 Complex.one anode store in + let exp = Ndarray.init Complex32 [6; 1; 11] (Fun.const Complex.one) in + let* got = Array.read store anode slice Complex32 in + assert_equal exp got; + Ndarray.fill exp Complex.{re=2.0; im=0.}; + let* () = Array.write store anode slice exp in + let* got = Array.read store anode slice Complex32 in + (* test if a bigger slice containing new elements can be read from store *) + let* _ = Array.read store anode bigger_slice Complex32 in + assert_equal exp got; + (* test writing a bigger slice to store *) + let* () = Array.write store anode bigger_slice @@ Ndarray.init Complex32 [7; 2; 12] (Fun.const Complex.{re=0.; im=3.0}) in + let* got = Array.read store anode slice Complex32 in + Ndarray.fill exp Complex.{re=0.; im=3.0}; + assert_equal exp got; + let nshape = [25; 28; 10] in + let* () = Array.reshape store anode nshape in + let* meta = Array.metadata store anode in + assert_equal ~printer:[%show: int list] nshape (Metadata.Array.shape meta); + let* () = assert_not_implemented (fun () -> Array.rename store anode "newname") in + let* () = assert_not_implemented (fun () -> Group.children store gnode) in + let* () = assert_not_implemented (fun () -> hierarchy store) in + let* () = assert_not_implemented (fun () -> Group.delete store gnode) in + let* () = assert_not_implemented (fun () -> clear store) in + IO.return_unit + +module Dir_http_server = struct + module S = Tiny_httpd + + let make ~max_connections ~dir () = + let 
server = S.create ~max_connections ~addr:"127.0.0.1" ~port:8080 () in + (* HEAD request handler *) + S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath length) with + | exception Sys_error e -> S.Response.make_raw ~code:404 e + | l -> + let headers = + [("Content-Length", Int64.to_string l) + ;("Content-Type", + if String.ends_with ~suffix:".json" path + then "application/json" + else "application/octet-stream")] + in + let r = S.Response.make_raw ~code:200 "" in + S.Response.update_headers (List.append headers) r + ); + (* GET request handler *) + S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with + | exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path) + | s -> + let headers = + [("Content-Length", Int.to_string (String.length s)) + ;("Content-Type", + if String.ends_with ~suffix:".json" path + then "application/json" + else "application/octet-stream")] + in + S.Response.make_raw ~headers ~code:200 s + ); + (* POST request handler *) + S.add_route_handler_stream server ~meth:`POST S.Route.rest_of_path_urlencoded (fun path req -> + let write oc = + let max_size = 1024 * 10 * 1024 in + let req' = S.Request.limit_body_size ~bytes:(Bytes.create 4096) ~max_size req in + S.IO.Input.iter (Out_channel.output oc) req'.body; + Out_channel.flush oc + in + let fspath = Filename.concat dir path in + Zarr.Util.create_parent_dir fspath 0o700; + let f = [Open_wronly; Open_trunc; Open_creat] in + match Out_channel.(with_open_gen f 0o700 fspath write) with + | exception Sys_error e -> S.Response.make_raw ~code:500 e + | () -> + let opt = List.assoc_opt "content-type" req.headers in + let content_type = Option.fold ~none:"application/octet-stream" 
~some:Fun.id opt in + let headers = [("content-type", content_type); ("Connection", "close")] in + S.Response.make_raw ~headers ~code:201 (Printf.sprintf "%s created" path) + ); + (* DELETE request handler *) + S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match Sys.remove fspath with + | exception Sys_error e -> S.Response.make_raw ~code:404 e + | () -> + let headers = [("Connection", "close")] in + S.Response.make_raw ~headers ~code:200 (Printf.sprintf "%s deleted successfully" path) + ); + server + + (* Serves [t] from a background thread while [after_init ()] runs, and stops the server only once the promise returned by [after_init] has settled. The returned promise is that of [after_init], so its requests are not cut off by an early stop and its failures reach the caller. *) + let run_with t after_init = + let _ = Thread.create S.run_exn t in + Lwt.finalize after_init (fun () -> S.stop t; IO.return_unit) +end + let _ = run_test_tt_main @@ ("Run Zarr Lwt API tests" >::: [ "test lwt-based stores" >:: @@ -145,11 +280,19 @@ let _ = and bucket = "test-bucket-lwt" and profile = "default" in - Lwt_main.run @@ Lwt.join - [ZipStore.with_open `Read_write zpath (fun z -> test_storage (module ZipStore) z) + let promises = + [ZipStore.with_open `Read_write zpath (test_storage (module ZipStore)) (* test just opening the now exisitant archive created by the previous test. 
*) ;ZipStore.with_open `Read_only zpath (fun _ -> Lwt.return_unit) ;AmazonS3Store.with_open ~region ~bucket ~profile (test_storage (module AmazonS3Store)) - ;test_storage (module MemoryStore) @@ MemoryStore.create () - ;test_storage (module FilesystemStore) s]) + ;test_storage (module MemoryStore) (MemoryStore.create ()) + ;test_storage (module FilesystemStore) s + ;begin + let server = Dir_http_server.make ~max_connections:1 ~dir:tmp_dir () in + Dir_http_server.run_with server @@ fun () -> + HttpStore.with_open "127.0.0.1:8080" (test_readable_writable_only (module HttpStore)) + end + ] + in + Lwt_main.run @@ Lwt.join promises) ]) diff --git a/zarr-sync.opam b/zarr-sync.opam index 0006d9af..48446577 100644 --- a/zarr-sync.opam +++ b/zarr-sync.opam @@ -12,9 +12,11 @@ depends: [ "dune" {>= "3.15"} "ocaml" {>= "4.14.0"} "zarr" {= version} + "ezcurl" {>= "0.2.4"} "odoc" {with-doc} "ounit2" {with-test} "ppx_deriving" {with-test} + "tiny_httpd" {with-test} "bisect_ppx" {dev & >= "2.5.0" & with-test} ] build: [ diff --git a/zarr-sync/src/dune b/zarr-sync/src/dune index 62bcee4d..a428efdb 100644 --- a/zarr-sync/src/dune +++ b/zarr-sync/src/dune @@ -1,7 +1,9 @@ (library (name zarr_sync) (public_name zarr-sync) - (libraries zarr) + (libraries + zarr + ezcurl) (ocamlopt_flags (:standard -O3)) (instrumentation diff --git a/zarr-sync/src/storage.ml b/zarr-sync/src/storage.ml index 7b0b738e..6b0da1e5 100644 --- a/zarr-sync/src/storage.ml +++ b/zarr-sync/src/storage.ml @@ -21,6 +21,7 @@ end module ZipStore = Zarr.Zip.Make(IO) module MemoryStore = Zarr.Memory.Make(IO) +module HttpStore = Zarr.Http.Make(IO)(Ezcurl) module FilesystemStore = struct module S = struct diff --git a/zarr-sync/src/storage.mli b/zarr-sync/src/storage.mli index fde585a9..e74eaa2a 100644 --- a/zarr-sync/src/storage.mli +++ b/zarr-sync/src/storage.mli @@ -6,6 +6,9 @@ module MemoryStore : Zarr.Memory.S with type 'a io := 'a (** A blocking I/O Zip file storage backend for a Zarr v3 hierarchy. 
*) module ZipStore : Zarr.Zip.S with type 'a io := 'a +(** A blocking I/O Http storage backend for a Zarr v3 hierarchy. *) +module HttpStore : Zarr.Http.S with type 'a io := 'a + (** A blocking I/O local filesystem storage backend for a Zarr v3 hierarchy. *) module FilesystemStore : sig include Zarr.Storage.S with type 'a io := 'a diff --git a/zarr-sync/test/dune b/zarr-sync/test/dune index df5d2683..9f0cb7fe 100644 --- a/zarr-sync/test/dune +++ b/zarr-sync/test/dune @@ -2,6 +2,7 @@ (name test_sync) (libraries zarr-sync + tiny_httpd ounit2) (package zarr-sync) (preprocess diff --git a/zarr-sync/test/test_sync.ml b/zarr-sync/test/test_sync.ml index 942949d1..5a43bd39 100644 --- a/zarr-sync/test/test_sync.ml +++ b/zarr-sync/test/test_sync.ml @@ -159,6 +159,123 @@ let test_storage let got = hierarchy store in assert_equal ~printer:print_node_pair ([], []) got +module type SYNC_PARTIAL_STORE = sig + exception Not_implemented + include Zarr.Storage.S with type 'a io := 'a +end + +let test_readable_writable_only + (type a) (module M : SYNC_PARTIAL_STORE with type t = a) (store : a) = + let open M in + let gnode = Node.Group.root in + let attrs = `Assoc [("questions", `String "answer")] in + Group.create ~attrs store gnode; + let exists = Group.exists store gnode in + let meta = Group.metadata store gnode in + assert_equal ~printer:string_of_bool true exists; + assert_equal ~printer:Yojson.Safe.show attrs (Metadata.Group.attributes meta); + let exists = Array.exists store Node.Array.(gnode / "non-member") in + assert_equal ~printer:string_of_bool false exists; + + let cfg = + {chunk_shape = [2; 5; 5] + ;index_location = End + ;index_codecs = [`Bytes LE] + ;codecs = [`Transpose [2; 0; 1]; `Bytes BE]} in + let anode = Node.Array.(gnode / "arrnode") in + let slice = [R (0, 5); I 10; R (0, 10)] in + let bigger_slice = [R (0, 6); L [9; 10] ; R (0, 11)] in + Array.create + ~codecs:[`ShardingIndexed cfg] ~shape:[100; 100; 50] ~chunks:[10; 15; 20] + Complex32 Complex.one anode 
store; + let exp = Ndarray.init Complex32 [6; 1; 11] (Fun.const Complex.one) in + let got = Array.read store anode slice Complex32 in + assert_equal exp got; + Ndarray.fill exp Complex.{re=2.0; im=0.}; + Array.write store anode slice exp; + let got = Array.read store anode slice Complex32 in + (* test if a bigger slice containing new elements can be read from store *) + let _ = Array.read store anode bigger_slice Complex32 in + assert_equal exp got; + (* test writing a bigger slice to store *) + Array.write store anode bigger_slice @@ Ndarray.init Complex32 [7; 2; 12] (Fun.const Complex.{re=0.; im=3.0}); + let got = Array.read store anode slice Complex32 in + Ndarray.fill exp Complex.{re=0.; im=3.0}; + assert_equal exp got; + let nshape = [25; 28; 10] in + Array.reshape store anode nshape; + let meta = Array.metadata store anode in + assert_equal ~printer:[%show: int list] nshape (Metadata.Array.shape meta); + assert_raises + (Zarr.Storage.Invalid_resize_shape) + (fun () -> Array.reshape store anode [25; 10]); + assert_raises Not_implemented (fun () -> Array.rename store anode "newname"); + assert_raises Not_implemented (fun () -> Group.children store gnode); + assert_raises Not_implemented (fun () -> hierarchy store); + assert_raises Not_implemented (fun () -> Group.delete store gnode); + assert_raises Not_implemented (fun () -> clear store) + +module Dir_http_server = struct + module S = Tiny_httpd + + let make ~max_connections ~dir () = + let server = S.create ~max_connections ~addr:"127.0.0.1" ~port:8080 () in + (* HEAD request handler *) + S.add_route_handler server ~meth:`HEAD S.Route.rest_of_path_urlencoded (fun path _ -> + let headers = [("Content-Type", if String.ends_with ~suffix:".json" path then "application/json" else "application/octet-stream")] in + let fspath = Filename.concat dir path in + let headers = match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath length) with + | exception Sys_error _ -> ("Content-Length", "0") :: headers + | l -> 
("Content-Length", Int64.to_string l) :: headers + in + let r = S.Response.make_raw ~code:200 "" in + S.Response.update_headers (List.append headers) r + ); + (* GET request handler *) + S.add_route_handler server ~meth:`GET S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match In_channel.(with_open_gen [Open_rdonly] 0o700 fspath input_all) with + | exception Sys_error _ -> S.Response.make_raw ~code:404 (Printf.sprintf "%s not found" path) + | s -> + let headers = + [("Content-Length", Int.to_string (String.length s)) + ;("Content-Type", + if String.ends_with ~suffix:".json" path + then "application/json" + else "application/octet-stream")] + in + S.Response.make_raw ~headers ~code:200 s + ); + (* POST request handler *) + S.add_route_handler server ~meth:`POST S.Route.rest_of_path_urlencoded (fun path req -> + let write oc = Out_channel.(output_string oc req.S.Request.body; flush oc) in + let fspath = Filename.concat dir path in + Zarr.Util.create_parent_dir fspath 0o700; + let f = [Open_wronly; Open_trunc; Open_creat] in + match Out_channel.(with_open_gen f 0o700 fspath write) with + | exception Sys_error e -> S.Response.fail ~code:500 "Upload error: %s" e + | () -> + let opt = List.assoc_opt "content-type" req.headers in + let content_type = Option.fold ~none:"application/octet-stream" ~some:Fun.id opt in + let headers = [("content-type", content_type); ("Connection", "close")] in + S.Response.make_string ~headers (Ok (Printf.sprintf "%s created" path)) + ); + (* DELETE request handler *) + S.add_route_handler server ~meth:`DELETE S.Route.rest_of_path_urlencoded (fun path _ -> + let fspath = Filename.concat dir path in + match Sys.remove fspath with + | exception Sys_error e -> S.Response.make_raw ~code:404 e + | () -> + let headers = [("Connection", "close")] in + S.Response.make_raw ~headers ~code:200 (Printf.sprintf "%s deleted successfully" path) + ); + server + + let run_with t after_init = + let perform () = let 
_ = Thread.create S.run_exn t in after_init () in + Fun.protect ~finally:(fun () -> S.stop t) perform +end + let _ = run_test_tt_main @@ ("Run Zarr sync API tests" >::: [ "test sync-based stores" >:: @@ -203,5 +320,10 @@ let _ = (* test just opening the now exisitant archive created by the previous test. *) ZipStore.with_open `Read_only zpath (fun _ -> ()); test_storage (module MemoryStore) @@ MemoryStore.create (); - test_storage (module FilesystemStore) s) + test_storage (module FilesystemStore) s; + + let server = Dir_http_server.make ~max_connections:1 ~dir:tmp_dir () in + Dir_http_server.run_with server (fun () -> + HttpStore.with_open "127.0.0.1:8080" (test_readable_writable_only (module HttpStore))) + ) ]) diff --git a/zarr.opam b/zarr.opam index b70d30c7..587af320 100644 --- a/zarr.opam +++ b/zarr.opam @@ -20,6 +20,7 @@ depends: [ "stdint" {>= "0.7.2"} "zipc" {>= "0.2.0"} "checkseum" {>= "0.4.0"} + "bytesrw" {>= "0.1.0"} "odoc" {with-doc} "ounit2" {with-test} "ppx_deriving" {with-test} @@ -40,6 +41,3 @@ build: [ ] ] dev-repo: "git+https://github.com/zoj613/zarr-ml.git" -pin-depends: [ - ["bytesrw.dev" "git+https://erratique.ch/repos/bytesrw.git"] -] diff --git a/zarr.opam.template b/zarr.opam.template deleted file mode 100644 index 4e82fcfe..00000000 --- a/zarr.opam.template +++ /dev/null @@ -1,3 +0,0 @@ -pin-depends: [ - ["bytesrw.dev" "git+https://erratique.ch/repos/bytesrw.git"] -] diff --git a/zarr/src/dune b/zarr/src/dune index 27d3b773..4d9806f4 100644 --- a/zarr/src/dune +++ b/zarr/src/dune @@ -6,6 +6,7 @@ bytesrw.zstd bytesrw.zlib zipc + ezcurl stdint checkseum) (ocamlopt_flags diff --git a/zarr/src/storage/http.ml b/zarr/src/storage/http.ml new file mode 100644 index 00000000..bc374fc4 --- /dev/null +++ b/zarr/src/storage/http.ml @@ -0,0 +1,154 @@ +module type S = sig + exception Not_implemented + exception Request_failed of int * string + include Storage.S + + type auth = {user : string; pwd : string} + + val with_open : + ?basic_auth:auth -> + 
?redirects:int -> + ?tries:int -> + ?timeout:int -> + string -> + (t -> 'a io) -> + 'a io + (** [with_open url f] connects to the Zarr store described by the url [url] + and applies function [f] to the store's open handle. + + {ul + {- [basic_auth] is the username and password to use for each request if + required by the server.} + {- [redirects] is the maximum number of redirects allowed per http request. + Defaults to 5.} + {- [tries] is the maximum number of times to retry a failed request. + Defaults to 3.} + {- [timeout] is the timeout for the connect phase. It sets the maximum + time in seconds that you allow the connection phase to take. This + timeout only limits the connection phase, it has no impact once the + client has connected. The connection phase includes the name resolve + (DNS) and all protocol handshakes and negotiations until there is an + established connection with the remote side. Defaults to 5. + }} *) +end + +module type C = sig + include module type of Ezcurl_core + include Ezcurl_core.S +end + +module Make (IO : Types.IO) (C : C with type 'a io = 'a IO.t) : S with type 'a io := 'a IO.t = struct + exception Not_implemented + exception Request_failed of int * string + open IO.Syntax + + let raise_error (code, s) = raise (Request_failed (Curl.int_of_curlCode code, s)) + let fold_result = Result.fold ~error:raise_error ~ok:Fun.id + + module Store = struct + type t = {tries : int; client : C.t; base_url : string; config : Ezcurl_core.Config.t} + type 'a io = 'a IO.t + + let get t key = + let tries = t.tries and client = t.client and config = t.config in + let url = t.base_url ^ key in + let+ res = C.get ~tries ~client ~config ~url () in + match fold_result res with + | {code; body; _} when code = 200 -> body + | {code; body; _} -> raise (Request_failed (code, body)) + + let size t key = try IO.map String.length (get t key) with + | Request_failed (404, _) -> IO.return 0 + (*let size t key = + let tries = t.tries and client = t.client and config = t.config in
+ let url = t.base_url ^ key in + let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in + let headers = [("Content-Type", "application/" ^ type')] in + let* res = C.http ~headers ~tries ~client ~config ~url ~meth:HEAD () in + match res with + | Error _ -> IO.return 0 + | Ok {code; _} when code = 404 -> IO.return 0 + | Ok {headers; code; _} when code = 200 -> + begin match List.assoc_opt "content-length" headers with + | Some "0" -> IO.return 0 + | Some l -> IO.return @@ int_of_string l + | None -> + begin try print_endline "empty content-length header"; + IO.map String.length (get t key) with + | Request_failed (404, _) -> IO.return 0 end + end + | Ok {code; body; _} -> raise (Request_failed (code, body)) *) + + let is_member t key = IO.map (fun s -> if s > 0 then true else false) (size t key) + + let get_partial_values t key ranges = + let tries = t.tries and client = t.client and config = t.config and url = t.base_url ^ key in + let fetch range = C.get ~range ~tries ~client ~config ~url () + and end_index ofs l = Printf.sprintf "%d-%d" ofs (ofs + l - 1) in + let read_range acc (ofs, len) = + let none = Printf.sprintf "%d-" ofs in + let range = Option.fold ~none ~some:(end_index ofs) len in + IO.map (fun r -> (fold_result r).body :: acc) (fetch range) + in + IO.fold_left read_range [] (List.rev ranges) + + let set t key data = + let tries = t.tries and client = t.client and config = t.config + and url = t.base_url ^ key and content = `String data in + let type' = if String.ends_with ~suffix:".json" key then "json" else "octet-stream" in + let headers = + [("Content-Length", string_of_int (String.length data)) + ;("Content-Type", "application/" ^ type')] in + let+ res = C.post ~params:[] ~headers ~tries ~client ~config ~url ~content () in + match fold_result res with + | {code; _} when code >= 200 && code < 300 -> () + | {code; body; _} -> raise (Request_failed (code, body)) + + let set_partial_values t key ?(append=false) rsv = + 
let* size = size t key in + let* ov = match size with + | 0 -> IO.return String.empty + | _ -> get t key + in + let f = if append || ov = String.empty then + fun acc (_, v) -> acc ^ v else + fun acc (rs, v) -> + let s = Bytes.unsafe_of_string acc in + Bytes.blit_string v 0 s rs String.(length v); + Bytes.unsafe_to_string s + in + set t key (List.fold_left f ov rsv) + + (* make reshaping arrays possible *) + (*let erase t key = + let tries = t.tries and client = t.client and config = t.config in + let url = t.base_url ^ key in + let+ res = C.http ~tries ~client ~config ~url ~meth:DELETE () in + match fold_result res with + | {code; _} when code = 200 || code = 404 -> () + | {code; body; _} -> raise (Request_failed (code, body)) *) + + let erase _ = raise Not_implemented + let erase_prefix _ = raise Not_implemented + let list _ = raise Not_implemented + let list_dir _ = raise Not_implemented + let rename _ = raise Not_implemented + end + + type auth = {user : string; pwd : string} + + let default_cfg = Ezcurl_core.Config.(default |> follow_location true |> authmethod [CURLAUTH_ANY]) + + let with_open ?(basic_auth={user=""; pwd =""}) ?(redirects=5) ?(tries=3) ?(timeout=5) url f = + let set_opts client = Curl.set_connecttimeout client timeout in + let perform client = + let config = Ezcurl_core.Config.max_redirects redirects default_cfg + |> Ezcurl_core.Config.username basic_auth.user + |> Ezcurl_core.Config.password basic_auth.pwd + in + f Store.{tries; client; config; base_url = url ^ "/"} + in + C.with_client ~set_opts perform + + include Storage.Make(IO)(Store) +end diff --git a/zarr/src/zarr.ml b/zarr/src/zarr.ml index 330a7962..5e6fb25c 100644 --- a/zarr/src/zarr.ml +++ b/zarr/src/zarr.ml @@ -6,5 +6,6 @@ module Storage = Storage module Codecs = Codecs module Memory = Memory module Zip = Zip +module Http = Http module Types = Types module Ndarray = Ndarray diff --git a/zarr/src/zarr.mli b/zarr/src/zarr.mli index fb74a7e1..25e7be64 100644 --- a/zarr/src/zarr.mli 
+++ b/zarr/src/zarr.mli @@ -31,6 +31,7 @@ module Metadata = Metadata module Storage = Storage module Memory = Memory module Zip = Zip +module Http = Http module Types = Types (** {1 Codecs} *)