diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java index 7b3a3e8974b9..daa8b5e1b925 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java @@ -108,6 +108,7 @@ * {@link TableSchema.TypeName#ARRAY} {@link Schema.TypeName#ARRAY} * {@link TableSchema.TypeName#ENUM8} {@link Schema.TypeName#STRING} * {@link TableSchema.TypeName#ENUM16} {@link Schema.TypeName#STRING} + * {@link TableSchema.TypeName#BOOL} {@link Schema.TypeName#BOOLEAN} * * * Nullable row columns are supported through Nullable type in ClickHouse. diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java index 97eea2b82558..8ed62eee3b59 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java @@ -143,6 +143,9 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj writeValue(stream, columnType.arrayElementType(), arrayValue); } break; + case BOOL: + BinaryStreamUtils.writeBoolean(stream, (Boolean) value); + break; } } diff --git a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java index 353be8c537d5..06ba2399a3cd 100644 --- a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java +++ b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java @@ -109,6 +109,8 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) { case ENUM8: case ENUM16: return Schema.FieldType.STRING; + case BOOL: + return Schema.FieldType.BOOLEAN; } // not possible, errorprone checks for exhaustive switch @@ -163,8 +165,10 @@ public enum TypeName { UINT16, UINT32, UINT64, - // Composite types - ARRAY + // Composite type + ARRAY, + // Primitive type + BOOL } /** @@ -203,6 +207,7 @@ public abstract static class ColumnType implements Serializable { public static final ColumnType UINT16 = ColumnType.of(TypeName.UINT16); public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32); public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64); + public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL); // ClickHouse doesn't allow nested nullables, so boolean flag is enough public abstract boolean nullable(); @@ -308,6 +313,8 @@ public static Object parseDefaultExpression(ColumnType columnType, String value) return Long.valueOf(value); case UINT64: return Long.valueOf(value); + case BOOL: + return Boolean.valueOf(value); default: throw new UnsupportedOperationException("Unsupported type: " + columnType); } diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj index fca06e943f8b..830499d3207b 100644 --- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj +++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj @@ -97,6 +97,7 @@ TOKEN : | < AS : "AS" > | < COMMA : "," > | < EQ : "=" > + | < BOOL : "BOOL" > } public ColumnType columnType() : @@ -189,6 +190,7 @@ private TypeName typeName() : | { return TypeName.UINT16; } | { return TypeName.UINT32; } | { return TypeName.UINT64; } + | { return TypeName.BOOL; } ) } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java index d557e33f5b84..8d4f9ab041c2 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java @@ -159,7 +159,8 @@ public void testPrimitiveTypes() throws Exception { Schema.Field.of("f14", FieldType.STRING), Schema.Field.of("f15", FieldType.STRING), Schema.Field.of("f16", FieldType.BYTES), - Schema.Field.of("f17", FieldType.logicalType(FixedBytes.of(3)))); + Schema.Field.of("f17", FieldType.logicalType(FixedBytes.of(3))), + Schema.Field.of("f18", FieldType.BOOLEAN)); Row row1 = Row.withSchema(schema) .addValue(new DateTime(2030, 10, 1, 0, 0, 0, DateTimeZone.UTC)) @@ -180,6 +181,7 @@ public void testPrimitiveTypes() throws Exception { .addValue("qwe") .addValue(new byte[] {'a', 's', 'd'}) .addValue(new byte[] {'z', 'x', 'c'}) + .addValue(true) .build(); executeSql( @@ -201,7 +203,8 @@ public void testPrimitiveTypes() throws Exception { + "f14 Enum16('abc' = -1, 'cde' = -2)," + "f15 FixedString(3)," + "f16 FixedString(3)," - + "f17 FixedString(3)" + + "f17 FixedString(3)," + + "f18 Bool" + ") ENGINE=Log"); pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_primitive_types")); @@ -229,6 +232,7 @@ public void testPrimitiveTypes() throws Exception { assertArrayEquals(new byte[] {'q', 'w', 'e'}, rs.getBytes("f15")); assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16")); assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17")); + assertEquals("true", rs.getString("f18")); } } @@ -250,7 +254,8 @@ public void testArrayOfPrimitiveTypes() throws Exception { Schema.Field.of("f11", FieldType.array(FieldType.INT64)), Schema.Field.of("f12", FieldType.array(FieldType.INT64)), Schema.Field.of("f13", FieldType.array(FieldType.STRING)), - Schema.Field.of("f14", FieldType.array(FieldType.STRING))); + Schema.Field.of("f14", FieldType.array(FieldType.STRING)), + Schema.Field.of("f15", FieldType.array(FieldType.BOOLEAN))); Row row1 = Row.withSchema(schema) .addArray( @@ -272,6 +277,7 @@ public void testArrayOfPrimitiveTypes() throws Exception { .addArray(12L, 13L) .addArray("abc", "cde") .addArray("cde", "abc") + .addArray(true, false) .build(); executeSql( @@ -290,7 +296,8 @@ public void testArrayOfPrimitiveTypes() throws Exception { + "f11 Array(UInt32)," + "f12 Array(UInt64)," + "f13 Array(Enum8('abc' = 1, 'cde' = 2))," - + "f14 Array(Enum16('abc' = -1, 'cde' = -2))" + + "f14 Array(Enum16('abc' = -1, 'cde' = -2))," + + "f15 Array(Bool)" + ") ENGINE=Log"); pipeline @@ -317,6 +324,7 @@ public void testArrayOfPrimitiveTypes() throws Exception { assertEquals("[12,13]", rs.getString("f12")); assertEquals("['abc','cde']", rs.getString("f13")); assertEquals("['cde','abc']", rs.getString("f14")); + assertEquals("[true,false]", rs.getString("f15")); } } diff --git a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java index bcc2c5c287ad..174761403471 100644 --- a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java +++ b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java @@ -88,6 +88,11 @@ public void testParseUInt64() { assertEquals(ColumnType.UINT64, ColumnType.parse("UInt64")); } + @Test + public void testParseBool() { + assertEquals(ColumnType.BOOL, ColumnType.parse("Bool")); + } + @Test public void testParseString() { assertEquals(ColumnType.STRING, ColumnType.parse("String"));