Skip to content

Commit

Permalink
feat(actuator): add readiness indicator for Zeebe & Operate connectio…
Browse files Browse the repository at this point in the history
…ns (#1880)

* feat(actuator): add readiness indicator for Zeebe & Operate connections

* modify readiness indicators
  • Loading branch information
chillleader authored Feb 12, 2024
1 parent a621dc2 commit b14e541
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ server.port=8080
management.server.port=9080
management.context-path=/actuator
management.endpoints.web.exposure.include=metrics,health,prometheus
management.endpoint.health.group.readiness.include[]=zeebeClient,operate

camunda.connector.polling.enabled=true
camunda.connector.polling.interval=5000
Expand Down
10 changes: 7 additions & 3 deletions bundle/default-bundle/src/test/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ zeebe.client.security.plaintext=true
# Operate config for use with docker-compose.yml
camunda.operate.client.url=http://localhost:8081

management.context-path=/actuator
management.endpoints.web.exposure.include=metrics,health,prometheus
management.endpoint.health.group.readiness.include[]=zeebeClient,operate
management.endpoint.health.show-components=always
management.endpoint.health.show-details=always

camunda.identity.type=keycloak
camunda.identity.base-url=http://localhost:8084
camunda.identity.issuer=http://localhost:18080/auth/realms/camunda-platform
camunda.identity.issuer-backend-url=http://localhost:18080/auth/realms/camunda-platform
camunda.identity.audience=connectors
# camunda.identity.client-id=xxx
# camunda.identity.client-secret=xxx
camunda.identity.client-id=connectors
camunda.identity.client-secret=XALaRPl5qwTEItdwCMiPS62nVpKs7dL7

debug=true
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ camunda.connector.webhook.enabled=true

management.context-path=/actuator
management.endpoints.web.exposure.include=metrics,health,prometheus
management.endpoint.health.group.readiness.include[]=zeebeClient,operate
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ProcessDefinitionImporter {
private final Set<Long> registeredProcessDefinitionKeys = new HashSet<>();
private final Map<String, ProcessDefinition> versionByBpmnProcessId = new HashMap<>();

private boolean ready = false;

@Autowired
public ProcessDefinitionImporter(
InboundConnectorManager inboundManager,
Expand All @@ -49,7 +51,13 @@ public ProcessDefinitionImporter(

@Scheduled(fixedDelayString = "${camunda.connector.polling.interval:5000}")
public synchronized void scheduleImport() {
search.query(this::handleImportedDefinitions);
try {
search.query(this::handleImportedDefinitions);
ready = true;
} catch (Exception e) {
LOG.error("Failed to import process definitions", e);
ready = false;
}
}

public void handleImportedDefinitions(List<ProcessDefinition> unprocessedDefinitions) {
Expand Down Expand Up @@ -154,4 +162,8 @@ private void meter(int count) {
Inbound.METRIC_NAME_INBOUND_PROCESS_DEFINITIONS_CHECKED, null, null, count);
}
}

public boolean isReady() {
return ready;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,35 @@
*/
package io.camunda.connector.runtime;

import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionImporter;
import io.camunda.connector.runtime.metrics.ContextAwareLogbackMetrics;
import io.camunda.zeebe.client.ZeebeClient;
import io.micrometer.core.instrument.binder.logging.LogbackMetrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.autoconfigure.metrics.LogbackMetricsAutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;

@ConditionalOnClass(name = "ch.qos.logback.classic.LoggerContext")
@AutoConfiguration
@AutoConfigureBefore(LogbackMetricsAutoConfiguration.class)
public class ContextAwareLogbackMetricsAutoConfiguration {
public class ConnectorsObservabilityAutoConfiguration {

@Bean
@ConditionalOnClass(name = "ch.qos.logback.classic.LoggerContext")
public LogbackMetrics logbackMetrics() {
return new ContextAwareLogbackMetrics();
}

@Bean(name = "zeebeClientHealthIndicator") // overrides the health indicator from Spring Zeebe
public ZeebeHealthIndicator zeebeClientHealthIndicator(ZeebeClient zeebeClient) {
return new ZeebeHealthIndicator(zeebeClient);
}

@Bean
public OperateHealthIndicator operateHealthIndicator(
@Autowired(required = false) ProcessDefinitionImporter processDefinitionImporter) {
return new OperateHealthIndicator(processDefinitionImporter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime;

import io.camunda.connector.runtime.inbound.importer.ProcessDefinitionImporter;
import jakarta.annotation.Nullable;
import java.util.Map;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;

public class OperateHealthIndicator extends AbstractHealthIndicator {

private final ProcessDefinitionImporter processDefinitionImporter;

public OperateHealthIndicator(@Nullable ProcessDefinitionImporter processDefinitionImporter) {
this.processDefinitionImporter = processDefinitionImporter;
}

@Override
protected void doHealthCheck(Builder builder) {
var details = Map.of("operateEnabled", processDefinitionImporter != null);
if (processDefinitionImporter == null || processDefinitionImporter.isReady()) {
builder.up().withDetails(details);
} else {
builder.down().withDetails(details);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.BrokerInfo;
import io.camunda.zeebe.client.api.response.PartitionBrokerHealth;
import io.camunda.zeebe.client.api.response.PartitionInfo;
import java.util.Collection;
import java.util.Map;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;

public class ZeebeHealthIndicator extends AbstractHealthIndicator {

private final ZeebeClient zeebeClient;

public ZeebeHealthIndicator(ZeebeClient zeebeClient) {
this.zeebeClient = zeebeClient;
}

@Override
protected void doHealthCheck(Builder builder) {
var topology = zeebeClient.newTopologyRequest().send().join();
var numBrokers = topology.getBrokers().size();
boolean anyPartitionHealthy =
topology.getBrokers().stream()
.map(BrokerInfo::getPartitions)
.flatMap(Collection::stream)
.map(PartitionInfo::getHealth)
.anyMatch(health -> health == PartitionBrokerHealth.HEALTHY);
var details = Map.of("numBrokers", numBrokers, "anyPartitionHealthy", anyPartitionHealthy);
if (numBrokers > 0 && anyPartitionHealthy) {
builder.up().withDetails(details);
} else {
builder.down().withDetails(details);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.camunda.connector.runtime.InboundConnectorsAutoConfiguration,\
io.camunda.connector.runtime.OutboundConnectorsAutoConfiguration,\
io.camunda.connector.runtime.WebhookConnectorAutoConfiguration,\
io.camunda.connector.runtime.ContextAwareLogbackMetricsAutoConfiguration
io.camunda.connector.runtime.ConnectorsObservabilityAutoConfiguration
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
io.camunda.connector.runtime.OutboundConnectorsAutoConfiguration
io.camunda.connector.runtime.InboundConnectorsAutoConfiguration
io.camunda.connector.runtime.WebhookConnectorAutoConfiguration
io.camunda.connector.runtime.ContextAwareLogbackMetricsAutoConfiguration
io.camunda.connector.runtime.ConnectorsObservabilityAutoConfiguration

0 comments on commit b14e541

Please sign in to comment.