Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds stream!/3 interface #34

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 39 additions & 17 deletions lib/file_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,33 @@ defprotocol FileStore do
it's usage.
"""

@typedoc "The file key."
@type key :: binary()
@type list_opts :: [{:prefix, binary()}]
@type delete_all_opts :: [{:prefix, binary()}]
@type write_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
]

@type public_url_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
]

@type signed_url_opts :: [
{:content_type, binary()}
| {:disposition, binary()}
| {:expires_in, integer()}
]

@typedoc "The prefix path option."
@type prefix_opt :: {:prefix, binary()}

@typedoc "The file content disposition hint for the adapter."
@type disposition_opt :: {:disposition, binary()}

@typedoc "The file content type hint for the adapter."
@type content_type_opt :: {:content_type, binary()}

@typedoc "The number of seconds the URL will expire in."
@type expires_in_opt :: {:expires_in, integer()}

@typedoc "The number of bytes to chunk a stream with."
@type chunk_size_opt :: {:chunk_size, pos_integer()}

@typedoc "Streams a file line by line."
@type line_opt :: {:line, boolean()}

@type list_opts :: [prefix_opt()]
@type delete_all_opts :: [prefix_opt()]
@type write_opts :: [content_type_opt() | disposition_opt()]
@type public_url_opts :: [content_type_opt() | disposition_opt()]
@type signed_url_opts :: [content_type_opt() | disposition_opt() | expires_in_opt()]
@type stream_opts :: [chunk_size_opt() | line_opt()]

@doc """
Write a file to the store. If a file with the given `key`
Expand Down Expand Up @@ -65,6 +74,19 @@ defprotocol FileStore do
@spec read(t, key) :: {:ok, binary} | {:error, term}
def read(store, key)

@doc """
Stream the file from from the store.

## Options

* `:chunk_size` - The number of bytes for each chunk of data. It can not be
specified with `:line`.
* `:line` - Streams the file line by line. It can not be specified with
`:chunk_size`.
"""
@spec stream!(t, key, stream_opts) :: Enumerable.t()
def stream!(store, key, opts \\ [])

@doc """
Upload a file to the store. If a file with the given `key`
already exists, it will be overwritten.
Expand Down
11 changes: 11 additions & 0 deletions lib/file_store/adapters/disk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ defmodule FileStore.Adapters.Disk do
store |> Disk.join(key) |> File.read()
end

def stream!(store, key, opts \\ []) do
path = Disk.join(store, key)

if opts[:line] do
File.stream!(path, :line)
else
chunk_size = opts[:chunk_size] || 2048
File.stream!(path, chunk_size)
end
end

def copy(store, src, dest) do
with {:ok, src} <- expand(store, src),
{:ok, dest} <- expand(store, dest),
Expand Down
23 changes: 23 additions & 0 deletions lib/file_store/adapters/memory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ defmodule FileStore.Adapters.Memory do
end)
end

def stream!(store, key, opts \\ []) do
Agent.get(store.name, fn state ->
case Map.fetch(state, key) do
:error ->
raise FileStore.Error, reason: "file does not exist", key: key, action: "stream"

{:ok, data} ->
do_stream!(data, opts)
end
end)
end

defp do_stream!(data, opts) do
{:ok, stream} = StringIO.open(data)

if opts[:line] do
IO.binstream(stream, :line)
else
chunk_size = opts[:chunk_size] || 2048
IO.binstream(stream, chunk_size)
end
end

def copy(store, src, dest) do
Agent.get_and_update(store.name, fn state ->
case Map.fetch(state, src) do
Expand Down
1 change: 1 addition & 0 deletions lib/file_store/adapters/null.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ defmodule FileStore.Adapters.Null do
def download(_store, _key, _destination), do: :ok
def write(_store, _key, _content, _opts \\ []), do: :ok
def read(_store, _key), do: {:ok, ""}
def stream!(_store, _key, _opts \\ []), do: Stream.into([], [])
def copy(_store, _src, _dest), do: :ok
def rename(_store, _src, _dest), do: :ok
def list!(_store, _opts), do: Stream.into([], [])
Expand Down
48 changes: 48 additions & 0 deletions lib/file_store/adapters/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,23 @@ if Code.ensure_loaded?(ExAws.S3) do
end
end

def stream!(store, key, opts) do
aws_opts = Keyword.take(opts, [:chunk_size])

stream =
store.bucket
|> ExAws.S3.download_file(key, :memory, aws_opts)
|> ExAws.stream!()

if opts[:line] do
stream
|> Stream.chunk_while("", &chunk_by_fun/2, &to_line_stream_after_fun/1)
|> Stream.concat()
else
stream
end
end

def upload(store, source, key) do
source
|> ExAws.S3.Upload.stream_file()
Expand Down Expand Up @@ -195,6 +212,37 @@ if Code.ensure_loaded?(ExAws.S3) do
defp to_integer(nil), do: nil
defp to_integer(value) when is_integer(value), do: value
defp to_integer(value) when is_binary(value), do: String.to_integer(value)

defp chunk_by_fun(chunk, acc) do
to_try = acc <> chunk
{elements, acc} = chunk_by_newline(to_try, "\n", [], {0, byte_size(to_try)})
{:cont, elements, acc}
end

defp chunk_by_newline(_string, _newline, elements, {_offset, 0}) do
{Enum.reverse(elements), ""}
end

defp chunk_by_newline(string, newline, elements, {offset, length}) do
case :binary.match(string, newline, scope: {offset, length}) do
{newline_offset, newline_length} ->
difference = newline_length + newline_offset - offset
element = binary_part(string, offset, difference)

chunk_by_newline(
string,
newline,
[element | elements],
{newline_offset + newline_length, length - difference}
)

:nomatch ->
{Enum.reverse(elements), binary_part(string, offset, length)}
end
end

defp to_line_stream_after_fun(""), do: {:cont, []}
defp to_line_stream_after_fun(acc), do: {:cont, [acc], []}
end
end
end
4 changes: 4 additions & 0 deletions lib/file_store/middleware/errors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ defmodule FileStore.Middleware.Errors do
|> wrap(action: "read key", key: key)
end

def stream!(store, key, opts) do
FileStore.stream!(store.__next__, key, opts)
end

def copy(store, src, dest) do
store.__next__
|> FileStore.copy(src, dest)
Expand Down
11 changes: 11 additions & 0 deletions lib/file_store/middleware/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ defmodule FileStore.Middleware.Logger do
|> log("READ", key: key)
end

def stream!(store, key, opts) do
store.__next__
|> FileStore.stream!(key, opts)
|> log!("STREAM", key: key)
end

def copy(store, src, dest) do
store.__next__
|> FileStore.copy(src, dest)
Expand Down Expand Up @@ -102,6 +108,11 @@ defmodule FileStore.Middleware.Logger do
{:error, error}
end

defp log!(io, msg, meta) do
log(:ok, msg, meta)
io
end

defp format_meta(meta) do
Enum.map(meta, fn {key, value} ->
[Atom.to_string(key), ?\=, inspect(value)]
Expand Down
4 changes: 4 additions & 0 deletions lib/file_store/middleware/prefix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ defmodule FileStore.Middleware.Prefix do
FileStore.read(store.__next__, put_prefix(key, store))
end

def stream!(store, key, opts) do
FileStore.stream!(store.__next__, put_prefix(key, store), opts)
end

def copy(store, src, dest) do
FileStore.copy(store.__next__, put_prefix(src, store), put_prefix(dest, store))
end
Expand Down
46 changes: 46 additions & 0 deletions test/support/adapter_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,52 @@ defmodule FileStore.AdapterCase do
assert {:ok, _} = FileStore.stat(store, "bar")
end
end

describe "stream!/3" do
test "the file exists and is chunked perfectly", %{store: store} do
chunks = Enum.map(~w(a b c d e f g h i j k l m n o), &String.duplicate(&1, 32))
data = Enum.join(chunks, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", chunk_size: 32)
assert chunks == Enum.to_list(stream)
end

test "file exists but the chunk has a last chunk that is not full", %{store: store} do
chunks = Enum.map(~w(a b c d e f g h i j k l m n), &String.duplicate(&1, 32))
chunks = chunks ++ ["oooooooo"]
data = Enum.join(chunks, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", chunk_size: 32)
assert chunks == Enum.to_list(stream)
end

test "file exists but we want to stream by line", %{store: store} do
lines =
Enum.map(~w(a b c d e f g h i j k l m n o), fn char ->
String.duplicate(char, 32) <> "\n"
end)

data = Enum.join(lines, "")

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", line: true)
assert lines == Enum.to_list(stream)
end

test "file does not have a newline in it", %{store: store} do
data = String.duplicate("a", 32)

:ok = FileStore.write(store, "foo", data)

assert stream = FileStore.stream!(store, "foo", line: true)
assert [data] == Enum.to_list(stream)
end
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions test/support/error_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ defmodule FileStore.Adapters.Error do
defimpl FileStore do
def write(_store, _key, _content, _opts \\ []), do: {:error, :boom}
def read(_store, _key), do: {:error, :boom}

def stream!(_store, key, _opts \\ []) do
raise FileStore.Error, reason: "Does not work", key: key, action: "stream"
end

def upload(_store, _source, _key), do: {:error, :boom}
def download(_store, _key, _destination), do: {:error, :boom}
def stat(_store, _key), do: {:error, :boom}
Expand Down