diff --git a/lib/file_store.ex b/lib/file_store.ex index a77474c..f632a0d 100644 --- a/lib/file_store.ex +++ b/lib/file_store.ex @@ -16,24 +16,33 @@ defprotocol FileStore do it's usage. """ + @typedoc "The file key." @type key :: binary() - @type list_opts :: [{:prefix, binary()}] - @type delete_all_opts :: [{:prefix, binary()}] - @type write_opts :: [ - {:content_type, binary()} - | {:disposition, binary()} - ] - - @type public_url_opts :: [ - {:content_type, binary()} - | {:disposition, binary()} - ] - - @type signed_url_opts :: [ - {:content_type, binary()} - | {:disposition, binary()} - | {:expires_in, integer()} - ] + + @typedoc "The prefix path option." + @type prefix_opt :: {:prefix, binary()} + + @typedoc "The file content disposition hint for the adapter." + @type disposition_opt :: {:disposition, binary()} + + @typedoc "The file content type hint for the adapter." + @type content_type_opt :: {:content_type, binary()} + + @typedoc "The number of seconds the URL will expire in." + @type expires_in_opt :: {:expires_in, integer()} + + @typedoc "The number of bytes to chunk a stream with." + @type chunk_size_opt :: {:chunk_size, pos_integer()} + + @typedoc "Streams a file line by line." + @type line_opt :: {:line, boolean()} + + @type list_opts :: [prefix_opt()] + @type delete_all_opts :: [prefix_opt()] + @type write_opts :: [content_type_opt() | disposition_opt()] + @type public_url_opts :: [content_type_opt() | disposition_opt()] + @type signed_url_opts :: [content_type_opt() | disposition_opt() | expires_in_opt()] + @type stream_opts :: [chunk_size_opt() | line_opt()] @doc """ Write a file to the store. If a file with the given `key` @@ -65,6 +74,19 @@ defprotocol FileStore do @spec read(t, key) :: {:ok, binary} | {:error, term} def read(store, key) + @doc """ + Stream the file from from the store. + + ## Options + + * `:chunk_size` - The number of bytes for each chunk of data. It can not be + specified with `:line`. + * `:line` - Streams the file line by line. It can not be specified with + `:chunk_size`. + """ + @spec stream!(t, key, stream_opts) :: Enumerable.t() + def stream!(store, key, opts \\ []) + @doc """ Upload a file to the store. If a file with the given `key` already exists, it will be overwritten. diff --git a/lib/file_store/adapters/disk.ex b/lib/file_store/adapters/disk.ex index 63a6fc5..4e9cd95 100644 --- a/lib/file_store/adapters/disk.ex +++ b/lib/file_store/adapters/disk.ex @@ -114,6 +114,17 @@ defmodule FileStore.Adapters.Disk do store |> Disk.join(key) |> File.read() end + def stream!(store, key, opts \\ []) do + path = Disk.join(store, key) + + if opts[:line] do + File.stream!(path, :line) + else + chunk_size = opts[:chunk_size] || 2048 + File.stream!(path, chunk_size) + end + end + def copy(store, src, dest) do with {:ok, src} <- expand(store, src), {:ok, dest} <- expand(store, dest), diff --git a/lib/file_store/adapters/memory.ex b/lib/file_store/adapters/memory.ex index d997b3a..81e8a60 100644 --- a/lib/file_store/adapters/memory.ex +++ b/lib/file_store/adapters/memory.ex @@ -130,6 +130,29 @@ defmodule FileStore.Adapters.Memory do end) end + def stream!(store, key, opts \\ []) do + Agent.get(store.name, fn state -> + case Map.fetch(state, key) do + :error -> + raise FileStore.Error, reason: "file does not exist", key: key, action: "stream" + + {:ok, data} -> + do_stream!(data, opts) + end + end) + end + + defp do_stream!(data, opts) do + {:ok, stream} = StringIO.open(data) + + if opts[:line] do + IO.binstream(stream, :line) + else + chunk_size = opts[:chunk_size] || 2048 + IO.binstream(stream, chunk_size) + end + end + def copy(store, src, dest) do Agent.get_and_update(store.name, fn state -> case Map.fetch(state, src) do diff --git a/lib/file_store/adapters/null.ex b/lib/file_store/adapters/null.ex index a47f873..8ba051f 100644 --- a/lib/file_store/adapters/null.ex +++ b/lib/file_store/adapters/null.ex @@ -47,6 +47,7 @@ defmodule FileStore.Adapters.Null do def download(_store, _key, _destination), do: :ok def write(_store, _key, _content, _opts \\ []), do: :ok def read(_store, _key), do: {:ok, ""} + def stream!(_store, _key, _opts \\ []), do: Stream.into([], []) def copy(_store, _src, _dest), do: :ok def rename(_store, _src, _dest), do: :ok def list!(_store, _opts), do: Stream.into([], []) diff --git a/lib/file_store/adapters/s3.ex b/lib/file_store/adapters/s3.ex index 1e8e6b1..8ddc936 100644 --- a/lib/file_store/adapters/s3.ex +++ b/lib/file_store/adapters/s3.ex @@ -133,6 +133,23 @@ if Code.ensure_loaded?(ExAws.S3) do end end + def stream!(store, key, opts) do + aws_opts = Keyword.take(opts, [:chunk_size]) + + stream = + store.bucket + |> ExAws.S3.download_file(key, :memory, aws_opts) + |> ExAws.stream!() + + if opts[:line] do + stream + |> Stream.chunk_while("", &chunk_by_fun/2, &to_line_stream_after_fun/1) + |> Stream.concat() + else + stream + end + end + def upload(store, source, key) do source |> ExAws.S3.Upload.stream_file() @@ -195,6 +212,37 @@ if Code.ensure_loaded?(ExAws.S3) do defp to_integer(nil), do: nil defp to_integer(value) when is_integer(value), do: value defp to_integer(value) when is_binary(value), do: String.to_integer(value) + + defp chunk_by_fun(chunk, acc) do + to_try = acc <> chunk + {elements, acc} = chunk_by_newline(to_try, "\n", [], {0, byte_size(to_try)}) + {:cont, elements, acc} + end + + defp chunk_by_newline(_string, _newline, elements, {_offset, 0}) do + {Enum.reverse(elements), ""} + end + + defp chunk_by_newline(string, newline, elements, {offset, length}) do + case :binary.match(string, newline, scope: {offset, length}) do + {newline_offset, newline_length} -> + difference = newline_length + newline_offset - offset + element = binary_part(string, offset, difference) + + chunk_by_newline( + string, + newline, + [element | elements], + {newline_offset + newline_length, length - difference} + ) + + :nomatch -> + {Enum.reverse(elements), binary_part(string, offset, length)} + end + end + + defp to_line_stream_after_fun(""), do: {:cont, []} + defp to_line_stream_after_fun(acc), do: {:cont, [acc], []} end end end diff --git a/lib/file_store/middleware/errors.ex b/lib/file_store/middleware/errors.ex index a495db9..20557a6 100644 --- a/lib/file_store/middleware/errors.ex +++ b/lib/file_store/middleware/errors.ex @@ -58,6 +58,10 @@ defmodule FileStore.Middleware.Errors do |> wrap(action: "read key", key: key) end + def stream!(store, key, opts) do + FileStore.stream!(store.__next__, key, opts) + end + def copy(store, src, dest) do store.__next__ |> FileStore.copy(src, dest) diff --git a/lib/file_store/middleware/logger.ex b/lib/file_store/middleware/logger.ex index e2f1720..e07ddea 100644 --- a/lib/file_store/middleware/logger.ex +++ b/lib/file_store/middleware/logger.ex @@ -33,6 +33,12 @@ defmodule FileStore.Middleware.Logger do |> log("READ", key: key) end + def stream!(store, key, opts) do + store.__next__ + |> FileStore.stream!(key, opts) + |> log!("STREAM", key: key) + end + def copy(store, src, dest) do store.__next__ |> FileStore.copy(src, dest) @@ -102,6 +108,11 @@ defmodule FileStore.Middleware.Logger do {:error, error} end + defp log!(io, msg, meta) do + log(:ok, msg, meta) + io + end + defp format_meta(meta) do Enum.map(meta, fn {key, value} -> [Atom.to_string(key), ?\=, inspect(value)] diff --git a/lib/file_store/middleware/prefix.ex b/lib/file_store/middleware/prefix.ex index e6c3e14..2ff7bc2 100644 --- a/lib/file_store/middleware/prefix.ex +++ b/lib/file_store/middleware/prefix.ex @@ -46,6 +46,10 @@ defmodule FileStore.Middleware.Prefix do FileStore.read(store.__next__, put_prefix(key, store)) end + def stream!(store, key, opts) do + FileStore.stream!(store.__next__, put_prefix(key, store), opts) + end + def copy(store, src, dest) do FileStore.copy(store.__next__, put_prefix(src, store), put_prefix(dest, store)) end diff --git a/test/support/adapter_case.ex b/test/support/adapter_case.ex index 622a3b4..6397ed4 100644 --- a/test/support/adapter_case.ex +++ b/test/support/adapter_case.ex @@ -208,6 +208,52 @@ defmodule FileStore.AdapterCase do assert {:ok, _} = FileStore.stat(store, "bar") end end + + describe "stream!/3" do + test "the file exists and is chunked perfectly", %{store: store} do + chunks = Enum.map(~w(a b c d e f g h i j k l m n o), &String.duplicate(&1, 32)) + data = Enum.join(chunks, "") + + :ok = FileStore.write(store, "foo", data) + + assert stream = FileStore.stream!(store, "foo", chunk_size: 32) + assert chunks == Enum.to_list(stream) + end + + test "file exists but the chunk has a last chunk that is not full", %{store: store} do + chunks = Enum.map(~w(a b c d e f g h i j k l m n), &String.duplicate(&1, 32)) + chunks = chunks ++ ["oooooooo"] + data = Enum.join(chunks, "") + + :ok = FileStore.write(store, "foo", data) + + assert stream = FileStore.stream!(store, "foo", chunk_size: 32) + assert chunks == Enum.to_list(stream) + end + + test "file exists but we want to stream by line", %{store: store} do + lines = + Enum.map(~w(a b c d e f g h i j k l m n o), fn char -> + String.duplicate(char, 32) <> "\n" + end) + + data = Enum.join(lines, "") + + :ok = FileStore.write(store, "foo", data) + + assert stream = FileStore.stream!(store, "foo", line: true) + assert lines == Enum.to_list(stream) + end + + test "file does not have a newline in it", %{store: store} do + data = String.duplicate("a", 32) + + :ok = FileStore.write(store, "foo", data) + + assert stream = FileStore.stream!(store, "foo", line: true) + assert [data] == Enum.to_list(stream) + end + end end end diff --git a/test/support/error_adapter.ex b/test/support/error_adapter.ex index b383247..d3166d5 100644 --- a/test/support/error_adapter.ex +++ b/test/support/error_adapter.ex @@ -10,6 +10,11 @@ defmodule FileStore.Adapters.Error do defimpl FileStore do def write(_store, _key, _content, _opts \\ []), do: {:error, :boom} def read(_store, _key), do: {:error, :boom} + + def stream!(_store, key, _opts \\ []) do + raise FileStore.Error, reason: "Does not work", key: key, action: "stream" + end + def upload(_store, _source, _key), do: {:error, :boom} def download(_store, _key, _destination), do: {:error, :boom} def stat(_store, _key), do: {:error, :boom}