Skip to content

Commit

Permalink
Add fix from Avro main stream for broken schema evolution (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-zh authored Aug 18, 2020
1 parent 93ffb3b commit 23d53a3
Showing 1 changed file with 54 additions and 2 deletions.
56 changes: 54 additions & 2 deletions lib/avro/datum.php
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ public function read_record($writers_schema, $readers_schema, $decoder)
$readers_fields[$writers_field->name()]->type(),
$decoder);
else
$this->skip_data($type, $decoder);
self::skip_data($type, $decoder);
}
// Fill in default values
if (count($readers_fields) > count($record))
Expand Down Expand Up @@ -816,7 +816,7 @@ public function read_default_value($field_schema, $default_value)
* @return
* @throws AvroException
*/
private function skip_data($writers_schema, $decoder)
public static function skip_data($writers_schema, $decoder)
{
switch ($writers_schema->type())
{
Expand Down Expand Up @@ -1030,6 +1030,58 @@ public function skip_bytes() { return $this->skip($this->read_long()); }

public function skip_string() { return $this->skip_bytes(); }

public function skip_fixed($writers_schema, AvroIOBinaryDecoder $decoder)
{
$decoder->skip($writers_schema->size());
}

public function skip_enum($writers_schema, AvroIOBinaryDecoder $decoder)
{
$decoder->skip_int();
}

public function skip_union($writers_schema, AvroIOBinaryDecoder $decoder)
{
$index = $decoder->read_long();
AvroIODatumReader::skip_data($writers_schema->schema_by_index($index), $decoder);
}

public function skip_record($writers_schema, AvroIOBinaryDecoder $decoder)
{
foreach ($writers_schema->fields() as $f) {
AvroIODatumReader::skip_data($f->type(), $decoder);
}
}

public function skip_array($writers_schema, AvroIOBinaryDecoder $decoder)
{
$block_count = $decoder->read_long();
while (0 !== $block_count) {
if ($block_count < 0) {
$decoder->skip($this->read_long());
}
for ($i = 0; $i < $block_count; $i++) {
AvroIODatumReader::skip_data($writers_schema->items(), $decoder);
}
$block_count = $decoder->read_long();
}
}

public function skip_map($writers_schema, AvroIOBinaryDecoder $decoder)
{
$block_count = $decoder->read_long();
while (0 !== $block_count) {
if ($block_count < 0) {
$decoder->skip($this->read_long());
}
for ($i = 0; $i < $block_count; $i++) {
$decoder->skip_string();
AvroIODatumReader::skip_data($writers_schema->values(), $decoder);
}
$block_count = $decoder->read_long();
}
}

/**
* @param int $len count of bytes to skip
* @uses AvroIO::seek()
Expand Down

0 comments on commit 23d53a3

Please sign in to comment.