Skip to content

Commit

Permalink
Adds stream!/3 interface
Browse files Browse the repository at this point in the history
This is a rough draft of what I think the stream implementation should
be like. I do not know how I am going to test the error case where the
file can not be found with any of the adapters. The exception happens in
a supervised storage implementation that is inside of a macro for the
adapter cases.
  • Loading branch information
warmwaffles committed Jun 12, 2024
1 parent 0e732ce commit 245c10f
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 17 deletions.
56 changes: 39 additions & 17 deletions lib/file_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,33 @@ defprotocol FileStore do
it's usage.
"""

@typedoc "The file key."
@type key :: binary()
@type list_opts :: [{:prefix, binary()}]
@type delete_all_opts :: [{:prefix, binary()}]
@type write_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
]

@type public_url_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
]

@type signed_url_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
| {:expires_in, integer()}
]

@typedoc "The prefix path option."
@type prefix_opt :: {:prefix, binary()}

@typedoc "The file content disposition hint for the adapter."
@type disposition_opt :: {:disposition, binary()}

@typedoc "The file content type hint for the adapter."
@type content_type_opt :: {:content_type, binary()}

@typedoc "The number of seconds the URL will expire in."
@type expires_in_opt :: {:expires_in, integer()}

@typedoc "The number of bytes to chunk a stream with."
@type chunk_size_opt :: {:chunk_size, pos_integer()}

@typedoc "Streams a file line by line."
@type line_opt :: {:line, boolean()}

@type list_opts :: [prefix_opt()]
@type delete_all_opts :: [prefix_opt()]
@type write_opts :: [content_type_opt() | disposition_opt()]
@type public_url_opts :: [content_type_opt() | disposition_opt()]
@type signed_url_opts :: [content_type_opt() | disposition_opt() | expires_in_opt()]
@type stream_opts :: [chunk_size_opt() | line_opt()]

@doc """
Write a file to the store. If a file with the given `key`
Expand Down Expand Up @@ -65,6 +74,19 @@ defprotocol FileStore do
@spec read(t, key) :: {:ok, binary} | {:error, term}
def read(store, key)

@doc """
Stream the file from from the store.
## Options
* `:chunk_size` - The number of bytes for each chunk of data. It can not be
specified with `:line`.
* `:line` - Streams the file line by line. It can not be specified with
`:chunk_size`.
"""
@spec stream!(t, key, stream_opts) :: Enumerable.t()
def stream!(store, key, opts \\ [])

@doc """
Upload a file to the store. If a file with the given `key`
already exists, it will be overwritten.
Expand Down
11 changes: 11 additions & 0 deletions lib/file_store/adapters/disk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ defmodule FileStore.Adapters.Disk do
store |> Disk.join(key) |> File.read()
end

def stream!(store, key, opts \\ []) do
path = Disk.join(store, key)

if opts[:line] do
File.stream!(path, :line)
else
chunk_size = opts[:chunk_size] || 2048
File.stream!(path, chunk_size)
end
end

def copy(store, src, dest) do
with {:ok, src} <- expand(store, src),
{:ok, dest} <- expand(store, dest),
Expand Down
23 changes: 23 additions & 0 deletions lib/file_store/adapters/memory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ defmodule FileStore.Adapters.Memory do
end)
end

def stream!(store, key, opts \\ []) do
Agent.get(store.name, fn state ->
case Map.fetch(state, key) do
:error ->
raise FileStore.Error, reason: "file does not exist", key: key, action: "stream"

{:ok, data} ->
do_stream!(data, opts)
end
end)
end

defp do_stream!(data, opts) do
{:ok, stream} = StringIO.open(data)

if opts[:line] do
IO.binstream(stream, :line)
else
chunk_size = opts[:chunk_size] || 2048
IO.binstream(stream, chunk_size)
end
end

def copy(store, src, dest) do
Agent.get_and_update(store.name, fn state ->
case Map.fetch(state, src) do
Expand Down
1 change: 1 addition & 0 deletions lib/file_store/adapters/null.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule FileStore.Adapters.Null do
def download(_store, _key, _destination), do: :ok
def write(_store, _key, _content, _opts \\ []), do: :ok
def read(_store, _key), do: {:ok, ""}
def stream!(_store, _key, _opts \\ []), do: Stream.into([], [])
def copy(_store, _src, _dest), do: :ok
def rename(_store, _src, _dest), do: :ok
def list!(_store, _opts), do: Stream.into([], [])
Expand Down
48 changes: 48 additions & 0 deletions lib/file_store/adapters/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ if Code.ensure_loaded?(ExAws.S3) do
end
end

def stream!(store, key, opts) do
aws_opts = Keyword.take(opts, [:chunk_size])

stream =
store.bucket
|> ExAws.S3.download_file(key, :memory, aws_opts)
|> ExAws.stream!()

if opts[:line] do
stream
|> Stream.chunk_while("", &chunk_by_fun/2, &to_line_stream_after_fun/1)
|> Stream.concat()
else
stream
end
end

def upload(store, source, key) do
source
|> ExAws.S3.Upload.stream_file()
Expand Down Expand Up @@ -195,6 +212,37 @@ if Code.ensure_loaded?(ExAws.S3) do
defp to_integer(nil), do: nil
defp to_integer(value) when is_integer(value), do: value
defp to_integer(value) when is_binary(value), do: String.to_integer(value)

defp chunk_by_fun(chunk, acc) do
to_try = acc <> chunk
{elements, acc} = chunk_by_newline(to_try, "\n", [], {0, byte_size(to_try)})
{:cont, elements, acc}
end

defp chunk_by_newline(_string, _newline, elements, {_offset, 0}) do
{Enum.reverse(elements), ""}
end

defp chunk_by_newline(string, newline, elements, {offset, length}) do
case :binary.match(string, newline, scope: {offset, length}) do
{newline_offset, newline_length} ->
difference = newline_length + newline_offset - offset
element = binary_part(string, offset, difference)

chunk_by_newline(
string,
newline,
[element | elements],
{newline_offset + newline_length, length - difference}
)

:nomatch ->
{Enum.reverse(elements), binary_part(string, offset, length)}
end
end

defp to_line_stream_after_fun(""), do: {:cont, []}
defp to_line_stream_after_fun(acc), do: {:cont, [acc], []}
end
end
end
4 changes: 4 additions & 0 deletions lib/file_store/middleware/errors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ defmodule FileStore.Middleware.Errors do
|> wrap(action: "read key", key: key)
end

def stream!(store, key, opts) do
FileStore.stream!(store.__next__, key, opts)
end

def copy(store, src, dest) do
store.__next__
|> FileStore.copy(src, dest)
Expand Down
11 changes: 11 additions & 0 deletions lib/file_store/middleware/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ defmodule FileStore.Middleware.Logger do
|> log("READ", key: key)
end

def stream!(store, key, opts) do
store.__next__
|> FileStore.stream!(key, opts)
|> log!("STREAM", key: key)
end

def copy(store, src, dest) do
store.__next__
|> FileStore.copy(src, dest)
Expand Down Expand Up @@ -102,6 +108,11 @@ defmodule FileStore.Middleware.Logger do
{:error, error}
end

defp log!(io, msg, meta) do
log(:ok, msg, meta)
io
end

defp format_meta(meta) do
Enum.map(meta, fn {key, value} ->
[Atom.to_string(key), ?\=, inspect(value)]
Expand Down
4 changes: 4 additions & 0 deletions lib/file_store/middleware/prefix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ defmodule FileStore.Middleware.Prefix do
FileStore.read(store.__next__, put_prefix(key, store))
end

def stream!(store, key, opts) do
FileStore.stream!(store.__next__, put_prefix(key, store), opts)
end

def copy(store, src, dest) do
FileStore.copy(store.__next__, put_prefix(src, store), put_prefix(dest, store))
end
Expand Down
46 changes: 46 additions & 0 deletions test/support/adapter_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,52 @@ defmodule FileStore.AdapterCase do
assert {:ok, _} = FileStore.stat(store, "bar")
end
end

describe "stream!/3" do
test "the file exists and is chunked perfectly", %{store: store} do
chunks = Enum.map(~w(a b c d e f g h i j k l m n o), &String.duplicate(&1, 32))
data = Enum.join(chunks, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", chunk_size: 32)
assert chunks == Enum.to_list(stream)
end

test "file exists but the chunk has a last chunk that is not full", %{store: store} do
chunks = Enum.map(~w(a b c d e f g h i j k l m n), &String.duplicate(&1, 32))
chunks = chunks ++ ["oooooooo"]
data = Enum.join(chunks, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", chunk_size: 32)
assert chunks == Enum.to_list(stream)
end

test "file exists but we want to stream by line", %{store: store} do
lines =
Enum.map(~w(a b c d e f g h i j k l m n o), fn char ->
String.duplicate(char, 32) <> "\n"
end)

data = Enum.join(lines, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", line: true)
assert lines == Enum.to_list(stream)
end

test "file does not have a newline in it", %{store: store} do
data = String.duplicate("a", 32)

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", line: true)
assert [data] == Enum.to_list(stream)
end
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions test/support/error_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ defmodule FileStore.Adapters.Error do
defimpl FileStore do
def write(_store, _key, _content, _opts \\ []), do: {:error, :boom}
def read(_store, _key), do: {:error, :boom}

def stream!(_store, key, _opts \\ []) do
raise FileStore.Error, reason: "Does not work", key: key, action: "stream"
end

def upload(_store, _source, _key), do: {:error, :boom}
def download(_store, _key, _destination), do: {:error, :boom}
def stat(_store, _key), do: {:error, :boom}
Expand Down

0 comments on commit 245c10f

Please sign in to comment.