Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Add API endpoint for function-worker for liveness check with configurable flag #358

Open
wants to merge 6 commits into
base: 3.1_ds
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class FunctionMetaDataManager implements AutoCloseable {
@Getter
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();

private boolean isFunctionWorkerAlive = true;

public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
PulsarClient pulsarClient,
Expand Down Expand Up @@ -243,6 +245,10 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
needsScheduling = processUpdate(functionMetaData);
}
} catch (Exception e) {
if (e.getCause() instanceof PulsarClientException.ProducerFencedException) {
log.error("Function worker status has been set to false due to ProducerFencedException.");
this.isFunctionWorkerAlive = false;
}
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
}
Expand Down Expand Up @@ -500,4 +506,8 @@ private void initializeTailer() throws PulsarClientException {
this.functionMetaDataTopicTailer.start();
log.info("MetaData Manager Tailer started");
}

public boolean checkLiveliness() {
return this.isFunctionWorkerAlive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1885,4 +1885,10 @@ protected ValidatableFunctionPackage getBuiltinFunctionPackage(String archive) {
}
return null;
}

@Override
public boolean checkLiveliness() {
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
return functionMetaDataManager.checkLiveliness();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -429,4 +430,24 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), authParams());
}

@GET
@Path("/healthz")
@ApiOperation(value = "Run a healthCheck against the function worker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 503, message = "Service not available")
})
public Response healthCheck() {
boolean isAlive = functions().checkLiveliness();
if (!isAlive) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("There is IllegalStateException, Service is not running. Need to restart.")
.build();
} else {
return Response.status(Response.Status.OK)
.entity("There is no IllegalStateException, Service is running.")
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,6 @@ StreamingOutput downloadFunction(String tenant, String namespace, String compone
List<ConnectorDefinition> getListOfConnectors();

void reloadConnectors(AuthenticationParameters authParams);

boolean checkLiveliness();
}
Loading