Skip to content

Commit

Permalink
[JBPM-10242] Allow the possibility of disabling linear search for rem…
Browse files Browse the repository at this point in the history
…oveJob and getTimerByName operations (#2441)

* Try to cancel timer always, and only search if timer cancellation fails

* Skipping list search completely if !searchIfFailed

* [JBPM-10242] Always skipping linear search

Unless explicilty stated with property org.jbpm.ejb.timer.linear.search

* [JBPM-10242] Disable linear search on condition

Setting org.jbpm.ejb.timer.disable.linear.search and
org.jbpm.ejb.timer.disable.linear.remove to true

* [JBPM-10242] Fixing integration tests

---------

Co-authored-by: krisv <[email protected]>
  • Loading branch information
fjtirado and krisv authored Oct 2, 2024
1 parent b00c8b1 commit 822a0bd
Showing 1 changed file with 73 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
Expand Down Expand Up @@ -114,6 +118,7 @@ public void executeTimerJob(Timer timer) {
try {
executeTimerJobInstance(timerJobInstance);
} catch (Exception e) {
logger.error("Error executing timer handle {}", timerJobInstance.getJobHandle(), e);
recoverTimerJobInstance(timerJob, timer, e);
}
}
Expand All @@ -139,7 +144,7 @@ else if (ejbTimerJob.getTimerJobInstance().getTrigger().hasNextFireTime() != nul
// because of the transaction, so we need to do this here.
tx = timerJobInstance -> {
logger.warn("Execution of time failed Interval Trigger failed. Skipping {}", timerJobInstance);
if (removeJob(timerJobInstance.getJobHandle(), null)) {
if (removeJob(timerJobInstance.getJobHandle(), timer)) {
internalSchedule(timerJobInstance);
} else {
logger.debug("Interval trigger {} was removed before rescheduling", timerJobInstance);
Expand Down Expand Up @@ -246,107 +251,98 @@ private Serializable removeTransientFields(Serializable info) {
return info;
}

public boolean removeJob(JobHandle jobHandle, Timer ejbTimer) {
private boolean disableLinearSearch(String suffix) {
return Boolean.getBoolean("org.jbpm.ejb.timer.disable.linear." + suffix);
}


public boolean removeJob(JobHandle jobHandle, Timer ejbTimer) {
EjbGlobalJobHandle ejbHandle = (EjbGlobalJobHandle) jobHandle;
if (useLocalCache) {
boolean removedFromCache = localCache.remove(ejbHandle.getUuid()) != null;
logger.debug("Job handle {} is {} removed from cache ", jobHandle, removedFromCache ? "" : "not" );
logger.debug("Job handle {} is {} removed from cache ", jobHandle, removedFromCache ? "" : "not");
}

if (ejbTimer != null) {
try {
ejbTimer.cancel();
return true;
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return false;
}
return cancelTimer(ejbTimer, ejbHandle);
}

// small speed improvement using the ejb serializable info handler
GlobalJpaTimerJobInstance timerJobInstance = (GlobalJpaTimerJobInstance) ejbHandle.getTimerJobInstance();
if (timerJobInstance != null) {
Object ejbTimerHandle = timerJobInstance.getTimerInfo();
if(ejbTimerHandle instanceof TimerHandle) {
Object ejbTimerHandle = timerJobInstance.getTimerInfo();
if (ejbTimerHandle instanceof TimerHandle) {
try {
((TimerHandle) ejbTimerHandle).getTimer().cancel();
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return true;
} catch (Exception ex) {
logger.warn("Cancelling timer failed for handle {}", ejbHandle, ex);
return false;
}
return true;
}
} else {
logger.warn("No timerJobInstance available for {}", ejbHandle);
}
logger.debug("No valid TimerJob instance {} available for Job handle {}", timerJobInstance, ejbHandle);
return linearSearch("remove", ejbHandle.getUuid(),
(timer, job) -> cancelTimer(timer, (EjbGlobalJobHandle) job.getJobHandle())).orElse(false);
}

for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;

EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();
if (handle.getUuid().equals(ejbHandle.getUuid())) {
logger.debug("Job handle {} does match timer and is going to be canceled", jobHandle);

try {
timer.cancel();
} catch (Throwable e) {
logger.debug("Timer cancel error due to {}", e.getMessage());
return false;
}
return true;
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer {} has already expired or was canceled ", timer);
}
}
logger.debug("Job handle {} does not match any timer on {} scheduler service", jobHandle, this);
return false;
}
public TimerJobInstance getTimerByName(String jobName) {
if (useLocalCache) {
TimerJobInstance found = localCache.get(jobName);
if (found != null) {
logger.debug("Found timer job instance with name {} in cache, returning {}", jobName, found);
return found;
}
logger.debug("Timer Job Instance with name {} not found in cache", jobName);
}
return linearSearch("search", jobName, (timer, job) -> {
if (useLocalCache && job != null) {
localCache.putIfAbsent(jobName, job);
}
return job;
}).orElse(null);
}


private boolean cancelTimer(Timer timer, EjbGlobalJobHandle ejbHandle) {
try {
timer.cancel();
return true;
} catch (Exception ex) {
logger.warn("Cancelling timer failed for handle {}", ejbHandle, ex);
return false;
}
}

public TimerJobInstance getTimerByName(String jobName) {
if (useLocalCache) {
if (localCache.containsKey(jobName)) {
logger.debug("Found job {} in cache returning", jobName);
return localCache.get(jobName);
}
}
TimerJobInstance found = null;

for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;

EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();

if (handle.getUuid().equals(jobName)) {
found = handle.getTimerJobInstance();
if (useLocalCache) {
localCache.putIfAbsent(jobName, found);
}
logger.debug("Job {} does match timer and is going to be returned {}", jobName, found);

break;
}
}
} catch (NoSuchObjectLocalException e) {
logger.debug("Timer info for {} was not found ", timer);
private <T> Optional<T> linearSearch(String suffix, String uuid, BiFunction<Timer, TimerJobInstance, T> function) {
if (disableLinearSearch(suffix)) {
logger.warn("Skipping linear search to {} UUID {}", suffix, uuid);
} else {
logger.info("Searching UUID {} on {} scheduler service", uuid, this);
for (Timer timer : timerService.getTimers()) {
try {
Serializable info = timer.getInfo();
if (info instanceof EjbTimerJob) {
EjbTimerJob job = (EjbTimerJob) info;
EjbGlobalJobHandle handle = (EjbGlobalJobHandle) job.getTimerJobInstance().getJobHandle();
if (handle.getUuid().equals(uuid)) {
logger.debug("UIID {} does match timer {} and handle {}", uuid, timer,
job.getTimerJobInstance());
return Optional.ofNullable(function.apply(timer, job.getTimerJobInstance()));
}
}
} catch (NoSuchObjectLocalException e) {
logger.info("Info for timer {} is not there ", timer, e);
}
}
}

return found;
}
logger.info("UUID {} does not match any timer on {} scheduler service", uuid, this);
}
return Optional.empty();
}

public void evictCache(JobHandle jobHandle) {
String jobName = ((EjbGlobalJobHandle) jobHandle).getUuid();
logger.debug("Invalidate job {} with job name {} in cache", jobName, localCache.remove(jobName));
logger.debug("Invalidate job {} with job name {} in cache", localCache.remove(jobName), jobName);
}

}

0 comments on commit 822a0bd

Please sign in to comment.