-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add encoder/decoder for Avro object container files #81
base: master
Are you sure you want to change the base?
Changes from 3 commits
7cb2259
426e44f
efb6c76
d012a83
dd981a4
22ee254
12c4514
186ef44
99a97f2
5426a95
80113f5
1fbf413
6459f86
1689f3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
defmodule AvroEx.ObjectContainer do | ||
use TypedStruct | ||
|
||
alias AvroEx.{Schema} | ||
|
||
# Codec identifiers that may be stored in the container's `:codec` field.
# Only :null, :deflate and :snappy have default implementations wired up in
# AvroEx.ObjectContainer.Codec.get_codec!/1; any other identifier is looked up
# in the :avro_ex application environment.
@type codec_types :: :null | :deflate | :bzip2 | :snappy | :xz | :zstandard | ||
|
||
# Describes one object container file: what is written into its header and
# how its data blocks are encoded.
typedstruct do | ||
# Schema used to encode every object and serialised into the header metadata.
field :schema, Schema.t() | ||
# Codec identifier; written to the header as a string and used to pick the block codec.
field :codec, codec_types(), default: :null | ||
# Caller-supplied metadata merged into the header's `meta` map.
field :meta, map(), default: %{} | ||
# 16-byte (128-bit) sync marker; written in the file header and after every block.
field :sync, <<_::128>> | ||
end | ||
|
||
# Four-byte marker stored in the `magic` field of the file header:
# the ASCII characters "Obj" followed by the byte 1.
@magic <<"Obj", 1>> | ||
# Schema of the header that precedes each data block: two longs,
# `num_objects` and `num_bytes`. Being a module attribute, the JSON is
# parsed once when this module is compiled.
@bh_schema AvroEx.decode_schema!(~S({ | ||
"type":"record","name":"block_header", | ||
"fields":[ | ||
{"name":"num_objects","type":"long"}, | ||
{"name":"num_bytes","type":"long"} | ||
] | ||
})) | ||
# Schema of the file header record: a 4-byte fixed `magic`, a `meta` map of
# bytes values, and a 16-byte fixed `sync` marker. Also parsed at compile time.
@fh_schema AvroEx.decode_schema!(~S({ | ||
"type": "record", "name": "org.apache.avro.file.Header", | ||
"fields" : [ | ||
{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, | ||
{"name": "meta", "type": {"type": "map", "values": "bytes"}}, | ||
{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}} | ||
] | ||
})) | ||
|
||
@doc "Returns the 4-byte magic marker written at the start of the file header."
def magic do
  @magic
end

@doc "Returns the parsed schema of the header that precedes each data block."
def block_header_schema do
  @bh_schema
end

@doc "Returns the parsed schema of the file header record."
def file_header_schema do
  @fh_schema
end
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't see a need to expose these directly. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The magic is unnecessary, but the raw schemas are useful to have. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IMO we should drop the need for them in tests, since we can just validate against expected inputs/outputs. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That is true. The format isn't expected to change either, so the tests won't need to be updated. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The best way to test this is to check that you can round-trip the data through the encoder, so that should be completely reasonable. |
||
|
||
@doc """
Builds a container description for `schema`.

Recognised options:

  * `:codec` - codec identifier, defaults to `:null`
  * `:meta` - extra header metadata, defaults to `%{}`

A fresh 16-byte sync marker is generated on every call.
"""
def new(schema, opts \\ []) do
  codec = Keyword.get(opts, :codec, :null)
  meta = Keyword.get(opts, :meta, %{})
  sync = :rand.bytes(16)

  %__MODULE__{schema: schema, codec: codec, meta: meta, sync: sync}
end
|
||
@doc """
Encodes the file header for `ocf` and returns it as a binary.

The header carries the magic marker, the metadata map and the container's
sync marker. The metadata map is the caller-supplied `ocf.meta` plus the
`"avro.schema"` and `"avro.codec"` entries derived from the container.

Raises if the header cannot be encoded.
"""
def encode_file_header!(%__MODULE__{} = ocf) do
  reserved = %{
    "avro.schema" => AvroEx.encode_schema(ocf.schema),
    "avro.codec" => to_string(ocf.codec)
  }

  # The reserved entries go last so that caller metadata can never overwrite
  # the schema/codec a reader needs in order to decode the blocks that follow.
  metadata = Map.merge(ocf.meta, reserved)

  AvroEx.encode!(@fh_schema, %{magic: @magic, meta: metadata, sync: ocf.sync})
end
|
||
@doc """
Encodes the header that precedes a data block.

`num_objects` is the number of objects in the block and `encoded_data_size`
is the size in bytes of the (codec-encoded) block payload. Both may be zero:
an empty block has no objects, and some schemas (e.g. `"null"`) encode to
zero bytes.
"""
@spec encode_block_header!(non_neg_integer(), non_neg_integer()) :: binary()
def encode_block_header!(num_objects, encoded_data_size) do
  AvroEx.encode!(@bh_schema, %{
    "num_objects" => num_objects,
    "num_bytes" => encoded_data_size
  })
end
|
||
@doc "Returns the block footer, which is the container's sync marker."
def encode_block_footer!(%__MODULE__{sync: sync}) do
  sync
end
|
||
@doc """
Encodes every element of `objects` with the container's schema, concatenates
the results in order, and passes the combined binary through the container's
codec.
"""
def encode_block_objects!(%__MODULE__{} = ocf, objects) do
  # Resolve the codec first so an unknown codec fails before any encoding work.
  codec = AvroEx.ObjectContainer.Codec.get_codec!(ocf.codec)

  payload =
    Enum.reduce(objects, <<>>, fn object, encoded ->
      encoded <> AvroEx.encode!(ocf.schema, object)
    end)

  codec.encode!(payload)
end
|
||
@doc """
Encodes `objects` as one complete data block: block header, codec-encoded
payload, then the sync marker footer.
"""
def encode_block!(%__MODULE__{} = ocf, objects) do
  payload = encode_block_objects!(ocf, objects)
  header = encode_block_header!(length(objects), byte_size(payload))
  footer = encode_block_footer!(ocf)

  header <> payload <> footer
end
|
||
@doc """
Encodes a whole container file: the file header followed by a single block
holding all of `objects`.
"""
def encode_file!(%__MODULE__{} = ocf, objects) do
  header = encode_file_header!(ocf)
  block = encode_block!(ocf, objects)

  header <> block
end
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
defmodule AvroEx.ObjectContainer.Codec do
  @moduledoc """
  Behaviour for block codecs used by object container files, plus the lookup
  that maps a codec identifier to the module implementing it.

  A codec module transforms the concatenated, encoded objects of a block
  (`encode!/1`) and reverses that transformation (`decode!/1`).
  """

  @callback encode!(binary()) :: binary()
  @callback decode!(binary()) :: binary()

  @doc """
  Returns the module implementing `codec`.

  For `:null`, `:deflate` and `:snappy` the module configured under that key
  in the `:avro_ex` application environment is returned, falling back to the
  bundled implementation. Any other identifier must be configured in the
  `:avro_ex` application environment; otherwise `ArgumentError` is raised.
  """
  @spec get_codec!(atom()) :: module()
  def get_codec!(:null), do: get_codec_config(:null, AvroEx.ObjectContainer.Codec.Null)
  def get_codec!(:deflate), do: get_codec_config(:deflate, AvroEx.ObjectContainer.Codec.Deflate)
  def get_codec!(:snappy), do: get_codec_config(:snappy, AvroEx.ObjectContainer.Codec.Snappy)
  def get_codec!(codec), do: Application.fetch_env!(:avro_ex, codec)

  # Looks up a user override for `codec`, returning `default` when none is set.
  defp get_codec_config(codec, default), do: Application.get_env(:avro_ex, codec, default)
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
defmodule AvroEx.ObjectContainer.Codec.Deflate do
  @moduledoc """
  Block codec backed by Erlang's `:zlib.zip/1` and `:zlib.unzip/1`.
  """

  @behaviour AvroEx.ObjectContainer.Codec

  @doc "Compresses `data` with `:zlib.zip/1`."
  @impl AvroEx.ObjectContainer.Codec
  def encode!(data) do
    :zlib.zip(data)
  end

  @doc "Decompresses `data` with `:zlib.unzip/1`."
  @impl AvroEx.ObjectContainer.Codec
  def decode!(data) do
    :zlib.unzip(data)
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
defmodule AvroEx.ObjectContainer.Codec.Null do
  @moduledoc """
  Pass-through block codec: data is returned unchanged in both directions.
  """

  @behaviour AvroEx.ObjectContainer.Codec

  @doc "Returns `data` unchanged."
  @impl AvroEx.ObjectContainer.Codec
  def encode!(data) do
    data
  end

  @doc "Returns `data` unchanged."
  @impl AvroEx.ObjectContainer.Codec
  def decode!(data) do
    data
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
defmodule AvroEx.ObjectContainer.Codec.Snappy do
  @moduledoc """
  Block codec that compresses with `:snappyer` and appends a 4-byte CRC32 of
  the uncompressed data, which `decode!/1` verifies after decompression.
  """

  @behaviour AvroEx.ObjectContainer.Codec

  # NOTE(review): `:snappyer` is an external library. Reviewers asked that it
  # be declared as an optional dependency and that this module not be compiled
  # when the user does not have it; the author deferred that decision until the
  # rest of the object container work is finished.

  @doc """
  Compresses `data` and returns the compressed bytes followed by the 32-bit
  CRC32 of the original `data`.
  """
  @impl AvroEx.ObjectContainer.Codec
  def encode!(data) do
    {:ok, compressed} = :snappyer.compress(data)

    # `::binary` is required: an untyped segment defaults to an 8-bit integer,
    # so `<<compressed, ...>>` raises ArgumentError for any binary value.
    <<compressed::binary, :erlang.crc32(data)::32>>
  end

  @doc """
  Splits the trailing 4-byte CRC32 off `data`, decompresses the rest and
  returns it.

  Raises `AvroEx.DecodeError` when the checksum of the decompressed data does
  not match the stored one.
  """
  @impl AvroEx.ObjectContainer.Codec
  def decode!(data) do
    # Everything except the final four bytes is the compressed payload.
    len = byte_size(data) - 4
    <<compressed::binary-size(len), crc::32>> = data
    {:ok, decompressed} = :snappyer.decompress(compressed)

    if crc == :erlang.crc32(decompressed) do
      decompressed
    else
      raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"}
    end
  end
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
defmodule AvroEx.ObjectContainer.Encode.Test do
  use ExUnit.Case, async: true

  @test_module AvroEx.ObjectContainer

  describe "encode file header" do
    test "new containers have different sync bytes" do
      syncs = for _ <- 1..10, do: @test_module.new(nil).sync

      # Every sync marker must be distinct from all the others.
      assert Enum.uniq(syncs) == syncs
    end

    # TODO: use multiple schemas instead of just "null"
    test "codec embedded in header" do
      codecs = [:null, :deflate, :bzip2, :snappy, :xz, :zstandard]

      for codec <- codecs do
        header = null_schema() |> @test_module.new(codec: codec) |> decoded_header()
        assert header["meta"]["avro.codec"] == to_string(codec)
      end
    end

    test "default codec is null" do
      header = null_schema() |> @test_module.new() |> decoded_header()
      assert header["meta"]["avro.codec"] == "null"
    end

    test "schema is stored in the file header metadata" do
      header = null_schema() |> @test_module.new() |> decoded_header()
      assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}"
    end

    test "user metadata is stored in the file header metadata" do
      header =
        null_schema()
        |> @test_module.new(meta: %{first_time: "12345678"})
        |> decoded_header()

      assert header["meta"]["first_time"] == "12345678"
    end

    test "user metadata does not prevent schema and codec from being written properly" do
      header =
        null_schema()
        |> @test_module.new(meta: %{first_time: "12345678"})
        |> decoded_header()

      assert header["meta"]["avro.codec"] == "null"
      assert header["meta"]["avro.schema"] == "{\"type\":\"null\"}"
    end

    test "magic matches standard" do
      header = null_schema() |> @test_module.new() |> decoded_header()
      assert header["magic"] == <<"Obj", 1>>
    end
  end

  test "encode block header" do
    # TODO: property based test makes more sense
    encoded_header = @test_module.encode_block_header!(100, 5000)
    header = AvroEx.decode!(@test_module.block_header_schema(), encoded_header)
    assert header["num_objects"] == 100
    assert header["num_bytes"] == 5000
  end

  describe "encode block objects" do
  end

  describe "encode file" do
  end

  # The only schema the header tests currently exercise.
  defp null_schema, do: AvroEx.decode_schema!(~S("null"))

  # Encodes the container's file header and decodes it back into a map so the
  # individual header fields can be asserted on.
  defp decoded_header(container) do
    encoded = @test_module.encode_file_header!(container)
    AvroEx.decode!(@test_module.file_header_schema(), encoded)
  end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`decode_schema!/1` here is causing a compile-time dependency between this module and `AvroEx`. Since this is a premature optimization, I suggest we move it out of module attributes. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't notice that, thanks.
A good reason for using json may be to "match" with the Avro spec:
vs
the spec is unlikely to change, so no one will ever need to update it
I don't have a strong opinion about it 🤷
let me know which you prefer with the options side by side
I'm not aware of any downsides. I work with a lot of embedded devices though, so doing stuff at compile time feels natural to me.
Is there a reason to avoid this optimization?
I can stick it in `file_header_schema/1` instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it shouldn't change much over time, I think it's fine to represent it in Elixir and get the benefits of formatting. It is also easier to edit as Elixir code.
Regarding compile time, it shouldn't make a massive difference here, but accrued work at compile time does affect the time it takes to compile the library.