Skip to content

Commit

Permalink
feat(interactive): Integrate otel into groot, and add charts for one-…
Browse files Browse the repository at this point in the history
…pod (#3641)

`helm install demo graphscope/graphscope-store --set otel.enabled=true` could
enable tracing in groot, and carry a jaeger sidecar with it.
  • Loading branch information
siyuan0322 authored Mar 19, 2024
1 parent 292b636 commit 7bb5d3f
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 23 deletions.
6 changes: 5 additions & 1 deletion charts/graphscope-store-one-pod/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ Return the proper graphscope-store image name
{{ include "common.images.image" (dict "imageRoot" .Values.image "global" .Values.global ) }}
{{- end -}}

{{/*
Return the proper otel collector (jaeger all-in-one) image name
*/}}
{{- define "graphscope-store.otel.collector.image" -}}
{{ include "common.images.image" (dict "imageRoot" .Values.otel.collector.image "global" .Values.global ) }}
{{- end -}}

{{/*
Return the proper graphscope-store test image name
*/}}
Expand Down Expand Up @@ -164,4 +168,4 @@ We truncate at 63 chars because some Kubernetes name fields are limited to this
*/}}
{{- define "graphscope-store.zookeeper.fullname" -}}
{{- printf "%s-%s" .Release.Name "zookeeper" | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- end -}}
10 changes: 10 additions & 0 deletions charts/graphscope-store-one-pod/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ data:
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}
tracing.enabled={{ .Values.otel.enabled }}
## Extra Config
{{- if .Values.extraConfig }}
{{- $config_list := regexSplit ";" .Values.extraConfig -1 }}
Expand All @@ -102,5 +104,13 @@ data:
export LOG_NAME=graphscope-store
export GROOT_CONF_FILE=/etc/groot/groot.config
{{- if .Values.otel.enabled }}
export JAVA_TOOL_OPTIONS="-javaagent:/home/graphscope/opentelemetry-javaagent.jar"
export OTEL_SERVICE_NAME="compiler"
export OTEL_TRACES_SAMPLER={{ .Values.otel.traces.sampler.name }}
export OTEL_TRACES_SAMPLER_ARG={{ .Values.otel.traces.sampler.arg }}
{{- end }}
${GRAPHSCOPE_HOME}/groot/bin/store_ctl.sh start
{{- end -}}
52 changes: 52 additions & 0 deletions charts/graphscope-store-one-pod/templates/statefulset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,58 @@ spec:
- name: config
mountPath: /etc/groot/setup.sh
subPath: setup.sh
{{- if .Values.otel.enabled }}
# Jaeger all-in-one sidecar: collects OTLP traces emitted by the groot
# containers and serves the Jaeger query UI.
- env:
# Spans are kept in process memory only — traces are lost on restart.
- name: METRICS_STORAGE_TYPE
value: prometheus
- name: SPAN_STORAGE_TYPE
value: memory
- name: JAEGER_DISABLED
value: "false"
# Enable the OTLP receiver (gRPC on 4317, HTTP on 4318).
- name: COLLECTOR_OTLP_ENABLED
value: "true"
image: {{ include "graphscope-store.otel.collector.image" . }}
imagePullPolicy: IfNotPresent
name: jaeger
args:
# Bound in-memory trace retention, to stay within the 300Mi limit below.
- "--memory.max-traces=8000"
- "--query.base-path=/jaeger/ui"
# NOTE(review): this targets the Prometheus service from the
# opentelemetry-demo chart, which this chart does not deploy — confirm
# the service name, or expose it as a value in values.yaml.
- "--prometheus.server-url=http://opentelemetry-demo-prometheus-server:9090"
- "--prometheus.query.normalize-calls=true"
- "--prometheus.query.normalize-duration=true"
ports:
# 5778: sampling config; 16686: query UI; 4317/4318: OTLP gRPC/HTTP.
- containerPort: 5778
protocol: TCP
- containerPort: 16686
protocol: TCP
- containerPort: 4317
protocol: TCP
- containerPort: 4318
protocol: TCP
# Both probes hit port 14269 — presumably the jaeger admin/health
# endpoint; verify against the image's docs.
livenessProbe:
failureThreshold: 5
httpGet:
path: /
port: 14269
scheme: HTTP
initialDelaySeconds: 5
periodSeconds: 15
successThreshold: 1
timeoutSeconds: 1
readinessProbe:
failureThreshold: 3
httpGet:
path: /
port: 14269
scheme: HTTP
initialDelaySeconds: 1
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
resources:
limits:
memory: 300Mi
{{- end }}
volumes:
- name: config
configMap:
Expand Down
13 changes: 13 additions & 0 deletions charts/graphscope-store-one-pod/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,16 @@ pegasus:
secondary:
enabled: false
storeDataPath: "./data_secondary"

# OpenTelemetry tracing support: toggles tracing.enabled in groot.config,
# the OTel java agent env vars in setup.sh, and the jaeger sidecar container.
otel:
enabled: false
collector:
# Image used for the jaeger all-in-one sidecar.
image:
registry: docker.io
repository: jaegertracing/all-in-one
# NOTE(review): pinning a concrete version instead of "latest" would make
# deployments reproducible — confirm the intended tag.
tag: "latest"
traces:
# Exported as OTEL_TRACES_SAMPLER / OTEL_TRACES_SAMPLER_ARG:
# sample 10% of traces by trace-id ratio.
sampler:
name: "traceidratio"
arg: "0.1"

3 changes: 3 additions & 0 deletions docs/storage_engine/groot.md
Original file line number Diff line number Diff line change
Expand Up @@ -689,3 +689,6 @@ And use a different `zk.base.path` for each secondary instance to avoid conflict

`storeGcIntervalMs` controls how often should the secondary perform a `try_catch_up_with_primary` call, default to `5000` which is 5 seconds.

### Traces

Use `--set otel.enabled=true` to enable trace export.
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public class CommonConfig {
// Only available in multi pod mode.
public static final Config<Boolean> WRITE_HIGH_AVAILABILITY_ENABLED =
Config.boolConfig("write.high.availability.enabled", false);

public static final Config<Boolean> TRACING_ENABLED =
Config.boolConfig("tracing.enabled", false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
Expand Down Expand Up @@ -101,6 +108,7 @@ public class IrStandardOpProcessor extends StandardOpProcessor {
protected final QueryIdGenerator idGenerator;
protected final QueryCache queryCache;
protected final ExecutionClient executionClient;
Tracer tracer;

public IrStandardOpProcessor(
Configs configs,
Expand Down Expand Up @@ -129,6 +137,7 @@ public IrStandardOpProcessor(
this.idGenerator = idGenerator;
this.queryCache = queryCache;
this.executionClient = executionClient;
this.tracer = GlobalOpenTelemetry.getTracer("compiler");
}

@Override
Expand Down Expand Up @@ -359,7 +368,11 @@ protected void processTraversal(
long jobId = queryLogger.getQueryId();
IrPlan irPlan = new IrPlan(irMeta, opCollection);
// print script and jobName with ir plan
queryLogger.info("ir plan {}", irPlan.getPlanAsJson());
queryLogger.info("Submitted query");
// Too verbose: since all identical queries produce identical plans, there is
// no need to print every plan in production.
String irPlanStr = irPlan.getPlanAsJson();
queryLogger.debug("ir plan {}", irPlanStr);
byte[] physicalPlanBytes = irPlan.toPhysicalBytes(queryConfigs);
irPlan.close();

Expand All @@ -380,10 +393,22 @@ protected void processTraversal(
.setAll(PegasusClient.Empty.newBuilder().build())
.build();
request = request.toBuilder().setConf(jobConfig).build();

this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
// request results from remote engine service in blocking way
resultProcessor.request();
Span outgoing =
tracer.spanBuilder("/evalOpInternal").setSpanKind(SpanKind.CLIENT).startSpan();
try (Scope scope = outgoing.makeCurrent()) {
outgoing.setAttribute("query.id", queryLogger.getQueryId());
outgoing.setAttribute("query.statement", queryLogger.getQuery());
outgoing.setAttribute("query.plan.logical", irPlanStr);
this.rpcClient.submit(request, resultProcessor, timeoutConfig.getChannelTimeoutMS());
// request results from remote engine service in blocking way
resultProcessor.request();
} catch (Throwable t) {
outgoing.setStatus(StatusCode.ERROR, "Submit failed!");
outgoing.recordException(t);
throw t;
} finally {
outgoing.end();
}
}

private Configs getQueryConfigs(Traversal traversal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ fn make_gaia_config(graph_config: Arc<GraphConfig>) -> GaiaConfig {
.no_delay(no_delay)
.send_buffer(send_buffer)
.heartbeat_sec(heartbeat_sec);
GaiaConfig { network: Some(network_config), max_pool_size }
let enable_tracing = graph_config
.get_storage_option("tracing.enabled")
.map(|config_str| {
config_str
.parse()
.expect("parse tracing.enabled failed")
});
GaiaConfig { network: Some(network_config), max_pool_size, enable_tracing }
}

fn make_gaia_rpc_config(graph_config: Arc<GraphConfig>) -> RPCServerConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ nohash-hasher = "0.2.0"
ahash = "0.7.2"
dot = "0.1.4"
dyn-clonable = "0.9.0"
opentelemetry = { version = "0.22.0", features = ["trace"] }

[features]
mem = ["pegasus_memory/mem"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ macro_rules! configure_with_default {
/// Top-level pegasus engine configuration.
pub struct Configuration {
/// Inter-server network settings; `None` in the single-server (`singleton`) setup.
pub network: Option<NetworkConfig>,
/// Optional cap on the worker/thread pool size.
pub max_pool_size: Option<u32>,
/// Optional switch for opentelemetry tracing; callers populate it from the
/// `tracing.enabled` storage option (see `make_gaia_config`).
pub enable_tracing: Option<bool>,
}

impl Configuration {
Expand All @@ -45,11 +46,11 @@ impl Configuration {
}

pub fn singleton() -> Self {
Configuration { network: None, max_pool_size: None }
Configuration { network: None, max_pool_size: None, enable_tracing: None }
}

pub fn with(network: NetworkConfig) -> Self {
Configuration { network: Some(network), max_pool_size: None }
Configuration { network: Some(network), max_pool_size: None, enable_tracing: None }
}

pub fn server_id(&self) -> u64 {
Expand Down
18 changes: 15 additions & 3 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;

pub use config::{read_from, Configuration, JobConf, ServerConf};
pub use data::Data;
use opentelemetry::trace::{TraceContextExt, Tracer};
use opentelemetry::{global, KeyValue};
pub use pegasus_common::codec;
pub use pegasus_memory::alloc::check_current_task_memory;
pub use pegasus_network::ServerDetect;
Expand Down Expand Up @@ -280,9 +283,18 @@ where
return Ok(());
}
let worker_ids = workers.unwrap();
let tracer = global::tracer("executor");

let mut workers = Vec::new();
for id in worker_ids {
let mut worker = Worker::new(&conf, id, &peer_guard, sink.clone());
for worker_id in worker_ids {
let mut worker = tracer.in_span(format!("/pegasus::run_opt"), |cx| {
cx.span()
.set_attribute(KeyValue::new("worker-id", worker_id.index.to_string()));
let span = tracer
.span_builder(format!("/worker-{}", worker_id.index))
.start_with_context(&tracer, &cx);
Worker::new(&conf, worker_id, &peer_guard, sink.clone(), span)
});
let _g = crate::worker_id::guard(worker.id);
logic(&mut worker)?;
workers.push(worker);
Expand All @@ -293,6 +305,7 @@ where
}

info!("spawn job_{}({}) with {} workers;", conf.job_name, conf.job_id, workers.len());

match pegasus_executor::spawn_batch(workers) {
Ok(_) => Ok(()),
Err(e) => {
Expand Down Expand Up @@ -371,7 +384,6 @@ fn allocate_local_worker(conf: &Arc<JobConf>) -> Result<Option<WorkerIdIter>, Bu
}
}

use std::sync::Once;
lazy_static! {
static ref SINGLETON_INIT: Once = Once::new();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ impl<T: 'static> ResultSink<T> {
if is_canceled {
match &mut self.kind {
ResultSinkKind::Customized(tx) => {
info_worker!("Job is canceled");
let msg = "Job is canceled".to_string();
info_worker!("Job is canceled");
let err = JobExecError::from(msg);
warn_worker!("Job is canceled");
tx.on_error(Box::new(err));
}
_ => (),
Expand Down
Loading

0 comments on commit 7bb5d3f

Please sign in to comment.