Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][fn] Add API endpoint for function-worker for liveness check with configurable flag #358

Open
wants to merge 6 commits into
base: 3.1_ds
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
@Slf4j
public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWorkerService> {

private boolean isFunctionWorkerAlive = true;

public FunctionsImpl(Supplier<PulsarWorkerService> workerServiceSupplier) {
super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
}
Expand Down Expand Up @@ -693,6 +695,8 @@ public void updateFunctionOnWorkerLeader(final String tenant,
try {
functionMetaDataManager.updateFunctionOnLeader(functionMetaData, delete);
} catch (IllegalStateException e) {
log.error("Function worker status has been set to false due to ProducerFencedException.");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if we get an IllegalStateException which is not a ProducerFencedException? Maybe it would be better to catch the specific exception type.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The block of code has been moved to a location where we can check if the cause is a ProducerFencedException.

this.isFunctionWorkerAlive = false;
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (IllegalArgumentException e) {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
Expand Down Expand Up @@ -788,4 +792,8 @@ private Function.FunctionDetails validateUpdateRequestParams(final String tenant
}
}
}

public boolean checkLiveliness() {
return this.isFunctionWorkerAlive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -429,4 +430,18 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
functions().updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), authParams());
}

@GET
@Path("/live")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think /healthz is the common convention for this endpoint name.
Also, I think it would make more sense to put it directly under the root path instead of under /admin/v3/functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API has been updated to include a /healthz endpoint.
We cannot place this directly under the root path, as the root path does not have access to the necessary flag. The flag is available at the function level instead. Additionally, v2 does not contain any logic that could trigger a ProducerFencedException; this issue is specific to v3.

public Response checkLiveliness() {
boolean isAlive = functions().checkLiveliness();
if (!isAlive) {
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.entity("There is IllegalStateException, Service is not running. Need to restart.")
.build();
} else {
return Response.ok("There is no IllegalStateException, Service is running.")
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ FunctionInstanceStatusData getFunctionInstanceStatus(String tenant,
void reloadBuiltinFunctions(AuthenticationParameters authParams) throws IOException;

List<FunctionDefinition> getBuiltinFunctions(AuthenticationParameters authParams);

boolean checkLiveliness();
}
Loading