diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java index 9659940e02bd..3a7c08897214 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java @@ -22,16 +22,25 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; @DoFn.UnboundedPerElement @SuppressWarnings({"rawtypes", "nullness"}) public class WriteToPulsarDoFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(WriteToPulsarDoFn.class); + private Producer producer; private PulsarClient client; private String clientUrl; private String topic; + private transient Exception sendException = null; + private transient long numSendFailures = 0; + WriteToPulsarDoFn(PulsarIO.Write transform) { this.clientUrl = transform.getClientUrl(); this.topic = transform.getTopic(); @@ -45,7 +54,27 @@ public void setup() throws PulsarClientException { @ProcessElement public void processElement(@Element byte[] messageToSend) throws Exception { - producer.send(messageToSend); + producer.sendAsync(messageToSend) + .whenComplete((mid, exception) -> { + if (exception == null) { + return; + } + + synchronized (WriteToPulsarDoFn.this) { + if (sendException == null) { + sendException = (Exception) exception; + } + numSendFailures++; + } + // don't log exception stacktrace here, exception will be propagated up. + LOG.warn("send failed : '{}'", exception.getMessage()); + }); + } + + @FinishBundle + public void finishBundle() throws IOException { + producer.flush(); + checkForFailures(); } @Teardown @@ -53,4 +82,20 @@ public void teardown() throws PulsarClientException { producer.close(); client.close(); } + + private synchronized void checkForFailures() throws IOException { + if (numSendFailures == 0) { + return; + } + + String msg = String.format( + "Pulsar Write DoFn: failed to send %d records (since last report)", numSendFailures); + + Exception e = sendException; + sendException = null; + numSendFailures = 0; + + LOG.warn(msg); + throw new IOException(msg, e); + } }