From 7cb2259506fa21ce221a040cdd5ba95eac0f8933 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sun, 21 Aug 2022 12:04:29 -0700 Subject: [PATCH 01/13] Add encoder for object container files Only adding the raising functions for now The implementation is designed so that people can pass different implementations of the codecs as required. Some people may want to use pure beam functions, some may want to use NIFs. --- lib/avro_ex/object_container.ex | 75 +++++++++++++++++++++++++++ lib/avro_ex/object_container/codec.ex | 49 +++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 lib/avro_ex/object_container.ex create mode 100644 lib/avro_ex/object_container/codec.ex diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex new file mode 100644 index 0000000..ca0ad2a --- /dev/null +++ b/lib/avro_ex/object_container.ex @@ -0,0 +1,75 @@ +defmodule AvroEx.ObjectContainer do + use TypedStruct + + alias AvroEx.{Schema} + + @type codec_types :: :null | :deflate | :bzip2 | :snappy | :xz | :zstandard + + typedstruct do + field :schema, Schema.t() + field :codec, codec_types(), default: :null + field :meta, map(), default: %{} + field :sync, <<_::128>> + end + + @magic <<"Obj", 1>> + @bh_schema AvroEx.decode_schema!("long") + @fh_schema AvroEx.decode_schema!(~S""" + {"type": "record", "name": "org.apache.avro.file.Header", + "fields" : [ + {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, + {"name": "meta", "type": {"type": "map", "values": "bytes"}}, + {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}} + ] + } + """) + + def new(schema, opts \\ []) do + %__MODULE__{ + schema: schema, + codec: Keyword.get(opts, :codec, :null), + meta: Keyword.get(opts, :meta, %{}), + sync: :rand.bytes(16) + } + end + + def encode_file_header!(ocf = %__MODULE__{}) do + metadata = + %{ + "avro.schema" => AvroEx.encode_schema(ocf.schema), + "avro.codec" => to_string(ocf.codec) + } + |> Map.merge(ocf.meta) + + 
AvroEx.encode!(@fh_schema, %{ + magic: @magic, + meta: metadata, + sync: ocf.sync + }) + end + + @spec encode_block_header!(pos_integer(), pos_integer()) :: binary() + def encode_block_header!(num_objects, encoded_data_size) do + AvroEx.encode!(@bh_schema, num_objects) <> AvroEx.encode!(@bh_schema, encoded_data_size) + end + + def encode_block_footer!(ocf = %__MODULE__{}), do: ocf.sync + + def encode_block_objects!(ocf = %__MODULE__{}, objects) do + codec = AvroEx.ObjectContainer.Codec.get_codec!(ocf.codec) + + for obj <- objects, reduce: <<>> do + acc -> acc <> AvroEx.encode!(ocf.schema, obj) + end + |> codec.encode!() + end + + def encode_block!(ocf = %__MODULE__{}, objects) do + data = encode_block_objects!(ocf, objects) + encode_block_header!(length(objects), byte_size(data)) <> data <> encode_block_footer!(ocf) + end + + def encode_file!(ocf = %__MODULE__{}, objects) do + encode_file_header!(ocf) <> encode_block!(ocf, objects) + end +end diff --git a/lib/avro_ex/object_container/codec.ex b/lib/avro_ex/object_container/codec.ex new file mode 100644 index 0000000..c89d0b3 --- /dev/null +++ b/lib/avro_ex/object_container/codec.ex @@ -0,0 +1,49 @@ +defmodule AvroEx.ObjectContainer.Codec do + @callback encode!(binary()) :: binary() + @callback decode!(binary()) :: binary() + + defp get_codec_config(codec, dflt), do: Application.get_env(:avro_ex, codec, dflt) + @spec get_codec!(atom) :: __MODULE__.Null + def get_codec!(:null), do: get_codec_config(:null, AvroEx.ObjectContainer.Codec.Null) + def get_codec!(:deflate), do: get_codec_config(:deflate, AvroEx.ObjectContainer.Codec.Deflate) + def get_codec!(:snappy), do: get_codec_config(:snappy, AvroEx.ObjectContainer.Codec.Snappy) + def get_codec!(codec), do: Application.fetch_env!(:avro_ex, codec) +end + +defmodule AvroEx.ObjectContainer.Codec.Null do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data), do: data + @impl AvroEx.ObjectContainer.Codec + def decode!(data), do: 
data +end + +defmodule AvroEx.ObjectContainer.Codec.Deflate do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data), do: :zlib.zip(data) + @impl AvroEx.ObjectContainer.Codec + def decode!(data), do: :zlib.unzip(data) +end + +defmodule AvroEx.ObjectContainer.Codec.Snappy do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data) do + {:ok, compressed} = :snappyer.compress(data) + <> + end + + @impl AvroEx.ObjectContainer.Codec + def decode!(data) do + len = byte_size(data) - 4 + <> = data + {:ok, decompressed} = :snappyer.decompress(compressed) + + if crc == :erlang.crc32(decompressed) do + decompressed + else + raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"} + end + end +end From 426e44fda922b3b9d973e9802be594bb7438c190 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sun, 21 Aug 2022 16:27:29 -0700 Subject: [PATCH 02/13] Add file header encoding tests Improve how block headers are encoded to use an avro record This will help during decoding, since longs are variable length --- lib/avro_ex/object_container.ex | 32 ++++++---- test/object_container_encode_test.exs | 92 +++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 11 deletions(-) create mode 100644 test/object_container_encode_test.exs diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index ca0ad2a..00c66b7 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -13,16 +13,25 @@ defmodule AvroEx.ObjectContainer do end @magic <<"Obj", 1>> - @bh_schema AvroEx.decode_schema!("long") - @fh_schema AvroEx.decode_schema!(~S""" - {"type": "record", "name": "org.apache.avro.file.Header", - "fields" : [ - {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, - {"name": "meta", "type": {"type": "map", "values": "bytes"}}, - {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}} - ] - } - """) + 
@bh_schema AvroEx.decode_schema!(~S({ + "type":"record","name":"block_header", + "fields":[ + {"name":"num_objects","type":"long"}, + {"name":"num_bytes","type":"long"} + ] + })) + @fh_schema AvroEx.decode_schema!(~S({ + "type": "record", "name": "org.apache.avro.file.Header", + "fields" : [ + {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, + {"name": "meta", "type": {"type": "map", "values": "bytes"}}, + {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}} + ] + })) + + def magic(), do: @magic + def block_header_schema(), do: @bh_schema + def file_header_schema(), do: @fh_schema def new(schema, opts \\ []) do %__MODULE__{ @@ -50,7 +59,8 @@ defmodule AvroEx.ObjectContainer do @spec encode_block_header!(pos_integer(), pos_integer()) :: binary() def encode_block_header!(num_objects, encoded_data_size) do - AvroEx.encode!(@bh_schema, num_objects) <> AvroEx.encode!(@bh_schema, encoded_data_size) + header = %{"num_objects" => num_objects, "num_bytes" => encoded_data_size} + AvroEx.encode!(@bh_schema, header) end def encode_block_footer!(ocf = %__MODULE__{}), do: ocf.sync diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs new file mode 100644 index 0000000..f7aca7e --- /dev/null +++ b/test/object_container_encode_test.exs @@ -0,0 +1,92 @@ +defmodule AvroEx.ObjectContainer.Encode.Test do + use ExUnit.Case, async: true + + @test_module AvroEx.ObjectContainer + + describe "encode file header" do + test "new containers have different sync bytes" do + containers = + for _ <- 1..10 do + @test_module.new(nil) + end + + for container <- containers do + others = containers |> List.delete(container) + + for other <- others do + refute container.sync == other.sync + end + end + end + + # TODO: use multiple schemas instead of just "null" + test "codec embedded in header" do + codecs = [:null, :deflate, :bzip2, :snappy, :xz, :zstandard] + + containers = + for codec <- codecs do + 
@test_module.new(AvroEx.decode_schema!(~S("null")), codec: codec) + end + + headers = + for container <- containers do + headerdata = @test_module.encode_file_header!(container) + AvroEx.decode!(@test_module.file_header_schema, headerdata) + end + + for {header, codec} <- Enum.zip(headers, codecs) do + assert header["meta"]["avro.codec"] == to_string(codec) + end + end + + test "default codec is null" do + container = @test_module.new(AvroEx.decode_schema!(~S("null"))) + headerdata = @test_module.encode_file_header!(container) + header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + assert header["meta"]["avro.codec"] == "null" + end + + test "schema is stored in the file header metadata" do + container = @test_module.new(AvroEx.decode_schema!(~S("null"))) + headerdata = @test_module.encode_file_header!(container) + header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" + end + + test "user metadata is stored in the file header metadata" do + container = @test_module.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) + headerdata = @test_module.encode_file_header!(container) + header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + assert header["meta"]["first_time"] == "12345678" + end + + test "user metadata does not prevent schema and codec from being written preoperly" do + container = @test_module.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) + headerdata = @test_module.encode_file_header!(container) + header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + assert header["meta"]["avro.codec"] == "null" + assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" + end + + test "magic matches standard" do + container = @test_module.new(AvroEx.decode_schema!(~S("null"))) + headerdata = @test_module.encode_file_header!(container) + header = AvroEx.decode!(@test_module.file_header_schema, 
headerdata) + assert header["magic"] == <<"Obj", 1>> + end + end + + test "encode block header" do + # TODO: property based test makes more sense + encoded_header = @test_module.encode_block_header!(100, 5000) + header = AvroEx.decode!(@test_module.block_header_schema, encoded_header) + assert header["num_objects"] == 100 + assert header["num_bytes"] == 5000 + end + + describe "encode block objects" do + end + + describe "encode file" do + end +end From efb6c76c93466ba047ce1b38ae6a3c10c5567bee Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sun, 21 Aug 2022 16:27:58 -0700 Subject: [PATCH 03/13] Split each codec into its own file --- lib/avro_ex/object_container/codec.ex | 38 ------------------- lib/avro_ex/object_container/codec/deflate.ex | 7 ++++ lib/avro_ex/object_container/codec/null.ex | 7 ++++ lib/avro_ex/object_container/codec/snappy.ex | 21 ++++++++++ 4 files changed, 35 insertions(+), 38 deletions(-) create mode 100644 lib/avro_ex/object_container/codec/deflate.ex create mode 100644 lib/avro_ex/object_container/codec/null.ex create mode 100644 lib/avro_ex/object_container/codec/snappy.ex diff --git a/lib/avro_ex/object_container/codec.ex b/lib/avro_ex/object_container/codec.ex index c89d0b3..986c31a 100644 --- a/lib/avro_ex/object_container/codec.ex +++ b/lib/avro_ex/object_container/codec.ex @@ -9,41 +9,3 @@ defmodule AvroEx.ObjectContainer.Codec do def get_codec!(:snappy), do: get_codec_config(:snappy, AvroEx.ObjectContainer.Codec.Snappy) def get_codec!(codec), do: Application.fetch_env!(:avro_ex, codec) end - -defmodule AvroEx.ObjectContainer.Codec.Null do - @behaviour AvroEx.ObjectContainer.Codec - @impl AvroEx.ObjectContainer.Codec - def encode!(data), do: data - @impl AvroEx.ObjectContainer.Codec - def decode!(data), do: data -end - -defmodule AvroEx.ObjectContainer.Codec.Deflate do - @behaviour AvroEx.ObjectContainer.Codec - @impl AvroEx.ObjectContainer.Codec - def encode!(data), do: :zlib.zip(data) - @impl AvroEx.ObjectContainer.Codec - def 
decode!(data), do: :zlib.unzip(data) -end - -defmodule AvroEx.ObjectContainer.Codec.Snappy do - @behaviour AvroEx.ObjectContainer.Codec - @impl AvroEx.ObjectContainer.Codec - def encode!(data) do - {:ok, compressed} = :snappyer.compress(data) - <> - end - - @impl AvroEx.ObjectContainer.Codec - def decode!(data) do - len = byte_size(data) - 4 - <> = data - {:ok, decompressed} = :snappyer.decompress(compressed) - - if crc == :erlang.crc32(decompressed) do - decompressed - else - raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"} - end - end -end diff --git a/lib/avro_ex/object_container/codec/deflate.ex b/lib/avro_ex/object_container/codec/deflate.ex new file mode 100644 index 0000000..44e58e1 --- /dev/null +++ b/lib/avro_ex/object_container/codec/deflate.ex @@ -0,0 +1,7 @@ +defmodule AvroEx.ObjectContainer.Codec.Deflate do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data), do: :zlib.zip(data) + @impl AvroEx.ObjectContainer.Codec + def decode!(data), do: :zlib.unzip(data) +end diff --git a/lib/avro_ex/object_container/codec/null.ex b/lib/avro_ex/object_container/codec/null.ex new file mode 100644 index 0000000..477058e --- /dev/null +++ b/lib/avro_ex/object_container/codec/null.ex @@ -0,0 +1,7 @@ +defmodule AvroEx.ObjectContainer.Codec.Null do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data), do: data + @impl AvroEx.ObjectContainer.Codec + def decode!(data), do: data +end diff --git a/lib/avro_ex/object_container/codec/snappy.ex b/lib/avro_ex/object_container/codec/snappy.ex new file mode 100644 index 0000000..632cfda --- /dev/null +++ b/lib/avro_ex/object_container/codec/snappy.ex @@ -0,0 +1,21 @@ +defmodule AvroEx.ObjectContainer.Codec.Snappy do + @behaviour AvroEx.ObjectContainer.Codec + @impl AvroEx.ObjectContainer.Codec + def encode!(data) do + {:ok, compressed} = :snappyer.compress(data) + <> + end + + @impl AvroEx.ObjectContainer.Codec + def 
decode!(data) do + len = byte_size(data) - 4 + <> = data + {:ok, decompressed} = :snappyer.decompress(compressed) + + if crc == :erlang.crc32(decompressed) do + decompressed + else + raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"} + end + end +end From d012a83fcb4d636513c1565a7b1da4652a46a5fe Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Mon, 22 Aug 2022 10:16:47 -0700 Subject: [PATCH 04/13] Use alias instead of property --- test/object_container_encode_test.exs | 44 +++++++++++++-------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs index f7aca7e..3e3822a 100644 --- a/test/object_container_encode_test.exs +++ b/test/object_container_encode_test.exs @@ -1,13 +1,13 @@ defmodule AvroEx.ObjectContainer.Encode.Test do use ExUnit.Case, async: true - @test_module AvroEx.ObjectContainer + alias AvroEx.ObjectContainer describe "encode file header" do test "new containers have different sync bytes" do containers = for _ <- 1..10 do - @test_module.new(nil) + ObjectContainer.new(nil) end for container <- containers do @@ -25,13 +25,13 @@ defmodule AvroEx.ObjectContainer.Encode.Test do containers = for codec <- codecs do - @test_module.new(AvroEx.decode_schema!(~S("null")), codec: codec) + ObjectContainer.new(AvroEx.decode_schema!(~S("null")), codec: codec) end headers = for container <- containers do - headerdata = @test_module.encode_file_header!(container) - AvroEx.decode!(@test_module.file_header_schema, headerdata) + headerdata = ObjectContainer.encode_file_header!(container) + AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) end for {header, codec} <- Enum.zip(headers, codecs) do @@ -40,46 +40,46 @@ defmodule AvroEx.ObjectContainer.Encode.Test do end test "default codec is null" do - container = @test_module.new(AvroEx.decode_schema!(~S("null"))) - headerdata = @test_module.encode_file_header!(container) - header = 
AvroEx.decode!(@test_module.file_header_schema, headerdata) + container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) + headerdata = ObjectContainer.encode_file_header!(container) + header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) assert header["meta"]["avro.codec"] == "null" end test "schema is stored in the file header metadata" do - container = @test_module.new(AvroEx.decode_schema!(~S("null"))) - headerdata = @test_module.encode_file_header!(container) - header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) + headerdata = ObjectContainer.encode_file_header!(container) + header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" end test "user metadata is stored in the file header metadata" do - container = @test_module.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) - headerdata = @test_module.encode_file_header!(container) - header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + container = ObjectContainer.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) + headerdata = ObjectContainer.encode_file_header!(container) + header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) assert header["meta"]["first_time"] == "12345678" end test "user metadata does not prevent schema and codec from being written preoperly" do - container = @test_module.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) - headerdata = @test_module.encode_file_header!(container) - header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + container = ObjectContainer.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) + headerdata = ObjectContainer.encode_file_header!(container) + header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) assert 
header["meta"]["avro.codec"] == "null" assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" end test "magic matches standard" do - container = @test_module.new(AvroEx.decode_schema!(~S("null"))) - headerdata = @test_module.encode_file_header!(container) - header = AvroEx.decode!(@test_module.file_header_schema, headerdata) + container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) + headerdata = ObjectContainer.encode_file_header!(container) + header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) assert header["magic"] == <<"Obj", 1>> end end test "encode block header" do # TODO: property based test makes more sense - encoded_header = @test_module.encode_block_header!(100, 5000) - header = AvroEx.decode!(@test_module.block_header_schema, encoded_header) + encoded_header = ObjectContainer.encode_block_header!(100, 5000) + header = AvroEx.decode!(ObjectContainer.block_header_schema, encoded_header) assert header["num_objects"] == 100 assert header["num_bytes"] == 5000 end From dd981a4aaa6f42514d10688cc1a06896c7398a04 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Mon, 22 Aug 2022 10:27:22 -0700 Subject: [PATCH 05/13] Swap __MODULE__ position in argument pattern Matches the rest of the library --- lib/avro_ex/object_container.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index 00c66b7..d3a2add 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -42,7 +42,7 @@ defmodule AvroEx.ObjectContainer do } end - def encode_file_header!(ocf = %__MODULE__{}) do + def encode_file_header!(%__MODULE__{} = ocf) do metadata = %{ "avro.schema" => AvroEx.encode_schema(ocf.schema), @@ -63,9 +63,9 @@ defmodule AvroEx.ObjectContainer do AvroEx.encode!(@bh_schema, header) end - def encode_block_footer!(ocf = %__MODULE__{}), do: ocf.sync + def encode_block_footer!(%__MODULE__{sync: sync}), do: sync - def 
encode_block_objects!(ocf = %__MODULE__{}, objects) do + def encode_block_objects!(%__MODULE__{} = ocf, objects) do codec = AvroEx.ObjectContainer.Codec.get_codec!(ocf.codec) for obj <- objects, reduce: <<>> do @@ -74,12 +74,12 @@ defmodule AvroEx.ObjectContainer do |> codec.encode!() end - def encode_block!(ocf = %__MODULE__{}, objects) do + def encode_block!(%__MODULE__{} = ocf, objects) do data = encode_block_objects!(ocf, objects) encode_block_header!(length(objects), byte_size(data)) <> data <> encode_block_footer!(ocf) end - def encode_file!(ocf = %__MODULE__{}, objects) do + def encode_file!(%__MODULE__{} = ocf, objects) do encode_file_header!(ocf) <> encode_block!(ocf, objects) end end From 22ee254baf246c60590ecc8e14f46c48bf8cb441 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Thu, 25 Aug 2022 07:48:38 -0700 Subject: [PATCH 06/13] Use Elixir maps instead of JSON strings for static schemas --- lib/avro_ex/object_container.ex | 34 +++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index d3a2add..3b6ffa2 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -13,23 +13,25 @@ defmodule AvroEx.ObjectContainer do end @magic <<"Obj", 1>> - @bh_schema AvroEx.decode_schema!(~S({ - "type":"record","name":"block_header", - "fields":[ - {"name":"num_objects","type":"long"}, - {"name":"num_bytes","type":"long"} - ] - })) - @fh_schema AvroEx.decode_schema!(~S({ - "type": "record", "name": "org.apache.avro.file.Header", - "fields" : [ - {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, - {"name": "meta", "type": {"type": "map", "values": "bytes"}}, - {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}} - ] - })) + @bh_schema AvroEx.decode_schema!(%{ + "type" => "record", + "name" => "block_header", + "fields" => [ + %{"name" => "num_objects", "type" => "long"}, + %{"name" => "num_bytes", 
"type" => "long"} + ] + }) + + @fh_schema AvroEx.decode_schema!(%{ + "type" => "record", + "name" => "org.apache.avro.file.Header", + "fields" => [ + %{"name" => "magic", "type" => %{"type" => "fixed", "name" => "Magic", "size" => 4}}, + %{"name" => "meta", "type" => %{"type" => "map", "values" => "bytes"}}, + %{"name" => "sync", "type" => %{"type" => "fixed", "name" => "Sync", "size" => 16}} + ] + }) - def magic(), do: @magic def block_header_schema(), do: @bh_schema def file_header_schema(), do: @fh_schema From 12c4514eb9ba49f26f2333d5d996595aa0ad833b Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Thu, 25 Aug 2022 08:31:51 -0700 Subject: [PATCH 07/13] Pass in codec into encoder instead of using env --- lib/avro_ex/object_container.ex | 10 ++++------ lib/avro_ex/object_container/codec.ex | 8 +------- lib/avro_ex/object_container/codec/deflate.ex | 2 ++ lib/avro_ex/object_container/codec/null.ex | 2 ++ lib/avro_ex/object_container/codec/snappy.ex | 2 ++ test/object_container_encode_test.exs | 5 +++-- 6 files changed, 14 insertions(+), 15 deletions(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index 3b6ffa2..b792f0f 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -7,7 +7,7 @@ defmodule AvroEx.ObjectContainer do typedstruct do field :schema, Schema.t() - field :codec, codec_types(), default: :null + field :codec, AvroEx.ObjectContainer.Codec, default: AvroEx.ObjectContainer.Codec.Null field :meta, map(), default: %{} field :sync, <<_::128>> end @@ -38,7 +38,7 @@ defmodule AvroEx.ObjectContainer do def new(schema, opts \\ []) do %__MODULE__{ schema: schema, - codec: Keyword.get(opts, :codec, :null), + codec: Keyword.get(opts, :codec, AvroEx.ObjectContainer.Codec.Null), meta: Keyword.get(opts, :meta, %{}), sync: :rand.bytes(16) } @@ -48,7 +48,7 @@ defmodule AvroEx.ObjectContainer do metadata = %{ "avro.schema" => AvroEx.encode_schema(ocf.schema), - "avro.codec" => to_string(ocf.codec) + 
"avro.codec" => to_string(ocf.codec.name()) } |> Map.merge(ocf.meta) @@ -68,12 +68,10 @@ defmodule AvroEx.ObjectContainer do def encode_block_footer!(%__MODULE__{sync: sync}), do: sync def encode_block_objects!(%__MODULE__{} = ocf, objects) do - codec = AvroEx.ObjectContainer.Codec.get_codec!(ocf.codec) - for obj <- objects, reduce: <<>> do acc -> acc <> AvroEx.encode!(ocf.schema, obj) end - |> codec.encode!() + |> ocf.codec.encode!() end def encode_block!(%__MODULE__{} = ocf, objects) do diff --git a/lib/avro_ex/object_container/codec.ex b/lib/avro_ex/object_container/codec.ex index 986c31a..41bd208 100644 --- a/lib/avro_ex/object_container/codec.ex +++ b/lib/avro_ex/object_container/codec.ex @@ -1,11 +1,5 @@ defmodule AvroEx.ObjectContainer.Codec do @callback encode!(binary()) :: binary() @callback decode!(binary()) :: binary() - - defp get_codec_config(codec, dflt), do: Application.get_env(:avro_ex, codec, dflt) - @spec get_codec!(atom) :: __MODULE__.Null - def get_codec!(:null), do: get_codec_config(:null, AvroEx.ObjectContainer.Codec.Null) - def get_codec!(:deflate), do: get_codec_config(:deflate, AvroEx.ObjectContainer.Codec.Deflate) - def get_codec!(:snappy), do: get_codec_config(:snappy, AvroEx.ObjectContainer.Codec.Snappy) - def get_codec!(codec), do: Application.fetch_env!(:avro_ex, codec) + @callback name() :: atom() end diff --git a/lib/avro_ex/object_container/codec/deflate.ex b/lib/avro_ex/object_container/codec/deflate.ex index 44e58e1..e890c80 100644 --- a/lib/avro_ex/object_container/codec/deflate.ex +++ b/lib/avro_ex/object_container/codec/deflate.ex @@ -1,6 +1,8 @@ defmodule AvroEx.ObjectContainer.Codec.Deflate do @behaviour AvroEx.ObjectContainer.Codec @impl AvroEx.ObjectContainer.Codec + def name(), do: :deflate + @impl AvroEx.ObjectContainer.Codec def encode!(data), do: :zlib.zip(data) @impl AvroEx.ObjectContainer.Codec def decode!(data), do: :zlib.unzip(data) diff --git a/lib/avro_ex/object_container/codec/null.ex 
b/lib/avro_ex/object_container/codec/null.ex index 477058e..144e412 100644 --- a/lib/avro_ex/object_container/codec/null.ex +++ b/lib/avro_ex/object_container/codec/null.ex @@ -1,6 +1,8 @@ defmodule AvroEx.ObjectContainer.Codec.Null do @behaviour AvroEx.ObjectContainer.Codec @impl AvroEx.ObjectContainer.Codec + def name(), do: :null + @impl AvroEx.ObjectContainer.Codec def encode!(data), do: data @impl AvroEx.ObjectContainer.Codec def decode!(data), do: data diff --git a/lib/avro_ex/object_container/codec/snappy.ex b/lib/avro_ex/object_container/codec/snappy.ex index 632cfda..ab70fb8 100644 --- a/lib/avro_ex/object_container/codec/snappy.ex +++ b/lib/avro_ex/object_container/codec/snappy.ex @@ -1,6 +1,8 @@ defmodule AvroEx.ObjectContainer.Codec.Snappy do @behaviour AvroEx.ObjectContainer.Codec @impl AvroEx.ObjectContainer.Codec + def name(), do: :snappy + @impl AvroEx.ObjectContainer.Codec def encode!(data) do {:ok, compressed} = :snappyer.compress(data) <> diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs index 3e3822a..1948189 100644 --- a/test/object_container_encode_test.exs +++ b/test/object_container_encode_test.exs @@ -2,6 +2,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do use ExUnit.Case, async: true alias AvroEx.ObjectContainer + alias AvroEx.ObjectContainer.Codec describe "encode file header" do test "new containers have different sync bytes" do @@ -21,7 +22,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do # TODO: use multiple schemas instead of just "null" test "codec embedded in header" do - codecs = [:null, :deflate, :bzip2, :snappy, :xz, :zstandard] + codecs = [Codec.Null, Codec.Deflate, Codec.Snappy] containers = for codec <- codecs do @@ -35,7 +36,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do end for {header, codec} <- Enum.zip(headers, codecs) do - assert header["meta"]["avro.codec"] == to_string(codec) + assert header["meta"]["avro.codec"] == to_string(codec.name()) end end From 
186ef445fc077dc502d9d6d70686b0382275248d Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Tue, 30 Aug 2022 11:51:29 -0700 Subject: [PATCH 08/13] Add initial file header decode function --- lib/avro_ex/object_container.ex | 20 ++++++++++++++++++++ lib/avro_ex/object_container/codec.ex | 21 +++++++++++++++++++++ test/object_container_encode_test.exs | 20 ++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index b792f0f..6d86e79 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -82,4 +82,24 @@ defmodule AvroEx.ObjectContainer do def encode_file!(%__MODULE__{} = ocf, objects) do encode_file_header!(ocf) <> encode_block!(ocf, objects) end + + @spec decode_file_header(<<_::32, _::_*8>>, keyword()) :: {:ok, AvroEx.ObjectContainer.t(), binary()} | {:error, AvroEx.DecodeError.t()} + def decode_file_header(file_header, opts \\ []) + def decode_file_header(file_header, opts) do + user_codecs = Keyword.get(opts, :codecs, []) + + with {:ok, decoded_header, rest} <- AvroEx.Decode.decode(@fh_schema, file_header), + %{"avro.schema" => schema, "avro.codec" => codec} <- decoded_header["meta"], + {:ok, decoded_schema} <- AvroEx.decode_schema(schema), + {:ok, codec_impl} <- __MODULE__.Codec.get_codec_implementation(codec, user_codecs) + do + meta = Map.drop(decoded_header["meta"], ["avro.schema", "avro.codec"]) + {:ok, %__MODULE__{ + schema: decoded_schema, + codec: codec_impl, + meta: meta, + sync: decoded_header["sync"], + }, rest} + end + end end diff --git a/lib/avro_ex/object_container/codec.ex b/lib/avro_ex/object_container/codec.ex index 41bd208..8ec24c1 100644 --- a/lib/avro_ex/object_container/codec.ex +++ b/lib/avro_ex/object_container/codec.ex @@ -2,4 +2,25 @@ defmodule AvroEx.ObjectContainer.Codec do @callback encode!(binary()) :: binary() @callback decode!(binary()) :: binary() @callback name() :: atom() + + def mandatory_codecs do + [null: 
__MODULE__.Null, deflate: __MODULE__.Deflate] + end + + def get_codec_implementation(codec, user_codecs \\ []) + def get_codec_implementation(codec, user_codecs) when is_binary(codec), + do: get_codec_implementation(String.to_atom(codec), user_codecs) + + def get_codec_implementation(codec, user_codecs) when is_atom(codec) do + impl = + mandatory_codecs() + |> Keyword.merge(user_codecs) + |> Keyword.get(codec) + + if impl do + {:ok, impl} + else + {:error, %AvroEx.DecodeError{message: "Codec implimentation not found"}} + end + end end diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs index 1948189..855c42b 100644 --- a/test/object_container_encode_test.exs +++ b/test/object_container_encode_test.exs @@ -90,4 +90,24 @@ defmodule AvroEx.ObjectContainer.Encode.Test do describe "encode file" do end + + describe "decode file header" do + @example_file_header AvroEx.encode!(ObjectContainer.file_header_schema, %{ + "magic" => <<"Obj", 1>>, + "meta" => %{ + "avro.schema" => "{\"type\":\"null\"}", + "avro.codec" => "null", + "custom_meta" => "custom_value" + }, + "sync" => <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> + }) + + test "full valid file header with optional metas" do + {:ok, header, <<>>} = AvroEx.ObjectContainer.decode_file_header(@example_file_header) + assert header.schema == AvroEx.decode_schema!(nil) + assert header.codec == ObjectContainer.Codec.Null + assert header.meta == %{"custom_meta" => "custom_value"} + assert header.sync == <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> + end + end end From 99a97f2753c9e99a7d88a60d0732e367977c230e Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Tue, 30 Aug 2022 13:34:26 -0700 Subject: [PATCH 09/13] Improve error handling and responses Handle case with missing codec Add tests for parts of the file header decoding required by the spec --- lib/avro_ex/object_container.ex | 48 ++++++++++----- test/object_container_encode_test.exs | 84 
+++++++++++++++++++++------ 2 files changed, 100 insertions(+), 32 deletions(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index 6d86e79..84f8201 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -83,23 +83,43 @@ defmodule AvroEx.ObjectContainer do encode_file_header!(ocf) <> encode_block!(ocf, objects) end - @spec decode_file_header(<<_::32, _::_*8>>, keyword()) :: {:ok, AvroEx.ObjectContainer.t(), binary()} | {:error, AvroEx.DecodeError.t()} - def decode_file_header(file_header, opts \\ []) - def decode_file_header(file_header, opts) do + defp check_magic(<<"Obj", 1, _::binary>>), do: :ok + defp check_magic(_), do: {:error, %AvroEx.DecodeError{message: "Invalid file header"}} + + defp decode_with_rest(schema, message, opts \\ []) do + try do + AvroEx.Decode.decode(schema, message, opts) + rescue + e in MatchError -> {:error, e} + end + end + + + defp get_schema(%{"avro.schema" => schema}), do: {:ok, schema} + defp get_schema(_), do: {:error, %AvroEx.DecodeError{message: "Invalid or missing schema in file header"}} + defp get_codec(%{"avro.codec" => codec}), do: {:ok, codec} + defp get_codec(_), do: {:ok, :null} + + @spec decode_file_header(binary(), keyword()) :: + {:ok, AvroEx.ObjectContainer.t(), binary()} | {:error, AvroEx.DecodeError.t()} + def decode_file_header(file_header, opts \\ []) do user_codecs = Keyword.get(opts, :codecs, []) - with {:ok, decoded_header, rest} <- AvroEx.Decode.decode(@fh_schema, file_header), - %{"avro.schema" => schema, "avro.codec" => codec} <- decoded_header["meta"], - {:ok, decoded_schema} <- AvroEx.decode_schema(schema), - {:ok, codec_impl} <- __MODULE__.Codec.get_codec_implementation(codec, user_codecs) - do + with :ok <- check_magic(file_header), + {:ok, decoded_header, rest} <- decode_with_rest(@fh_schema, file_header), + {:ok, schema} <- get_schema(decoded_header["meta"]), + {:ok, codec} <- get_codec(decoded_header["meta"]), + {:ok, decoded_schema} <- 
AvroEx.decode_schema(schema), + {:ok, codec_impl} <- __MODULE__.Codec.get_codec_implementation(codec, user_codecs) do meta = Map.drop(decoded_header["meta"], ["avro.schema", "avro.codec"]) - {:ok, %__MODULE__{ - schema: decoded_schema, - codec: codec_impl, - meta: meta, - sync: decoded_header["sync"], - }, rest} + + {:ok, + %__MODULE__{ + schema: decoded_schema, + codec: codec_impl, + meta: meta, + sync: decoded_header["sync"] + }, rest} end end end diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs index 855c42b..8c2608a 100644 --- a/test/object_container_encode_test.exs +++ b/test/object_container_encode_test.exs @@ -32,7 +32,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do headers = for container <- containers do headerdata = ObjectContainer.encode_file_header!(container) - AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) end for {header, codec} <- Enum.zip(headers, codecs) do @@ -43,28 +43,28 @@ defmodule AvroEx.ObjectContainer.Encode.Test do test "default codec is null" do container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) headerdata = ObjectContainer.encode_file_header!(container) - header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + header = AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) assert header["meta"]["avro.codec"] == "null" end test "schema is stored in the file header metadata" do container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) headerdata = ObjectContainer.encode_file_header!(container) - header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + header = AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" end test "user metadata is stored in the file header metadata" do container = ObjectContainer.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: 
"12345678"}) headerdata = ObjectContainer.encode_file_header!(container) - header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + header = AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) assert header["meta"]["first_time"] == "12345678" end test "user metadata does not prevent schema and codec from being written preoperly" do container = ObjectContainer.new(AvroEx.decode_schema!(~S("null")), meta: %{first_time: "12345678"}) headerdata = ObjectContainer.encode_file_header!(container) - header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + header = AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) assert header["meta"]["avro.codec"] == "null" assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}" end @@ -72,7 +72,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do test "magic matches standard" do container = ObjectContainer.new(AvroEx.decode_schema!(~S("null"))) headerdata = ObjectContainer.encode_file_header!(container) - header = AvroEx.decode!(ObjectContainer.file_header_schema, headerdata) + header = AvroEx.decode!(ObjectContainer.file_header_schema(), headerdata) assert header["magic"] == <<"Obj", 1>> end end @@ -80,7 +80,7 @@ defmodule AvroEx.ObjectContainer.Encode.Test do test "encode block header" do # TODO: property based test makes more sense encoded_header = ObjectContainer.encode_block_header!(100, 5000) - header = AvroEx.decode!(ObjectContainer.block_header_schema, encoded_header) + header = AvroEx.decode!(ObjectContainer.block_header_schema(), encoded_header) assert header["num_objects"] == 100 assert header["num_bytes"] == 5000 end @@ -92,22 +92,70 @@ defmodule AvroEx.ObjectContainer.Encode.Test do end describe "decode file header" do - @example_file_header AvroEx.encode!(ObjectContainer.file_header_schema, %{ - "magic" => <<"Obj", 1>>, - "meta" => %{ - "avro.schema" => "{\"type\":\"null\"}", - "avro.codec" => "null", - "custom_meta" => "custom_value" - }, - "sync" => <<1, 
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> - }) - test "full valid file header with optional metas" do - {:ok, header, <<>>} = AvroEx.ObjectContainer.decode_file_header(@example_file_header) + {:ok, header, <<>>} = + ObjectContainer.decode_file_header( + AvroEx.encode!(ObjectContainer.file_header_schema(), %{ + "magic" => <<"Obj", 1>>, + "meta" => %{ + "avro.schema" => "{\"type\":\"null\"}", + "avro.codec" => "null", + "custom_meta" => "custom_value" + }, + "sync" => <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> + }) + ) + assert header.schema == AvroEx.decode_schema!(nil) assert header.codec == ObjectContainer.Codec.Null assert header.meta == %{"custom_meta" => "custom_value"} assert header.sync == <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> end + + test "invalid magic is detected" do + assert {:error, %AvroEx.DecodeError{}} = + ObjectContainer.decode_file_header("some random data stream that doesn't start with right magic") + end + + test "missing schema detected" do + assert {:error, %AvroEx.DecodeError{}} = + ObjectContainer.decode_file_header( + AvroEx.encode!(ObjectContainer.file_header_schema(), %{ + "magic" => <<"Obj", 1>>, + "meta" => %{"avro.codec" => "null"}, + "sync" => <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> + }) + ) + end + + test "missing codec defaults to null" do + assert {:ok, header, <<>>} = + ObjectContainer.decode_file_header( + AvroEx.encode!(ObjectContainer.file_header_schema(), %{ + "magic" => <<"Obj", 1>>, + "meta" => %{"avro.schema" => "{\"type\":\"null\"}"}, + "sync" => <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> + }) + ) + + assert header.codec == ObjectContainer.Codec.Null + end + + test "missing sync detected" do + data = + AvroEx.encode!(ObjectContainer.file_header_schema(), %{ + "magic" => <<"Obj", 1>>, + "meta" => %{ + "avro.schema" => "{\"type\":\"null\"}", + "avro.codec" => "null", + "custom_meta" => "custom_value" + }, + "sync" => <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
11, 12, 13, 14, 15, 16>> + }) + + slice = byte_size(data) - Enum.random(1..16) + <> = data + assert {:error, _} = ObjectContainer.decode_file_header(corrupt_data) + end end end From 5426a95c8a93937a021b5232ddb50b893cdd97de Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Mon, 24 Oct 2022 10:24:56 -0700 Subject: [PATCH 10/13] Decode file container objects Implement all the functions that go into decoding parts of the object container --- lib/avro_ex/object_container.ex | 76 ++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index 84f8201..15ed6ed 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -94,7 +94,6 @@ defmodule AvroEx.ObjectContainer do end end - defp get_schema(%{"avro.schema" => schema}), do: {:ok, schema} defp get_schema(_), do: {:error, %AvroEx.DecodeError{message: "Invalid or missing schema in file header"}} defp get_codec(%{"avro.codec" => codec}), do: {:ok, codec} @@ -122,4 +121,79 @@ defmodule AvroEx.ObjectContainer do }, rest} end end + + defp check_block_header(%{"num_objects" => num_objects, "num_bytes" => num_bytes}) + when is_integer(num_objects) and num_objects >= 0 and is_integer(num_bytes) and num_bytes >= 0, + do: {:ok, %{num_objects: num_objects, num_bytes: num_bytes}} + + defp check_block_header(%{num_objects: num_objects, num_bytes: num_bytes}) + when is_integer(num_objects) and num_objects >= 0 and is_integer(num_bytes) and num_bytes >= 0, + do: {:ok, %{num_objects: num_objects, num_bytes: num_bytes}} + + defp check_block_header(_), do: {:error, %AvroEx.DecodeError{message: "Invalid block header"}} + + def decode_block_header(data) do + with {:ok, header, rest} <- decode_with_rest(@bh_schema, data), + {:ok, checked_header} <- check_block_header(header) do + {:ok, checked_header, rest} + end + end + + def check_block_footer(%__MODULE__{sync: sync}, <>) when sync == read_sync, + do: {:ok, 
rest} + + def check_block_footer(_, <<_::128, _::binary>>), do: {:error, %AvroEx.DecodeError{message: "Invalid sync bytes"}} + + defp do_decode_block_objects(file_header, data, objects \\ []) + defp do_decode_block_objects(_file_header, <<>>, objects), do: {:ok, Enum.reverse(objects)} + + defp do_decode_block_objects(%__MODULE__{} = file_header, data, objects) do + with {:ok, object, rest} <- decode_with_rest(file_header.schema, data) do + do_decode_block_objects(file_header, rest, [object | objects]) + end + end + + defp get_object_data(num_bytes, data) do + with <> <- data do + {:ok, object_data, rest} + else + _ -> {:error, %AvroEx.DecodeError{message: "Not enough bytes for block objects"}} + end + end + + defp check_num_objects(objects, num_objects) when length(objects) == num_objects, do: :ok + defp check_num_objects(_, _), do: {:error, %AvroEx.DecodeError{message: "Invalid number of objects"}} + + def decode_block_objects(file_header, block_header, data) do + with {:ok, %{num_objects: num_objects, num_bytes: num_bytes}} <- check_block_header(block_header), + {:ok, object_data, rest} <- get_object_data(num_bytes, data), + {:ok, objects} <- do_decode_block_objects(file_header, object_data), + :ok <- check_num_objects(objects, num_objects) do + {:ok, objects, rest} + end + end + + def decode_block(file_header, data) do + with {:ok, block_header, rest} <- decode_block_header(data), + {:ok, objects, rest} <- decode_block_objects(file_header, block_header, rest), + {:ok, rest} <- check_block_footer(file_header, rest) do + {:ok, objects, rest} + end + end + + defp do_decode_blocks(file_header, data, objects \\ []) + defp do_decode_blocks(_file_header, <<>>, objects), do: {:ok, objects |> Enum.reverse() |> List.flatten()} + + defp do_decode_blocks(file_header, data, objects) do + with {:ok, new_objects, rest} <- decode_block(file_header, data) do + do_decode_blocks(file_header, rest, [new_objects | objects]) + end + end + + def decode_blocks(file_header, data) do + 
do_decode_blocks(file_header, data) + end + + def decode_file(data) do + end end From 80113f5c1aa9047f1ffc779d429a45c6d78572ef Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sat, 4 Mar 2023 11:27:23 -0800 Subject: [PATCH 11/13] File decoding Implement the function that decodes an object container --- lib/avro_ex/object_container.ex | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex index 15ed6ed..e4b5b16 100644 --- a/lib/avro_ex/object_container.ex +++ b/lib/avro_ex/object_container.ex @@ -139,10 +139,11 @@ defmodule AvroEx.ObjectContainer do end end - def check_block_footer(%__MODULE__{sync: sync}, <>) when sync == read_sync, + def check_block_footer(%__MODULE__{sync: sync}, <>) when sync == <>, do: {:ok, rest} - def check_block_footer(_, <<_::128, _::binary>>), do: {:error, %AvroEx.DecodeError{message: "Invalid sync bytes"}} + def check_block_footer(%__MODULE__{sync: sync}, <>), + do: {:error, %AvroEx.DecodeError{message: "Invalid sync bytes: #{inspect(sync)} != #{inspect(read_sync)}"}} defp do_decode_block_objects(file_header, data, objects \\ []) defp do_decode_block_objects(_file_header, <<>>, objects), do: {:ok, Enum.reverse(objects)} @@ -194,6 +195,10 @@ defmodule AvroEx.ObjectContainer do do_decode_blocks(file_header, data) end - def decode_file(data) do + def decode_file(data, opts \\ []) do + with {:ok, %__MODULE__{} = fileheader, rest} <- decode_file_header(data, opts), + {:ok, objects} <- decode_blocks(fileheader, rest) do + {:ok, fileheader, objects} + end end end From 1fbf41358a1b19427ed37995bea970538eb72679 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sat, 4 Mar 2023 11:28:33 -0800 Subject: [PATCH 12/13] Test Encode/Decode blocks and object containers --- test/object_container_encode_test.exs | 74 ++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 12 deletions(-) diff --git a/test/object_container_encode_test.exs 
b/test/object_container_encode_test.exs index 8c2608a..3ad4489 100644 --- a/test/object_container_encode_test.exs +++ b/test/object_container_encode_test.exs @@ -77,18 +77,14 @@ defmodule AvroEx.ObjectContainer.Encode.Test do end end - test "encode block header" do - # TODO: property based test makes more sense - encoded_header = ObjectContainer.encode_block_header!(100, 5000) - header = AvroEx.decode!(ObjectContainer.block_header_schema(), encoded_header) - assert header["num_objects"] == 100 - assert header["num_bytes"] == 5000 - end - - describe "encode block objects" do - end - - describe "encode file" do + describe "block header" do + test "encode and then decode block header" do + # TODO: property based test makes more sense + encoded_header = ObjectContainer.encode_block_header!(100, 5000) + header = AvroEx.decode!(ObjectContainer.block_header_schema(), encoded_header) + assert header["num_objects"] == 100 + assert header["num_bytes"] == 5000 + end end describe "decode file header" do @@ -158,4 +154,58 @@ defmodule AvroEx.ObjectContainer.Encode.Test do assert {:error, _} = ObjectContainer.decode_file_header(corrupt_data) end end + + describe "encode and decode block objects" do + setup testinfo do + data_scema = + AvroEx.decode_schema!(%{ + "type" => "record", + "name" => "block_test_data", + "fields" => [%{"name" => "testdata1", "type" => "bytes"}, %{"name" => "testdata2", "type" => "int"}] + }) + ocf = ObjectContainer.new(data_scema) + testinfo |> Map.merge(%{data_scema: data_scema, ocf: ocf}) + end + + test "encode and then decode objects", %{ocf: ocf} do + data_input = for v <- 1..10, do: %{"testdata1" => "test#{v}", "testdata2" => v} + encoded = ObjectContainer.encode_block_objects!(ocf, data_input) + block_header = %{num_objects: 10, num_bytes: byte_size(encoded)} + {:ok, data_output, _rest} = ObjectContainer.decode_block_objects(ocf, block_header, encoded) + assert data_output == data_input + end + end + + describe "full object container" do + setup 
testinfo do + data_scema = + AvroEx.decode_schema!(%{ + "type" => "record", + "name" => "block_test_data", + "fields" => [%{"name" => "testdata1", "type" => "bytes"}, %{"name" => "testdata2", "type" => "int"}] + }) + ocf = ObjectContainer.new(data_scema) + testinfo |> Map.merge(%{data_scema: data_scema, ocf: ocf}) + end + + test "encode and then decode a file with a single block", %{data_scema: data_scema, ocf: ocf} do + data_input = for v <- 1..10, do: %{"testdata1" => "test#{v}", "testdata2" => v} + file_data = ObjectContainer.encode_file!(ocf, data_input) + {:ok, ocf_output, data_output} = ObjectContainer.decode_file(file_data) + assert AvroEx.encode_schema(ocf_output.schema) == AvroEx.encode_schema(data_scema) + assert data_output == data_input + end + + test "encode and then decode a file with a multiple blocks", %{data_scema: data_scema, ocf: ocf} do + data_input = for v <- 1..30, do: %{"testdata1" => "test#{v}", "testdata2" => v} + data_chunks = Enum.chunk_every(data_input, 10) + file_data = ObjectContainer.encode_file!(ocf, hd(data_chunks)) + file_data = for block <- tl(data_chunks), reduce: file_data do + acc -> acc <> ObjectContainer.encode_block!(ocf, block) + end + {:ok, ocf_output, data_output} = ObjectContainer.decode_file(file_data) + assert AvroEx.encode_schema(ocf_output.schema) == AvroEx.encode_schema(data_scema) + assert data_output == data_input + end + end end From 1689f3cbfb6486d7e84a3dd3bb3ccf7fca85e251 Mon Sep 17 00:00:00 2001 From: Zander Erasmus Date: Sat, 4 Mar 2023 15:13:13 -0800 Subject: [PATCH 13/13] Make the snappy codec optional. The implementation of the snappy codec in avro is quite unique, so providing an implementation is valuable. Uses the optional dependency method similar to ecto. 
--- lib/avro_ex/object_container/codec/snappy.ex | 47 ++++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/lib/avro_ex/object_container/codec/snappy.ex b/lib/avro_ex/object_container/codec/snappy.ex index ab70fb8..409d180 100644 --- a/lib/avro_ex/object_container/codec/snappy.ex +++ b/lib/avro_ex/object_container/codec/snappy.ex @@ -2,22 +2,41 @@ defmodule AvroEx.ObjectContainer.Codec.Snappy do @behaviour AvroEx.ObjectContainer.Codec @impl AvroEx.ObjectContainer.Codec def name(), do: :snappy - @impl AvroEx.ObjectContainer.Codec - def encode!(data) do - {:ok, compressed} = :snappyer.compress(data) - <> - end - @impl AvroEx.ObjectContainer.Codec - def decode!(data) do - len = byte_size(data) - 4 - <> = data - {:ok, decompressed} = :snappyer.decompress(compressed) + if Code.ensure_loaded?(:snappyer) do + @impl AvroEx.ObjectContainer.Codec + def encode!(data) do + {:ok, compressed} = :snappyer.compress(data) + <> + end + + @impl AvroEx.ObjectContainer.Codec + def decode!(data) do + len = byte_size(data) - 4 + <> = data + {:ok, decompressed} = :snappyer.decompress(compressed) + + if crc == :erlang.crc32(decompressed) do + decompressed + else + raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"} + end + end + else + @impl AvroEx.ObjectContainer.Codec + def encode!(_data) do + raise """ + Cannot encode data using the Snappy codec because snappyer has not been loaded. + If you require Snappy compression, you must add snappyer as a dependency in your mix.exs file. + """ + end - if crc == :erlang.crc32(decompressed) do - decompressed - else - raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"} + @impl AvroEx.ObjectContainer.Codec + def decode!(_data) do + raise """ + Cannot encode data using the Snappy codec because snappyer has not been loaded. + If you require Snappy compression, you must add snappyer as a dependency in your mix.exs file. + """ end end end