Skip to content

Commit

Permalink
[GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (#3821)
Browse files Browse the repository at this point in the history
* [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files

This commit adds ORC file validation during the commit phase and deletes
corrupted files. It also includes a test for ORC file validation.

* Linter fixes
  • Loading branch information
homatthew authored Nov 7, 2023
1 parent 10397ab commit 50b6ca9
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.gobblin.util.HadoopUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
Expand All @@ -37,18 +38,28 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.JobConfigurationUtils;


/**
* A wrapper for ORC-core writer without dependency on Hive SerDe library.
*/
@Slf4j
public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
public static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";
public static final String CORRUPTED_ORC_FILE_DELETION_EVENT = "CorruptedOrcFileDeletion";

protected final MetricContext metricContext;
protected final OrcValueWriter<D> valueWriter;
@VisibleForTesting
VectorizedRowBatch rowBatch;
Expand Down Expand Up @@ -113,6 +124,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder<S, D> builder, State properties)
this.orcWriterStripeSizeBytes = properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) OrcConf.STRIPE_SIZE.getDefaultValue());
this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch, properties);
this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties);
this.metricContext = getMetricContext();

// Track the number of other writer tasks from different datasets ingesting on the same container
this.concurrentWriterTasks = properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS, 1);
Expand Down Expand Up @@ -262,16 +274,11 @@ public void close()
public void commit()
throws IOException {
closeInternal();
// Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance
// Validate the ORC file after writer close. Default is false as it introduce more load to FS from an extra read call
if(this.validateORCAfterClose) {
try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) {
} catch (Exception e) {
log.error("Found error when validating staging ORC file {} during commit phase. "
+ "Will delete the malformed file and terminate the commit", this.stagingFile, e);
HadoopUtils.deletePath(this.fs, this.stagingFile, false);
throw e;
}
assertOrcFileIsValid(this.fs, this.stagingFile, new OrcFile.ReaderOptions(conf), this.metricContext);
}

super.commit();

if (this.selfTuningWriter) {
Expand Down Expand Up @@ -375,4 +382,26 @@ public void write(D record) throws IOException {
this.flush();
}
}

protected MetricContext getMetricContext() {
return Instrumented.getMetricContext(new State(properties), this.getClass());
}

@VisibleForTesting
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
justification = "Find bugs believes the eventBuilder is always null and that there is a null check, "
+ "but both are not true.")
static void assertOrcFileIsValid(FileSystem fs, Path filePath, OrcFile.ReaderOptions readerOptions, MetricContext metricContext) throws IOException {
try (Reader ignored = OrcFile.createReader(filePath, readerOptions)) {
} catch (Exception e) {
log.error("Found error when validating staging ORC file {} during the commit phase. "
+ "Will delete the malformed file and abort the commit by throwing an exception", filePath, e);
HadoopUtils.deletePath(fs, filePath, false);
GobblinEventBuilder eventBuilder = new GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
eventBuilder.addMetadata("filePath", filePath.toString());
EventSubmitter.submit(metricContext, eventBuilder);

throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;

Expand All @@ -39,18 +37,16 @@
*/
@Slf4j
public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
MetricContext metricContext;

public static final String METRICS_SCHEMA_NAME = "schemaName";
public static final String METRICS_BYTES_WRITTEN = "bytesWritten";
public static final String METRICS_RECORDS_WRITTEN = "recordsWritten";
public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
public static final String METRICS_BUFFER_SIZE = "bufferSize";
public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";

public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
super(builder, properties);
metricContext = Instrumented.getMetricContext(new State(properties), this.getClass());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.FileFormatException;
import org.apache.orc.OrcFile;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.io.Files;

import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;

import static org.apache.gobblin.writer.GobblinBaseOrcWriter.CORRUPTED_ORC_FILE_DELETION_EVENT;


public class GobblinBaseOrcWriterTest {

@Test
public void testOrcValidation()
throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
File tmpDir = Files.createTempDir();
File corruptedOrcFile = new File(tmpDir, "test.orc");
try (FileWriter writer = new FileWriter(corruptedOrcFile)) {
// write a corrupted ORC file that only contains the header but without content
writer.write(OrcFile.MAGIC);
}

OrcFile.ReaderOptions readerOptions = new OrcFile.ReaderOptions(conf);

MetricContext mockContext = Mockito.mock(MetricContext.class);
Path p = new Path(corruptedOrcFile.getAbsolutePath());
Assert.assertThrows(FileFormatException.class,
() -> GobblinBaseOrcWriter.assertOrcFileIsValid(fs, p, readerOptions, mockContext));

GobblinEventBuilder eventBuilder = new GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE);
eventBuilder.addMetadata("filePath", p.toString());
Mockito.verify(mockContext, Mockito.times(1))
.submitEvent(eventBuilder.build());
}
}

0 comments on commit 50b6ca9

Please sign in to comment.