Skip to content

Commit

Permalink
[YAML] - Pick file descriptor based on messageName
Browse files Browse the repository at this point in the history
  • Loading branch information
ffernandez92 committed Feb 14, 2024
1 parent 9a4e114 commit 963a0ad
Showing 1 changed file with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class ProtoByteUtils {
* @return The Beam Schema representing the Protocol Buffer message.
*/
public static Schema getBeamSchemaFromProto(String fileDescriptorPath, String messageName) {
ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath);
ProtoSchemaInfo dpd = getProtoDomain(fileDescriptorPath, messageName);
ProtoDomain protoDomain = dpd.getProtoDomain();
return ProtoDynamicMessageSchema.forDescriptor(protoDomain, messageName).getSchema();
}
Expand Down Expand Up @@ -146,7 +147,7 @@ public Row apply(byte[] input) {
public static SerializableFunction<byte[], Row> getProtoBytesToRowFunction(
String fileDescriptorPath, String messageName) {

ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, messageName);
ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
@SuppressWarnings("unchecked")
ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
Expand Down Expand Up @@ -192,7 +193,7 @@ public byte[] apply(Row input) {

public static SerializableFunction<Row, byte[]> getRowToProtoBytes(
String fileDescriptorPath, String messageName) {
ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath);
ProtoSchemaInfo dynamicProtoDomain = getProtoDomain(fileDescriptorPath, messageName);
ProtoDomain protoDomain = dynamicProtoDomain.getProtoDomain();
@SuppressWarnings("unchecked")
ProtoDynamicMessageSchema<DynamicMessage> protoDynamicMessageSchema =
Expand All @@ -213,16 +214,38 @@ public byte[] apply(Row input) {
* file.
*
* @param fileDescriptorPath The path to the File Descriptor Set file.
* @param messageName The name of the message type for which the descriptor is desired.
* @return ProtoSchemaInfo containing the associated ProtoDomain and File Name.
* @throws RuntimeException if an error occurs during schema retrieval.
*/
private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath) {
private static ProtoSchemaInfo getProtoDomain(String fileDescriptorPath, String messageName) {
byte[] from = getFileAsBytes(fileDescriptorPath);
try {
List<String> messageElements = Splitter.on('.').splitToList(messageName);
String messageTypeByName = messageElements.get(messageElements.size() - 1);

DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(from);
return new ProtoSchemaInfo(
descriptorSet.getFile(0).getName(), ProtoDomain.buildFrom(descriptorSet));

ProtoDomain protoDomain = ProtoDomain.buildFrom(descriptorSet);
List<String> fileProtoNames = new ArrayList<>();

descriptorSet
.getFileList()
.forEach(fileDescriptorProto -> fileProtoNames.add(fileDescriptorProto.getName()));

String fullName =
fileProtoNames.stream()
.filter(
name ->
protoDomain.getFileDescriptor(name).findMessageTypeByName(messageTypeByName)
!= null)
.findFirst()
.orElseThrow(
() ->
new NullPointerException("Couldn't locate the proto for that message name"));

return new ProtoSchemaInfo(fullName, protoDomain);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 963a0ad

Please sign in to comment.