#21431 Pubsub dynamic topic destinations #26063
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Force-pushed from ba4dd0b to e11a5ac.
Run Spotless PreCommit
Force-pushed from 96ed323 to 39b56fc.
R: @Naireen
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
R: @scwhittle
public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription";
public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override";
public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label";
public static final String PUBSUB_TOPIC = "pubsub_topic";
public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override";

public static final String PUBSUB_DYNAMIC_DESTINATIONS = "pubsub_with_dynamic_destinations";
nit: pubsub_dynamic_destinations to match enum? ie remove "with"
This has to match the string on the Dataflow backend, which is pubsub_with_dynamic_destinations
new PipelineVisitor.Defaults() {

@Override
public CompositeBehavior enterCompositeTransform(Node node) {
it should only be a composite or a primitive right?
can you simplify this test and remove the unnecessary override here?
done
new PipelineVisitor.Defaults() {

@Override
public CompositeBehavior enterCompositeTransform(Node node) {
ditto
done
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({
since this is new, can you avoid suppressing these?
removed the nullness. Raw types suppression has to stay, as it's due to the base class
}
}

public static class Factory implements SinkFactory {
can this be private or package visible? or perhaps even inlined in factories() above?
done
@@ -1350,23 +1385,15 @@ public void startBundle(StartBundleContext c) throws IOException {
@ProcessElement
public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
PubsubMessage message = getFormatFn().apply(c.element());
int messageSize = validateAndGetPubsubMessageSize(message);
if (messageSize > maxPublishBatchByteSize) {
keep this as sanity check?
sure, done
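For readers following the thread, the sanity check being kept can be sketched as below. This is only an illustration under stated assumptions: `MessageSizeCheck`, `estimateSize`, and `checkFits` are hypothetical names, and the size estimate (payload bytes plus UTF-8 bytes of attribute keys and values) is an assumed simplification, not the logic of `validateAndGetPubsubMessageSize` in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

class MessageSizeCheck {
  /** Hypothetical estimate: payload bytes plus UTF-8 bytes of every attribute key and value. */
  static int estimateSize(byte[] payload, Map<String, String> attributes) {
    int size = payload.length;
    for (Map.Entry<String, String> e : attributes.entrySet()) {
      size += e.getKey().getBytes(StandardCharsets.UTF_8).length;
      size += e.getValue().getBytes(StandardCharsets.UTF_8).length;
    }
    return size;
  }

  /** Rejects a message that could never fit into a single publish batch. */
  static int checkFits(byte[] payload, Map<String, String> attributes, int maxBatchBytes) {
    int size = estimateSize(payload, attributes);
    if (size > maxBatchBytes) {
      throw new IllegalArgumentException(
          "Message of " + size + " bytes exceeds batch limit of " + maxBatchBytes + " bytes");
    }
    return size;
  }
}
```

Failing early on a single oversized message keeps the batching loop from ever holding an element it cannot flush.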
} else {
Map<TopicPath, List<OutgoingMessage>> messagesPerTopic = Maps.newHashMap();
for (OutgoingMessage message : messages) {
TopicPath topicPath = PubsubClient.topicPathFromPath(message.topic());
could do this translation during publish loop so it's per topic instead of per message
done
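The suggestion above (translate the topic once per group rather than once per message) might look roughly like this. It is a sketch with hypothetical names: `parseTopic` stands in for a path parser such as `PubsubClient.topicPathFromPath`, and the `projects/<project>/topics/<topic>` layout it checks is the usual Pub/Sub topic path format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PerTopicTranslation {
  // Counts parser invocations so the saving is observable.
  static int parseCalls = 0;

  /** Hypothetical parser: validates the path and returns "project/topic". */
  static String parseTopic(String path) {
    parseCalls++;
    String[] parts = path.split("/");
    if (parts.length != 4 || !parts[0].equals("projects") || !parts[2].equals("topics")) {
      throw new IllegalArgumentException("Malformed topic path: " + path);
    }
    return parts[1] + "/" + parts[3];
  }

  /** Groups payloads by the raw topic string first, then parses each distinct topic once. */
  static Map<String, List<String>> groupByParsedTopic(List<String[]> topicAndPayload) {
    Map<String, List<String>> byRawTopic = new LinkedHashMap<>();
    for (String[] pair : topicAndPayload) {
      byRawTopic.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
    }
    Map<String, List<String>> byParsedTopic = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> e : byRawTopic.entrySet()) {
      byParsedTopic.put(parseTopic(e.getKey()), e.getValue());
    }
    return byParsedTopic;
  }
}
```

With many messages and few topics, the number of parse calls drops from the message count to the number of distinct topics.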
@@ -495,6 +604,10 @@ static class PubsubSink extends PTransform<PCollection<byte[]>, PDone> {
this.outer = outer;
}

boolean isDynamic() {
remove? seems PubsubUnboundedSink would be used if this was true?
done
@@ -133,6 +132,11 @@
<td>Sets the behavior of reusing objects.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>operatorChaining</code></td>
revert this file?
done
@@ -133,6 +132,11 @@
<td>Sets the behavior of reusing objects.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>operator_chaining</code></td>
ditto
done
this file still seems there
Force-pushed from 6660fdc to b394cf4.
private final String timestampLabel;
private final String idLabel;
private final StreamingModeExecutionContext context;
// Function used to convert PCollection elements to PubsubMessage objects.
remove comment?
done
public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream)
throws IOException {
Pubsub.PubsubMessage.Builder pubsubMessageBuilder =
Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload()));
I wonder if we should separately look into switching to ByteString within beam's PubsubMessage.
@Override
public long add(WindowedValue<PubsubMessage> data) throws IOException {
String dataTopic =
org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(
import static?
done
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;

import avro.shaded.com.google.common.collect.Lists;
don't use shaded version?
done
if (!pubsubMessages.isEmpty()
&& bytes + message.message().getData().size() > publishBatchBytes) {
&& bytes + message.getMessage().getData().size() > publishBatchBytes) {
this limit is currently across all topics but could be per-topic
another option would be to change so we group by topic+shard instead of just shard
that would keep this simple here, and avoid sequential blocking publishes
Added grouping by topic+shard
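To make the topic+shard idea concrete, here is a minimal sketch of a composite grouping key. All names (`TopicShardGrouping`, `TopicShardKey`, `keyFor`) are hypothetical, and deriving the shard from the message id hash is an assumption made for the example; the PR may assign shards differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class TopicShardGrouping {
  /** Hypothetical composite key: one group per (topic, shard) pair. */
  static final class TopicShardKey {
    final String topic;
    final int shard;

    TopicShardKey(String topic, int shard) {
      this.topic = topic;
      this.shard = shard;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TopicShardKey)) {
        return false;
      }
      TopicShardKey other = (TopicShardKey) o;
      return shard == other.shard && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
      return Objects.hash(topic, shard);
    }
  }

  /** Assumption: the shard is a stable function of the message id. */
  static TopicShardKey keyFor(String topic, String messageId, int numShards) {
    return new TopicShardKey(topic, Math.floorMod(messageId.hashCode(), numShards));
  }

  /** Each input pair is {topic, messageId}; every resulting group holds a single topic. */
  static Map<TopicShardKey, List<String>> group(List<String[]> topicAndId, int numShards) {
    Map<TopicShardKey, List<String>> groups = new HashMap<>();
    for (String[] m : topicAndId) {
      groups.computeIfAbsent(keyFor(m[0], m[1], numShards), k -> new ArrayList<>()).add(m[1]);
    }
    return groups;
  }
}
```

Because every group then contains messages for exactly one topic, the batch byte limit applies per topic without extra bookkeeping in the publish loop.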
// dynamic destinations.
return input
.apply(
"WithDynamicKeys",
nit: WithDynamicTopics? DynamicDestinations?
keys seems implementation detail
done
@@ -133,6 +132,11 @@
<td>Sets the behavior of reusing objects.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>operator_chaining</code></td>
this file still seems there
switch (input.isBounded()) {
case BOUNDED:
input.apply(
pubsubMessages.apply(
enforce no dynamic destinations in batch? Or I think the BoundedWriter needs to group output by topic as publish is just all messages to single topic currently.
good point - fixed BoundedWriter
}

public void testDynamicTopics(boolean isBounded) throws IOException {
List<OutgoingMessage> outgoing =
nit: name expectedOutgoing, it wasn't obvious that the test factory for publish took these as expectations when created. I think naming would help.
done
@@ -1355,9 +1377,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
* <p>Public so can be suppressed by runners.
*/
public class PubsubBoundedWriter extends DoFn<PubsubMessage, Void> {
private transient List<OutgoingMessage> output;
private transient Map<PubsubTopic, List<OutgoingMessage>> output;
could have a single map by topic, reduces lookup and prevents possibility of entry in one map and not the other.
done
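A single-map layout along the lines suggested might look like the following sketch. The names (`TopicBuffers`, `Buffer`, `flushed`) are hypothetical and the "publish" step only records what would be sent; the point is that messages and byte count live in one map entry, so they cannot drift apart.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopicBuffers {
  /** One entry per topic holds both the pending messages and their total size. */
  static final class Buffer {
    final List<byte[]> messages = new ArrayList<>();
    long bytes = 0;
  }

  final Map<String, Buffer> buffers = new LinkedHashMap<>();
  // Stand-in for publishing: records "topic:messageCount" for each flushed batch.
  final List<String> flushed = new ArrayList<>();
  final long maxBatchBytes;

  TopicBuffers(long maxBatchBytes) {
    this.maxBatchBytes = maxBatchBytes;
  }

  void add(String topic, byte[] payload) {
    Buffer buffer = buffers.computeIfAbsent(topic, t -> new Buffer());
    // Flush first if this payload would push the topic's batch over the limit.
    if (!buffer.messages.isEmpty() && buffer.bytes + payload.length > maxBatchBytes) {
      flush(topic, buffer);
    }
    buffer.messages.add(payload);
    buffer.bytes += payload.length;
  }

  void flush(String topic, Buffer buffer) {
    flushed.add(topic + ":" + buffer.messages.size());
    buffer.messages.clear();
    buffer.bytes = 0;
  }

  /** Flushes whatever is left for every topic, as a finish-bundle step would. */
  void finish() {
    for (Map.Entry<String, Buffer> e : buffers.entrySet()) {
      if (!e.getValue().messages.isEmpty()) {
        flush(e.getKey(), e.getValue());
      }
    }
  }
}
```

Each `add` needs one lookup, and resetting a topic after a flush updates the entry in place instead of writing to a second map.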
pubsubClient.close();
pubsubClient = null;
}

private void publish() throws IOException {
PubsubTopic topic = getTopicProvider().get();
for (Map.Entry<PubsubTopic, List<OutgoingMessage>> entry : output.entrySet()) {
use visitor instead of iterator?
forEach(e -> publish(e.getKey(), e.getValue()));
Unfortunately this does not work for function references that throw exceptions. I would have to wrap a try/catch inside the lambda (i.e. catch the IOException and rethrow as a RuntimeException), which gets ugly.
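The trade-off described here can be shown with plain JDK types. In this sketch `CheckedForEach` and its `publish` method are hypothetical stand-ins; the only library facts relied on are that `Map.forEach` takes a `BiConsumer`, whose `accept` declares no checked exceptions, and that `UncheckedIOException.getCause()` returns the wrapped `IOException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

class CheckedForEach {
  /** Hypothetical publish that can fail with a checked exception. */
  static void publish(String topic, List<String> messages) throws IOException {
    if (topic.isEmpty()) {
      throw new IOException("empty topic");
    }
  }

  /** Plain loop: the IOException propagates with no extra code. */
  static void publishAllLoop(Map<String, List<String>> output) throws IOException {
    for (Map.Entry<String, List<String>> e : output.entrySet()) {
      publish(e.getKey(), e.getValue());
    }
  }

  /** forEach version: the lambda must wrap the checked exception, and the caller unwraps it. */
  static void publishAllForEach(Map<String, List<String>> output) throws IOException {
    try {
      output.forEach(
          (topic, messages) -> {
            try {
              publish(topic, messages);
            } catch (IOException ex) {
              throw new UncheckedIOException(ex);
            }
          });
    } catch (UncheckedIOException ex) {
      throw ex.getCause();
    }
  }
}
```

Both methods behave the same for callers, but the loop is shorter and keeps the original exception type without wrapping.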
publish(pubsubTopic, currentTopicOutput);
currentTopicOutput.clear();
currentTopicOutputBytes = 0;
currentOutputBytes.put(pubsubTopic, 0L);
think the single map would make this simpler too, since you would be updating the value in the entry directly
done
// A message's attributes can be null.
private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
// A message's messageId cannot be null
"cannot be null" is inconsistent with using NullableCoder
fixed
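The inconsistency flagged above comes down to what a nullable wrapper does: it spends a presence marker so that null can round-trip, which only makes sense if the field may actually be null. The sketch below illustrates that general technique with JDK streams; `NullableStringCodec` is a hypothetical class and is not Beam's `NullableCoder` implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class NullableStringCodec {
  /** Writes a one-byte presence marker, then the value only when it is non-null. */
  static void encode(String value, DataOutputStream out) throws IOException {
    if (value == null) {
      out.writeByte(0);
    } else {
      out.writeByte(1);
      out.writeUTF(value);
    }
  }

  /** Reads the marker back and returns null when the value was absent. */
  static String decode(DataInputStream in) throws IOException {
    int marker = in.readUnsignedByte();
    if (marker == 0) {
      return null;
    }
    if (marker == 1) {
      return in.readUTF();
    }
    throw new IOException("Unexpected presence marker: " + marker);
  }

  /** Convenience round trip used to demonstrate the behavior. */
  static String roundTrip(String value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    encode(value, new DataOutputStream(bytes));
    return decode(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
  }
}
```

A comment saying a field "cannot be null" next to such a wrapper is misleading, since the encoding explicitly allows the null case.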
@Override
public PubsubMessage decode(InputStream inStream) throws IOException {
byte[] payload = PAYLOAD_CODER.decode(inStream);
Map<String, String> attributes = ATTRIBUTES_CODER.decode(inStream);
add Nullable annotation?
done
/** A coder for PubsubMessage including the topic from the PubSub server. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
can this be removed?
done
Run Java PreCommit
LGTM with one comment
}

@FinishBundle
public void finishBundle() throws IOException {
publish();
output = null;
currentOutputBytes = null;
pubsubClient.close();
pubsubClient = null;
}

private void publish() throws IOException {
inline this in finishBundle? otherwise the topic specific publish should be called
Force-pushed from 7809f08 to 1cb9649.
Force-pushed from 1cb9649 to ca67490.
…ent in apache/beam#26063. PiperOrigin-RevId: 528517450
In authoring #30604 some test from this PR failed. Specifically something that appears to expect client-side overrides (aka the v1 path) to have happened when using v2. Does that make sense to you all?
}
}
});
assertTrue(sawPubsubOverride.get());
We should never be using client-side overrides for v2, where we want it all to happen in the service. Is there necessary functionality in the override? It happens after the v2 pipeline is created anyhow I think...
If you are talking about Dataflow runner v2, those overrides were implemented service side not client side.
On Mon, Mar 11, 2024 at 4:53 PM Kenn Knowles wrote:
In authoring #30604 some test from this PR failed. Specifically something that appears to expect client-side overrides (aka the v1 path) to have happened when using v2. Does that make sense to you all?