Handle Date type in HCatToRow #32695

Merged · 3 commits · Oct 9, 2024
HCatToRow.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.hcatalog;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -25,6 +27,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.joda.time.Instant;

/** Utilities to convert {@link HCatRecord HCatRecords} to {@link Row Rows}. */
@SuppressWarnings({
@@ -74,14 +77,26 @@ public PCollection<Row> expand(PBegin input) {
private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
private final Schema schema;

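/** Converts a Hive {@code org.apache.hadoop.hive.common.type.Date} to a Joda {@link Instant}; any other value is returned unchanged. */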
private Object maybeCastHDate(Object obj) {
if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
}
return obj;
}

/** Casts objects of types that aren't supported by {@link Row}. */
private List<Object> castTypes(List<Object> values) {
return values.stream().map(this::maybeCastHDate).collect(Collectors.toList());
}

HCatToRowFn(Schema schema) {
this.schema = schema;
}

@ProcessElement
public void processElement(ProcessContext c) {
HCatRecord hCatRecord = c.element();
c.output(Row.withSchema(schema).addValues(hCatRecord.getAll()).build());
c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build());
}
}
}
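
For context, here is a minimal standalone sketch of the conversion that maybeCastHDate performs (the class name HDateCastSketch is illustrative, not part of the PR): Hive's own Date type is not a value a Beam Row can hold, so it is mapped to a Joda Instant via its epoch milliseconds.

import org.apache.hadoop.hive.common.type.Date;
import org.joda.time.Instant;

public class HDateCastSketch {
  public static void main(String[] args) {
    // A Hive Date is a plain calendar day; toEpochMilli() points at the start of that day.
    Date hiveDate = Date.valueOf("2014-01-20");
    Instant asInstant = new Instant(hiveDate.toEpochMilli());
    System.out.println(asInstant); // expected: 2014-01-20T00:00:00.000Z
  }
}

Values of any other type pass through maybeCastHDate unchanged, so existing string and numeric columns are unaffected.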
HCatalogIOTest.java
@@ -22,6 +22,7 @@
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_RECORDS_COUNT;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.TEST_TABLE;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.buildHCatRecordsWithDate;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getConfigPropertiesAsMap;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getExpectedRecords;
import static org.apache.beam.sdk.io.hcatalog.test.HCatalogIOTestUtils.getReaderContext;
@@ -54,12 +55,14 @@
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -230,6 +233,44 @@ public void processElement(ProcessContext c) {
readAfterWritePipeline.run();
}

/** Test for reading the Date column type from HCatalog. */
@Test
public void testReadHCatalogDateType() throws Exception {
service.executeQuery("drop table if exists " + TEST_TABLE);
service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 date)");

defaultPipeline
.apply(Create.of(buildHCatRecordsWithDate(TEST_RECORDS_COUNT)))
.apply(
HCatalogIO.write()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withDatabase(TEST_DATABASE)
.withTable(TEST_TABLE)
.withPartition(new java.util.HashMap<>()));
defaultPipeline.run().waitUntilFinish();

final PCollection<String> output =
readAfterWritePipeline
.apply(
HCatToRow.fromSpec(
HCatalogIO.read()
.withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
.withDatabase(TEST_DATABASE)
.withTable(TEST_TABLE)
.withFilter(TEST_FILTER)))
.apply(
ParDo.of(
new DoFn<Row, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getDateTime("mycol2").toString("yyyy-MM-dd HH:mm:ss"));
}
}))
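// Every generated record carries the same dummy date, so Distinct collapses the output to a single element.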
.apply(Distinct.create());
PAssert.that(output).containsInAnyOrder(ImmutableList.of("2014-01-20 00:00:00"));
readAfterWritePipeline.run();
}

/** Test of writing to a non-existent table. */
@Test
public void testWriteFailureTableDoesNotExist() {
HCatalogIOTestUtils.java
@@ -26,6 +26,7 @@
import java.util.Map.Entry;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -120,4 +121,13 @@ public static Map<String, String> getConfigPropertiesAsMap(HiveConf hiveConf) {
private static DefaultHCatRecord toHCatRecord(int value) {
return new DefaultHCatRecord(Arrays.asList("record " + value, value));
}

/** Returns a list of HCatRecords of the given size, each with a dummy date field. */
public static List<HCatRecord> buildHCatRecordsWithDate(int size) {
List<HCatRecord> expected = new ArrayList<>();
for (int i = 0; i < size; i++) {
expected.add(new DefaultHCatRecord(Arrays.asList("record " + i, Date.valueOf("2014-01-20"))));
}
return expected;
}
}
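
As a rough end-to-end illustration (the class name DateRoundTripSketch is hypothetical, not part of the PR), a record built like those from buildHCatRecordsWithDate carries a Hive Date in its second field; casting it the way HCatToRowFn now does and formatting with the pattern used in testReadHCatalogDateType should yield the string the test asserts.

import java.util.Arrays;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;

public class DateRoundTripSketch {
  public static void main(String[] args) {
    // Same shape as a record produced by buildHCatRecordsWithDate(size).
    HCatRecord record =
        new DefaultHCatRecord(Arrays.asList("record 0", Date.valueOf("2014-01-20")));
    // The cast HCatToRowFn.maybeCastHDate applies to the date field.
    Date hiveDate = (Date) record.getAll().get(1);
    Instant asInstant = new Instant(hiveDate.toEpochMilli());
    // Formatting in UTC with the test's pattern; expected: 2014-01-20 00:00:00
    System.out.println(asInstant.toDateTime(DateTimeZone.UTC).toString("yyyy-MM-dd HH:mm:ss"));
  }
}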