From 50b6ca998d47e4b42b6823d0c3816e12ad89d79d Mon Sep 17 00:00:00 2001 From: Matthew Ho Date: Tue, 7 Nov 2023 12:59:54 -0800 Subject: [PATCH] [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files (#3821) * [GOBBLIN-1951] Emit GTE when deleting corrupted ORC files This commit adds ORC file validation during the commit phase and deletes corrupted files. It also includes a test for ORC file validation. * Linter fixes --- .../gobblin/writer/GobblinBaseOrcWriter.java | 47 ++++++++++--- .../writer/InstrumentedGobblinOrcWriter.java | 6 +- .../writer/GobblinBaseOrcWriterTest.java | 67 +++++++++++++++++++ 3 files changed, 106 insertions(+), 14 deletions(-) create mode 100644 gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 9f81e82d9ca..bb6c11aaaa4 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -24,8 +24,9 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.gobblin.util.HadoopUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; @@ -37,18 +38,28 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.gobblin.state.ConstructState; +import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JobConfigurationUtils; + /** * A wrapper for ORC-core writer without dependency on Hive SerDe library. */ @Slf4j public abstract class GobblinBaseOrcWriter extends FsDataWriter { + public static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer"; + public static final String CORRUPTED_ORC_FILE_DELETION_EVENT = "CorruptedOrcFileDeletion"; + protected final MetricContext metricContext; protected final OrcValueWriter valueWriter; @VisibleForTesting VectorizedRowBatch rowBatch; @@ -113,6 +124,7 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) this.orcWriterStripeSizeBytes = properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) OrcConf.STRIPE_SIZE.getDefaultValue()); this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch, properties); this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties); + this.metricContext = getMetricContext(); // Track the number of other writer tasks from different datasets ingesting on the same container this.concurrentWriterTasks = properties.getPropAsInt(GobblinOrcWriterConfigs.RuntimeStateConfigs.ORC_WRITER_CONCURRENT_TASKS, 1); @@ -262,16 +274,11 @@ public void close() public void commit() throws IOException { closeInternal(); - // Validate the ORC file after writer close. Default is false as it introduce more load to FS and decrease the performance + // Validate the ORC file after writer close. Default is false as it introduce more load to FS from an extra read call if(this.validateORCAfterClose) { - try (Reader reader = OrcFile.createReader(this.stagingFile, new OrcFile.ReaderOptions(conf))) { - } catch (Exception e) { - log.error("Found error when validating staging ORC file {} during commit phase. " - + "Will delete the malformed file and terminate the commit", this.stagingFile, e); - HadoopUtils.deletePath(this.fs, this.stagingFile, false); - throw e; - } + assertOrcFileIsValid(this.fs, this.stagingFile, new OrcFile.ReaderOptions(conf), this.metricContext); } + super.commit(); if (this.selfTuningWriter) { @@ -375,4 +382,26 @@ public void write(D record) throws IOException { this.flush(); } } + + protected MetricContext getMetricContext() { + return Instrumented.getMetricContext(new State(properties), this.getClass()); + } + + @VisibleForTesting + @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE", + justification = "Find bugs believes the eventBuilder is always null and that there is a null check, " + + "but both are not true.") + static void assertOrcFileIsValid(FileSystem fs, Path filePath, OrcFile.ReaderOptions readerOptions, MetricContext metricContext) throws IOException { + try (Reader ignored = OrcFile.createReader(filePath, readerOptions)) { + } catch (Exception e) { + log.error("Found error when validating staging ORC file {} during the commit phase. " + + "Will delete the malformed file and abort the commit by throwing an exception", filePath, e); + HadoopUtils.deletePath(fs, filePath, false); + GobblinEventBuilder eventBuilder = new GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE); + eventBuilder.addMetadata("filePath", filePath.toString()); + EventSubmitter.submit(metricContext, eventBuilder); + + throw e; + } + } } diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java index c32f1e4e38a..ff2fa26b173 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java @@ -28,8 +28,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -39,18 +37,16 @@ */ @Slf4j public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { - MetricContext metricContext; + public static final String METRICS_SCHEMA_NAME = "schemaName"; public static final String METRICS_BYTES_WRITTEN = "bytesWritten"; public static final String METRICS_RECORDS_WRITTEN = "recordsWritten"; public static final String METRICS_BUFFER_RESIZES = "bufferResizes"; public static final String METRICS_BUFFER_SIZE = "bufferSize"; public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics"; - private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer"; public InstrumentedGobblinOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { super(builder, properties); - metricContext = Instrumented.getMetricContext(new State(properties), this.getClass()); } @Override diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java new file mode 100644 index 00000000000..81387059787 --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinBaseOrcWriterTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.FileFormatException; +import org.apache.orc.OrcFile; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.io.Files; + +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; + +import static org.apache.gobblin.writer.GobblinBaseOrcWriter.CORRUPTED_ORC_FILE_DELETION_EVENT; + + +public class GobblinBaseOrcWriterTest { + + @Test + public void testOrcValidation() + throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + File tmpDir = Files.createTempDir(); + File corruptedOrcFile = new File(tmpDir, "test.orc"); + try (FileWriter writer = new FileWriter(corruptedOrcFile)) { + // write a corrupted ORC file that only contains the header but without content + writer.write(OrcFile.MAGIC); + } + + OrcFile.ReaderOptions readerOptions = new OrcFile.ReaderOptions(conf); + + MetricContext mockContext = Mockito.mock(MetricContext.class); + Path p = new Path(corruptedOrcFile.getAbsolutePath()); + Assert.assertThrows(FileFormatException.class, + () -> GobblinBaseOrcWriter.assertOrcFileIsValid(fs, p, readerOptions, mockContext)); + + GobblinEventBuilder eventBuilder = new GobblinEventBuilder(CORRUPTED_ORC_FILE_DELETION_EVENT, GobblinBaseOrcWriter.ORC_WRITER_NAMESPACE); + eventBuilder.addMetadata("filePath", p.toString()); + Mockito.verify(mockContext, Mockito.times(1)) + .submitEvent(eventBuilder.build()); + } +}