Skip to content

Commit

Permalink
feat: alter the schema registry of table with connector (#15025)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
Rossil2012 and fuyufjh authored Mar 8, 2024
1 parent 1f8ffa1 commit 3f102e1
Show file tree
Hide file tree
Showing 14 changed files with 371 additions and 163 deletions.
71 changes: 40 additions & 31 deletions e2e_test/schema_registry/alter_sr.slt
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,68 @@ FORMAT PLAIN ENCODE PROTOBUF(
statement ok
CREATE MATERIALIZED VIEW mv_user AS SELECT * FROM src_user;

# Changing type is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(city: character varying\)
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithNewType'
);

# Changing format/encode is not allowed
statement error Feature is not yet implemented: the original definition is FORMAT Plain ENCODE Protobuf, and altering them is not supported yet
ALTER SOURCE src_user FORMAT NATIVE ENCODE PROTOBUF(
statement ok
CREATE TABLE t_user WITH (
connector = 'kafka',
topic = 'sr_pb_test',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.UserWithMoreFields'
);
statement error
SELECT age FROM mv_user;

# Dropping columns is not allowed
statement error Feature is not yet implemented: this altering statement will drop columns, which is not supported yet: \(age: integer\)
statement error
SELECT age FROM t_user;

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 5s

# Refresh source schema
statement ok
ALTER SOURCE src_user FORMAT PLAIN ENCODE PROTOBUF(
schema.registry = 'http://message_queue:8081',
message = 'test.User'
);

statement ok
CREATE MATERIALIZED VIEW mv_more_fields AS SELECT * FROM src_user;
CREATE MATERIALIZED VIEW mv_user_more AS SELECT * FROM src_user;

# Refresh table schema
statement ok
ALTER TABLE t_user REFRESH SCHEMA;

query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM mv_user_more;
----
25 4 0 10

# Push more events with extended fields
system ok
python3 e2e_test/schema_registry/pb.py "message_queue:29092" "http://message_queue:8081" "sr_pb_test" 5 user_with_more_fields

sleep 10s
sleep 5s

query I
SELECT COUNT(*) FROM mv_user;
query IIII
SELECT COUNT(*), MAX(age), MIN(age), SUM(age) FROM t_user;
----
25

statement error
SELECT SUM(age) FROM mv_user;
30 4 0 10

query III
SELECT COUNT(*), MAX(age), MIN(age) FROM mv_more_fields;
----
25 4 0
statement ok
DROP MATERIALIZED VIEW mv_user_more;

statement ok
DROP MATERIALIZED VIEW mv_user;
DROP TABLE t_user;

statement ok
DROP MATERIALIZED VIEW mv_more_fields;
DROP MATERIALIZED VIEW mv_user;

statement ok
DROP SOURCE src_user;
27 changes: 9 additions & 18 deletions e2e_test/schema_registry/pb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from protobuf import user_pb2
from google.protobuf.source_context_pb2 import SourceContext
import sys
import importlib
from google.protobuf.source_context_pb2 import SourceContext
from confluent_kafka import Producer
from confluent_kafka.serialization import (
SerializationContext,
Expand All @@ -26,7 +26,7 @@ def get_user(i):
)

def get_user_with_more_fields(i):
return user_pb2.UserWithMoreFields(
return user_pb2.User(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
Expand All @@ -36,16 +36,6 @@ def get_user_with_more_fields(i):
age=i,
)

def get_user_with_new_type(i):
return user_pb2.UserWithNewType(
id=i,
name="User_{}".format(i),
address="Address_{}".format(i),
city=i,
gender=user_pb2.MALE if i % 2 == 0 else user_pb2.FEMALE,
sc=SourceContext(file_name="source/context_{:03}.proto".format(i)),
)

def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_user_fn, pb_message):
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
serializer = ProtobufSerializer(
Expand All @@ -69,7 +59,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u


if __name__ == "__main__":
if len(sys.argv) < 5:
if len(sys.argv) < 6:
print("pb.py <brokerlist> <schema-registry-url> <topic> <num-records> <pb_message>")
exit(1)

Expand All @@ -79,10 +69,11 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
num_records = int(sys.argv[4])
pb_message = sys.argv[5]

user_pb2 = importlib.import_module(f'protobuf.{pb_message}_pb2')

all_pb_messages = {
'user': (get_user, user_pb2.User),
'user_with_more_fields': (get_user_with_more_fields, user_pb2.UserWithMoreFields),
'user_with_new_type': (get_user_with_new_type, user_pb2.UserWithNewType),
'user': get_user,
'user_with_more_fields': get_user_with_more_fields,
}

assert pb_message in all_pb_messages, f'pb_message must be one of {list(all_pb_messages.keys())}'
Expand All @@ -91,7 +82,7 @@ def send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, get_u
producer_conf = {"bootstrap.servers": broker_list}

try:
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, *all_pb_messages[pb_message])
send_to_kafka(producer_conf, schema_registry_conf, topic, num_records, all_pb_messages[pb_message], user_pb2.User)
except Exception as e:
print("Send Protobuf data to schema registry and kafka failed {}", e)
exit(1)
19 changes: 0 additions & 19 deletions e2e_test/schema_registry/protobuf/user.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,3 @@ enum Gender {
MALE = 0;
FEMALE = 1;
}

message UserWithMoreFields {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

message UserWithNewType {
int32 id = 1;
string name = 2;
string address = 3;
int32 city = 4; // change the type from string to int32
Gender gender = 5;
google.protobuf.SourceContext sc = 6;
}
10 changes: 3 additions & 7 deletions e2e_test/schema_registry/protobuf/user_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions e2e_test/schema_registry/protobuf/user_with_more_fields.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
syntax = "proto3";

package test;

import "google/protobuf/source_context.proto";

// User message of package `test`, i.e. fully-qualified name `test.User`.
// This file's version of the message declares seven fields; `age` (tag 7)
// is the one flagged as newly added.
// NOTE(review): tags 1-6 presumably mirror the `User` message in the
// sibling user.proto so that existing records stay decodable - confirm.
message User {
int32 id = 1;
string name = 2;
string address = 3;
string city = 4;
// Enum type declared in this same file.
Gender gender = 5;
// Well-known type brought in by the google/protobuf/source_context.proto import.
google.protobuf.SourceContext sc = 6;
int32 age = 7; // new field here
}

// Values for the `gender` field of `User`.
// MALE is the zero value, which proto3 treats as the default when the
// field is not set on the wire.
enum Gender {
MALE = 0;
FEMALE = 1;
}
29 changes: 29 additions & 0 deletions e2e_test/schema_registry/protobuf/user_with_more_fields_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3f102e1

Please sign in to comment.