Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[FIX_LOGGING] add some debug lines and throwable catch to avoid data loss #2423

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@
import com.fasterxml.jackson.databind.SerializationFeature;

/**
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
*
* Expects following parameters to configure itself - via system properties
* <ul>
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
* </ul>
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
* </ul>
*/
public class KafkaEventEmitter implements EventEmitter {

Expand All @@ -62,7 +62,7 @@ public class KafkaEventEmitter implements EventEmitter {
protected static final String KAFKA_EMITTER_PREFIX = "org.kie.jbpm.event.emitters.kafka.";

private ObjectMapper mapper;

private KafkaSender sender;

private Producer<String, byte[]> producer;
Expand All @@ -73,10 +73,10 @@ public KafkaEventEmitter() {

KafkaEventEmitter(Producer<String, byte[]> producer) {
this.producer = producer;
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX+"sync") ? this::sendSync : this::sendAsync;
mapper = new ObjectMapper()
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX + "sync") ? this::sendSync : this::sendAsync;
this.mapper = new ObjectMapper()
.setDateFormat(new SimpleDateFormat(System.getProperty(
KAFKA_EMITTER_PREFIX+"date_format", System.getProperty(
KAFKA_EMITTER_PREFIX + "date_format", System.getProperty(
"org.kie.server.json.date_format",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))))
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
Expand All @@ -95,59 +95,69 @@ public void apply(Collection<InstanceView<?>> data) {
}

for (InstanceView<?> view : data) {
String processId;
long processInstanceId;
String type;
String topic;
if (view instanceof ProcessInstanceView) {
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
topic = "processes";
type = "process";
processInstanceId = processInstanceView.getId();
processId = processInstanceView.getProcessId();
} else if (view instanceof TaskInstanceView) {
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
topic = "tasks";
type = "task";
processInstanceId = taskInstanceView.getProcessInstanceId();
processId = taskInstanceView.getProcessId();
} else if (view instanceof CaseInstanceView) {
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
topic = "cases";
type = "case";
processInstanceId = caseInstanceView.getId();
processId = caseInstanceView.getCaseDefinitionId();
} else {
logger.warn("Unsupported view type {}", view.getClass());
continue;
try {
String processId;
long processInstanceId;
String type;
String topic;
if (view instanceof ProcessInstanceView) {
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
topic = "processes";
type = "process";
processInstanceId = processInstanceView.getId();
processId = processInstanceView.getProcessId();
} else if (view instanceof TaskInstanceView) {
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
topic = "tasks";
type = "task";
processInstanceId = taskInstanceView.getProcessInstanceId();
processId = taskInstanceView.getProcessId();
} else if (view instanceof CaseInstanceView) {
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
topic = "cases";
type = "case";
processInstanceId = caseInstanceView.getId();
processId = caseInstanceView.getCaseDefinitionId();
} else {
logger.warn("Unsupported view type {}", view.getClass());
continue;
}
sender.send(topic, type, processId, processInstanceId, view);
} catch (Throwable th) {
logError(view, th);
}
sender.send(topic, type, processId, processInstanceId, view);

}
}

private interface KafkaSender {
void send (String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
void send(String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
}

/**
 * Serializes the given view into the byte payload sent to Kafka.
 * The payload is a {@code CloudEventSpec1} envelope (presumably a CloudEvents
 * v1.0 event — name suggests; confirm against the class) whose source URI is
 * derived from the process id and process instance id via {@code SOURCE_FORMATTER}.
 *
 * @throws JsonProcessingException if Jackson cannot serialize the event
 */
private byte[] viewToPayload(String type, String processId, long processInstanceId, InstanceView<?> view) throws JsonProcessingException {
    String eventSource = String.format(SOURCE_FORMATTER, processId, processInstanceId);
    CloudEventSpec1 cloudEvent = new CloudEventSpec1(type, eventSource, view);
    return mapper.writeValueAsBytes(cloudEvent);
}

private void sendAsync(String topic, String type, String processId, long processInstanceId, InstanceView<?> view) {
logger.debug("Sending async view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
try {
producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view)), (m, e) -> {
if (e != null) {
logError(view, e);
} else {
logger.debug("Sucessfuly async view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
}
});
} catch (Exception e) {
} catch (Throwable e) {
logError(view, e);
}
}

private void sendSync(String topic, String type, String processId, long processInstanceId, InstanceView<?> view) {
try {
logger.debug("Sending sync view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
producer.send(new ProducerRecord<>(getTopic(topic), viewToPayload(type, processId, processInstanceId, view))).get();
logger.debug("Sucessfuly sync view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
} catch (InterruptedException e) {
Expand All @@ -162,11 +172,10 @@ private void sendSync(String topic, String type, String processId, long processI
}
}

private void logError(InstanceView<?> view, Exception e) {
// Logs a failed publish attempt for the given view without rethrowing.
// Accepts Throwable (not just Exception) so errors surfaced from the async
// send callback and the broad catch in apply() are both captured, keeping a
// single bad view from aborting processing of the remaining views.
private void logError(InstanceView<?> view, Throwable e) {
logger.error("Error publishing view {}", view, e);
}


@Override
public void drop(Collection<InstanceView<?>> data) {
// nothing to do
Expand All @@ -191,7 +200,7 @@ private static Producer<String, byte[]> getProducer() {

private static String getTopic(String eventType) {
return System.getProperty("org.kie.jbpm.event.emitters.kafka.topic." + eventType, "jbpm-" + eventType +
"-events");
"-events");
}

protected static Map<String, Object> getProducerProperties() {
Expand Down
Loading