From 1199f20e718c4a744e09d2703b08fc7d6ee5d9af Mon Sep 17 00:00:00 2001 From: Xiaoli Zhou Date: Tue, 3 Dec 2024 15:39:17 +0800 Subject: [PATCH] fix(interactive): Introduce `MetricsTool` to profile memory usage, pending tasks and qps (#4332) ## What do these changes do? the `MetricsTool` will print the following metrics periodically (setting of `metrics.tool.interval.ms`, default is 5 mins): 1. `memory.usage`: memory usage of jvm and direct memory 2. `rpc.channels.executor.queue`: pending tasks in each grpc (netty) channel 3. `gremlin.executor.queue`: pending tasks in gremlin executor 4. `gremlin.qps`: gremlin qps These metrics will be logged in a separate file, configured by `PerfMetricLog` in [logback.xml](https://github.com/shirly121/GraphScope/blob/ir_metric_tool/interactive_engine/assembly/src/conf/groot/logback.xml#L37) ## Related issue number Fixes --- .../assembly/src/conf/groot/logback.xml | 17 ++++ .../com/alibaba/graphscope/GraphServer.java | 11 ++- .../common/client/ExecutionClient.java | 6 +- .../common/client/RpcExecutionClient.java | 53 +++++++---- .../common/client/RpcInterceptor.java | 92 +++++++++++++++++++ .../client/channel/HostURIChannelFetcher.java | 16 ++-- .../channel/HostsRpcChannelFetcher.java | 24 ++--- .../client/metric/RpcExecutorMetric.java | 66 +++++++++++++ .../common/config/FrontendConfig.java | 3 + .../common/ir/tools/GraphPlanner.java | 28 +++++- .../common/manager/RateLimitExecutor.java | 23 ++++- .../common/metric/MemoryMetric.java | 62 +++++++++++++ .../graphscope/common/metric/Metric.java | 52 +++++++++++ .../graphscope/common/metric/MetricsTool.java | 70 ++++++++++++++ .../cypher/executor/GraphQueryExecutor.java | 28 +++--- .../cypher/result/CypherRecordProcessor.java | 2 +- .../metric/GremlinExecutorQueueMetric.java | 48 ++++++++++ .../gremlin/metric/GremlinQPSMetric.java | 62 +++++++++++++ .../processor/IrStandardOpProcessor.java | 13 ++- .../plugin/processor/LifeCycleSupplier.java | 13 ++- .../GremlinCalciteScriptEngineFactory.java | 4 +- .../processor/AbstractResultProcessor.java | 4 +- .../resultx/GremlinResultProcessor.java | 3 + .../gremlin/service/IrGremlinServer.java | 11 ++- .../main/java/io/grpc/internal/RpcUtils.java | 56 +++++++++++ .../main/java/io/grpc/netty/NettyUtils.java | 37 ++++++++ .../java/com/alibaba/pegasus/RpcChannel.java | 14 ++- .../java/com/alibaba/pegasus/RpcClient.java | 18 ++++ 28 files changed, 761 insertions(+), 75 deletions(-) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java create mode 100644 interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java create mode 100644 interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml index 93b56c971b09..4eb219fa22ee 100644 --- a/interactive_engine/assembly/src/conf/groot/logback.xml +++ b/interactive_engine/assembly/src/conf/groot/logback.xml @@ -34,6 +34,23 @@ + + ${log_dir}/perf_metric.log + + ${log_dir}/perf_metric.%d{yyyy-MM-dd}.%i.gz + 7 + 100MB + 500MB + + + [%d{ISO8601}][%p][%t][%c:%L] %m%n + + + + + + + diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java index 4ac59f7a7ad9..ab11c9b198b2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/GraphServer.java @@ -35,6 +35,8 @@ import com.alibaba.graphscope.common.ir.tools.QueryCache; import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; +import com.alibaba.graphscope.common.metric.MemoryMetric; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.cypher.service.CypherBootstrapper; import com.alibaba.graphscope.gremlin.integration.result.GraphProperties; import com.alibaba.graphscope.gremlin.integration.result.TestGraphFactory; @@ -62,6 +64,7 @@ public class GraphServer { private final IrMetaQueryCallback metaQueryCallback; private final GraphProperties testGraph; private final GraphRelOptimizer optimizer; + private final MetricsTool metricsTool; private IrGremlinServer gremlinServer; private CypherBootstrapper cypherBootstrapper; @@ -77,10 +80,13 @@ public GraphServer( this.metaQueryCallback = metaQueryCallback; this.testGraph = testGraph; this.optimizer = optimizer; + this.metricsTool = new MetricsTool(configs); + this.metricsTool.registerMetric(new MemoryMetric()); } public void start() throws Exception { - ExecutionClient executionClient = ExecutionClient.Factory.create(configs, channelFetcher); + ExecutionClient executionClient = + ExecutionClient.Factory.create(configs, channelFetcher, metricsTool); QueryIdGenerator idGenerator = new QueryIdGenerator(configs); QueryCache queryCache = new QueryCache(configs); if (!FrontendConfig.GREMLIN_SERVER_DISABLED.get(configs)) { @@ -95,7 +101,8 @@ public void start() throws Exception { executionClient, channelFetcher, metaQueryCallback, - testGraph); + testGraph, + metricsTool); this.gremlinServer.start(); } if (!FrontendConfig.NEO4J_BOLT_SERVER_DISABLED.get(configs)) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java index 089726d7fe0f..9cf88effecaf 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/ExecutionClient.java @@ -21,6 +21,7 @@ import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gremlin.plugin.QueryLogger; /** @@ -45,10 +46,11 @@ public abstract void submit( public abstract void close() throws Exception; public static class Factory { - public static ExecutionClient create(Configs configs, ChannelFetcher channelFetcher) { + public static ExecutionClient create( + Configs configs, ChannelFetcher channelFetcher, MetricsTool metricsTool) { switch (channelFetcher.getType()) { case RPC: - return new RpcExecutionClient(configs, channelFetcher); + return new RpcExecutionClient(configs, channelFetcher, metricsTool); case HTTP: return new HttpExecutionClient(configs, channelFetcher); default: diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java index 1ef68e17a1a0..ca84658a71d2 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcExecutionClient.java @@ -17,38 +17,41 @@ package com.alibaba.graphscope.common.client; import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.client.metric.RpcExecutorMetric; import com.alibaba.graphscope.common.client.type.ExecutionRequest; import com.alibaba.graphscope.common.client.type.ExecutionResponseListener; import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.PegasusConfig; import com.alibaba.graphscope.common.config.QueryTimeoutConfig; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gaia.proto.IrResult; import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.pegasus.RpcChannel; import com.alibaba.pegasus.RpcClient; import com.alibaba.pegasus.intf.ResultProcessor; import com.alibaba.pegasus.service.protocol.PegasusClient; +import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; +import io.grpc.ClientInterceptors; import io.grpc.Status; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicReference; +import java.util.List; +import java.util.stream.Collectors; /** * rpc client to send request to pegasus engine service */ public class RpcExecutionClient extends ExecutionClient { - Logger logger = LoggerFactory.getLogger(RpcExecutionClient.class); private final Configs graphConfig; - private final AtomicReference rpcClientRef; - public RpcExecutionClient(Configs graphConfig, ChannelFetcher channelFetcher) { + public RpcExecutionClient( + Configs graphConfig, + ChannelFetcher channelFetcher, + MetricsTool metricsTool) { super(channelFetcher); this.graphConfig = graphConfig; - this.rpcClientRef = new AtomicReference<>(); + metricsTool.registerMetric(new RpcExecutorMetric(channelFetcher)); } @Override @@ -58,10 +61,18 @@ public void submit( QueryTimeoutConfig timeoutConfig, QueryLogger queryLogger) throws Exception { - if (rpcClientRef.get() == null) { - rpcClientRef.compareAndSet(null, new RpcClient(channelFetcher.fetch())); - } - RpcClient rpcClient = rpcClientRef.get(); + List interceptChannels = + channelFetcher.fetch().stream() + .map( + k -> + new RpcChannel( + ClientInterceptors.intercept( + k.getChannel(), new RpcInterceptor()))) + .collect(Collectors.toList()); + RpcClient rpcClient = + new RpcClient( + interceptChannels, + ImmutableMap.of(RpcInterceptor.QUERY_LOGGER_OPTION, queryLogger)); PegasusClient.JobRequest jobRequest = PegasusClient.JobRequest.newBuilder() .setPlan( @@ -99,7 +110,8 @@ public void process(PegasusClient.JobResponse jobResponse) { @Override public void finish() { listener.onCompleted(); - queryLogger.info("[compile]: received results from engine"); + queryLogger.info( + "[query][response]: received all responses from all servers"); } @Override @@ -113,8 +125,17 @@ public void error(Status status) { @Override public void close() throws Exception { - if (rpcClientRef.get() != null) { - rpcClientRef.get().shutdown(); - } + channelFetcher + .fetch() + .forEach( + k -> { + try { + if (k != null) { + k.shutdown(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java new file mode 100644 index 000000000000..647a16a83d23 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/RpcInterceptor.java @@ -0,0 +1,92 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.client; + +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; + +import io.grpc.*; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; + +public class RpcInterceptor implements ClientInterceptor { + public static final CallOptions.Key QUERY_LOGGER_OPTION = + CallOptions.Key.create("query-logger"); + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + private Instant requestStartTime; + + @Override + public void start(Listener responseListener, Metadata headers) { + requestStartTime = Instant.now(); + QueryLogger queryLogger = callOptions.getOption(QUERY_LOGGER_OPTION); + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + private final AtomicBoolean firstResponseLogged = + new AtomicBoolean(false); + + @Override + public void onMessage(RespT message) { + if (firstResponseLogged.compareAndSet(false, true)) { + long firstResponseTime = + Instant.now().toEpochMilli() + - requestStartTime.toEpochMilli(); + if (queryLogger != null) { + queryLogger.info( + "[query][response]: receive the first response from" + + " the channel {} in {} ms", + channel.authority(), + firstResponseTime); + } + } + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + long endTime = Instant.now().toEpochMilli(); + long totalTime = endTime - requestStartTime.toEpochMilli(); + if (queryLogger != null) { + queryLogger.info( + "[query][response]: receive the last response from the" + + " channel {} with status {} in {} ms", + channel.authority(), + status, + totalTime); + } + super.onClose(status, trailers); + } + }, + headers); + if (queryLogger != null) { + queryLogger.info( + "[query][submitted]: submit the query to the task queue of channel {}", + channel.authority()); + } + } + }; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java index 2e0830276b8f..f25495bc9528 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostURIChannelFetcher.java @@ -18,9 +18,10 @@ import com.alibaba.graphscope.common.config.Configs; import com.alibaba.graphscope.common.config.HiactorConfig; +import com.alibaba.graphscope.common.config.Utils; import java.net.URI; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -29,19 +30,18 @@ */ public class HostURIChannelFetcher implements ChannelFetcher { private static final String schema = "http"; - private Configs graphConfig; + private final List uriChannels; public HostURIChannelFetcher(Configs graphConfig) { - this.graphConfig = graphConfig; + this.uriChannels = + Utils.convertDotString(HiactorConfig.HIACTOR_HOSTS.get(graphConfig)).stream() + .map(k -> URI.create(schema + "://" + k)) + .collect(Collectors.toList()); } @Override public List fetch() { - String hosts = HiactorConfig.HIACTOR_HOSTS.get(graphConfig); - String[] hostsArr = hosts.split(","); - return Arrays.asList(hostsArr).stream() - .map(k -> URI.create(schema + "://" + k)) - .collect(Collectors.toList()); + return Collections.unmodifiableList(uriChannels); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java index 63f5544c6857..39f980288ceb 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/channel/HostsRpcChannelFetcher.java @@ -21,30 +21,30 @@ import com.alibaba.graphscope.common.config.Utils; import com.alibaba.pegasus.RpcChannel; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** * rpc implementation of {@link ChannelFetcher}, init rpc from local config */ public class HostsRpcChannelFetcher implements ChannelFetcher { - private Configs config; + private final List rpcChannels; public HostsRpcChannelFetcher(Configs config) { - this.config = config; + this.rpcChannels = + Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)).stream() + .map( + k -> { + String[] host = k.split(":"); + return new RpcChannel(host[0], Integer.valueOf(host[1])); + }) + .collect(Collectors.toList()); } @Override public List fetch() { - List hostAddresses = - Utils.convertDotString(PegasusConfig.PEGASUS_HOSTS.get(config)); - List rpcChannels = new ArrayList<>(); - hostAddresses.forEach( - k -> { - String[] host = k.split(":"); - rpcChannels.add(new RpcChannel(host[0], Integer.valueOf(host[1]))); - }); - return rpcChannels; + return Collections.unmodifiableList(rpcChannels); } @Override diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java new file mode 100644 index 000000000000..7c9beee321a8 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/client/metric/RpcExecutorMetric.java @@ -0,0 +1,66 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.client.metric; + +import com.alibaba.graphscope.common.client.channel.ChannelFetcher; +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.pegasus.RpcChannel; +import com.google.common.collect.Maps; + +import io.grpc.ManagedChannel; +import io.grpc.internal.RpcUtils; +import io.netty.channel.Channel; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.SingleThreadEventExecutor; + +import java.util.List; +import java.util.Map; + +public class RpcExecutorMetric implements Metric { + private final ChannelFetcher channelFetcher; + + public RpcExecutorMetric(ChannelFetcher channelFetcher) { + this.channelFetcher = channelFetcher; + } + + @Override + public Key getKey() { + return KeyFactory.RPC_CHANNELS_EXECUTOR_QUEUE; + } + + @Override + public Map getValue() { + List channels = channelFetcher.fetch(); + Map values = Maps.newHashMap(); + channels.forEach( + k -> { + ManagedChannel channel = RpcUtils.getDelegateChannel(k.getChannel()); + int queueSize = ValueFactory.INVALID_INT; + Channel nettyChannel = RpcUtils.getNettyChannel(channel); + if (nettyChannel != null) { + EventLoop eventLoop = nettyChannel.eventLoop(); + if (eventLoop instanceof SingleThreadEventExecutor) { + queueSize = ((SingleThreadEventExecutor) eventLoop).pendingTasks(); + } + } + values.put(k.getChannel().authority(), queueSize); + }); + return values; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java index 6a66c31d6496..bbbbe19b7967 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/config/FrontendConfig.java @@ -63,4 +63,7 @@ public class FrontendConfig { public static final Config QUERY_PRINT_THRESHOLD_MS = Config.longConfig("query.print.threshold.ms", 200l); + + public static final Config METRICS_TOOL_INTERVAL_MS = + Config.longConfig("metrics.tool.interval.ms", 5 * 60 * 1000L); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java index 24c821b2f2e9..948ec068e299 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/tools/GraphPlanner.java @@ -38,6 +38,7 @@ import com.alibaba.graphscope.common.ir.runtime.proto.GraphRelProtoPhysicalBuilder; import com.alibaba.graphscope.common.ir.type.GraphTypeFactoryImpl; import com.alibaba.graphscope.common.utils.ClassUtils; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import com.alibaba.graphscope.proto.frontend.Code; import com.google.common.collect.Maps; @@ -85,6 +86,11 @@ public GraphPlanner( } public PlannerInstance instance(String query, IrMeta irMeta) { + return instance(query, irMeta, null); + } + + public PlannerInstance instance( + String query, IrMeta irMeta, @Nullable QueryLogger queryLogger) { GraphOptCluster optCluster = GraphOptCluster.create(this.optimizer.getMatchPlanner(), this.rexBuilder); RelMetadataQuery mq = @@ -101,7 +107,7 @@ public PlannerInstance instance(String query, IrMeta irMeta) { graphConfig, optCluster, new GraphOptSchema(optCluster, schema)); LogicalPlan logicalPlan = logicalPlanFactory.create(graphBuilder, irMeta, query); - return new PlannerInstance(query, logicalPlan, graphBuilder, irMeta); + return new PlannerInstance(query, logicalPlan, graphBuilder, irMeta, queryLogger); } public class PlannerInstance { @@ -109,13 +115,19 @@ public class PlannerInstance { private final LogicalPlan parsedPlan; private final GraphBuilder graphBuilder; private final IrMeta irMeta; + private @Nullable final QueryLogger queryLogger; public PlannerInstance( - String query, LogicalPlan parsedPlan, GraphBuilder graphBuilder, IrMeta irMeta) { + String query, + LogicalPlan parsedPlan, + GraphBuilder graphBuilder, + IrMeta irMeta, + @Nullable QueryLogger queryLogger) { this.query = query; this.parsedPlan = parsedPlan; this.graphBuilder = graphBuilder; this.irMeta = irMeta; + this.queryLogger = queryLogger; } public LogicalPlan getParsedPlan() { @@ -125,10 +137,16 @@ public LogicalPlan getParsedPlan() { public Summary plan() { LogicalPlan logicalPlan = ClassUtils.callException(() -> planLogical(), Code.LOGICAL_PLAN_BUILD_FAILED); - return new Summary( - logicalPlan, + if (queryLogger != null) { + queryLogger.info("[query][compiled]: logical IR compiled"); + } + PhysicalPlan physicalPlan = ClassUtils.callException( - () -> planPhysical(logicalPlan), Code.PHYSICAL_PLAN_BUILD_FAILED)); + () -> planPhysical(logicalPlan), Code.PHYSICAL_PLAN_BUILD_FAILED); + if (queryLogger != null) { + queryLogger.info("[query][compiled]: physical IR compiled"); + } + return new Summary(logicalPlan, physicalPlan); } public LogicalPlan planLogical() { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java index 9d8312dd6d93..3dcad741ccb1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/manager/RateLimitExecutor.java @@ -20,14 +20,12 @@ import com.alibaba.graphscope.common.config.FrontendConfig; import com.google.common.util.concurrent.RateLimiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; public class RateLimitExecutor extends ThreadPoolExecutor { - private static final Logger logger = LoggerFactory.getLogger(RateLimitExecutor.class); private final RateLimiter rateLimiter; + private final AtomicLong queryCounter; public RateLimitExecutor( Configs configs, @@ -48,9 +46,15 @@ public RateLimitExecutor( handler); int permitsPerSecond = FrontendConfig.QUERY_PER_SECOND_LIMIT.get(configs); this.rateLimiter = RateLimiter.create(permitsPerSecond); + this.queryCounter = new AtomicLong(0); + } + + public long getQueryCounter() { + return queryCounter.get(); } public Future submit(Runnable task) { + incrementCounter(); if (rateLimiter.tryAcquire()) { return super.submit(task); } @@ -60,4 +64,15 @@ public Future submit(Runnable task) { + " per second. Please increase the QPS limit by the config" + " 'query.per.second.limit' or slow down the query sending speed"); } + + // lock-free + private void incrementCounter() { + while (true) { + long currentValue = queryCounter.get(); + long nextValue = (currentValue == Long.MAX_VALUE) ? 0 : currentValue + 1; + if (queryCounter.compareAndSet(currentValue, nextValue)) { + break; + } + } + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java new file mode 100644 index 000000000000..3b28117a0dfb --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MemoryMetric.java @@ -0,0 +1,62 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.metric; + +import com.google.common.collect.ImmutableMap; + +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; +import java.util.List; +import java.util.Map; + +public class MemoryMetric implements Metric { + @Override + public Key getKey() { + return KeyFactory.MEMORY; + } + + @Override + public Map getValue() { + // jvm memory status + long jvmFreeMem = Runtime.getRuntime().freeMemory(); + long jvmTotalMem = Runtime.getRuntime().totalMemory(); + long jvmUsedMem = jvmTotalMem - jvmFreeMem; + + // Direct memory + List bufferPools = + ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class); + long directUsedMem = ValueFactory.INVALID_LONG; + long directTotalMem = ValueFactory.INVALID_LONG; + for (BufferPoolMXBean bufferPool : bufferPools) { + if ("direct".equalsIgnoreCase(bufferPool.getName())) { + directUsedMem = bufferPool.getMemoryUsed(); + directTotalMem = bufferPool.getTotalCapacity(); + } + } + return ImmutableMap.of( + "jvm.used", + jvmUsedMem, + "jvm.total", + jvmTotalMem, + "direct.used", + directUsedMem, + "direct.total", + directTotalMem); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java new file mode 100644 index 000000000000..12f5b6b6dcef --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/Metric.java @@ -0,0 +1,52 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.metric; + +public interface Metric { + + class KeyFactory { + public static final Key MEMORY = new Key("memory.usage"); + public static final Key RPC_CHANNELS_EXECUTOR_QUEUE = + new Key("rpc.channels.executor.queue"); + public static final Key GREMLIN_EXECUTOR_QUEUE = new Key("gremlin.executor.queue"); + public static final Key GREMLIN_QPS = new Key("gremlin.qps"); + } + + class ValueFactory { + public static long INVALID_LONG = -1L; + public static int INVALID_INT = -1; + } + + Key getKey(); + + Value getValue(); + + class Key { + private final String keyName; + + private Key(String keyName) { + this.keyName = keyName; + } + + @Override + public String toString() { + return this.keyName; + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java new file mode 100644 index 000000000000..99ab3b01efba --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/metric/MetricsTool.java @@ -0,0 +1,70 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.common.metric; + +import com.alibaba.graphscope.common.config.Configs; +import com.alibaba.graphscope.common.config.FrontendConfig; +import com.google.common.collect.Lists; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class MetricsTool { + private static Logger logger = LoggerFactory.getLogger("PerfMetricLog"); + private final List metrics; + private final ScheduledExecutorService service; + private final long intervalMS; + + public MetricsTool(Configs configs) { + this.metrics = Lists.newArrayList(); + this.service = new ScheduledThreadPoolExecutor(1); + this.intervalMS = FrontendConfig.METRICS_TOOL_INTERVAL_MS.get(configs); + if (this.intervalMS > 0) { + this.service.scheduleAtFixedRate( + () -> printMetrics(), intervalMS, intervalMS, TimeUnit.MILLISECONDS); + } + } + + public MetricsTool registerMetric(Metric metric) { + if (metrics.stream().anyMatch(k -> k.getKey().equals(metric.getKey()))) { + logger.warn("metric {} already exists", metric.getKey()); + return this; + } + metrics.add(metric); + return this; + } + + private void printMetrics() { + try { + StringBuilder builder = new StringBuilder(); + metrics.forEach( + k -> { + builder.append(k.getKey()).append(":").append(k.getValue()).append("\n"); + }); + logger.info("print perf metrics per {} ms:\n{} \n\n", intervalMS, builder); + } catch (Throwable t) { + logger.error("print perf metrics failed", t); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java index 1d6eeaaacc25..8db5c94c5109 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/executor/GraphQueryExecutor.java @@ -121,13 +121,18 @@ public StatementResult run( null, graphConfig); try { + statusCallback + .getQueryLogger() + .info("[query][received]: query received from the cypher client"); // hack ways to execute routing table or ping statement before executing the real query if (statement.equals(GET_ROUTING_TABLE_STATEMENT) || statement.equals(PING_STATEMENT)) { return super.run(fabricTransaction, statement, parameters); } irMeta = metaQueryCallback.beforeExec(); QueryCache.Key cacheKey = - queryCache.createKey(graphPlanner.instance(statement, irMeta)); + queryCache.createKey( + graphPlanner.instance( + statement, irMeta, statusCallback.getQueryLogger())); QueryCache.Value cacheValue = queryCache.get(cacheKey); Preconditions.checkArgument( cacheValue != null, @@ -137,21 +142,15 @@ public StatementResult run( new GraphPlanner.Summary( cacheValue.summary.getLogicalPlan(), cacheValue.summary.getPhysicalPlan()); - logger.debug( - "cypher query \"{}\", job conf name \"{}\", calcite logical plan {}, hash id" - + " {}", - statement, - jobName, - planSummary.getLogicalPlan().explain(), - cacheKey.hashCode()); + statusCallback + .getQueryLogger() + .info("logical IR plan \n\n {} \n\n", planSummary.getLogicalPlan().explain()); + statusCallback + .getQueryLogger() + .debug("physical IR plan {}", planSummary.getPhysicalPlan().explain()); if (planSummary.getLogicalPlan().isReturnEmpty()) { return StatementResults.initial(); } - logger.info( - "cypher query \"{}\", job conf name \"{}\", ir core logical plan {}", - statement, - jobName, - planSummary.getPhysicalPlan().explain()); QueryTimeoutConfig timeoutConfig = getQueryTimeoutConfig(); GraphPlanExecutor executor; if (cacheValue.result != null && cacheValue.result.isCompleted) { @@ -190,6 +189,9 @@ public void execute( listener, timeoutConfig, statusCallback.getQueryLogger()); + statusCallback + .getQueryLogger() + .info("[query][submitted]: physical IR submitted"); } }; } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java index b39e2d8eadc6..33256c5959d5 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/result/CypherRecordProcessor.java @@ -110,6 +110,7 @@ public void request(long l) throws Exception { } if (!recordIterator.hasNext()) { subscriber.onResultCompleted(QueryStatistics.EMPTY); + statusCallback.onSuccessEnd(); } } @@ -136,7 +137,6 @@ public void onNext(IrResult.Record record) { public void onCompleted() { try { this.recordIterator.finish(); - this.statusCallback.onSuccessEnd(); } catch (InterruptedException e) { onError(e); } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java new file mode 100644 index 000000000000..6debae48a772 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinExecutorQueueMetric.java @@ -0,0 +1,48 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.metric; + +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.graphscope.gremlin.Utils; + +import org.apache.tinkerpop.gremlin.server.GremlinServer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +public class GremlinExecutorQueueMetric implements Metric { + private final ExecutorService executorService; + + public GremlinExecutorQueueMetric(GremlinServer server) { + this.executorService = + Utils.getFieldValue(GremlinServer.class, server, "gremlinExecutorService"); + } + + @Override + public Key getKey() { + return KeyFactory.GREMLIN_EXECUTOR_QUEUE; + } + + @Override + public Integer getValue() { + return (executorService instanceof ThreadPoolExecutor) + ? ((ThreadPoolExecutor) executorService).getQueue().size() + : ValueFactory.INVALID_INT; + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java new file mode 100644 index 000000000000..9919586f22bd --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/metric/GremlinQPSMetric.java @@ -0,0 +1,62 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.metric; + +import com.alibaba.graphscope.common.manager.RateLimitExecutor; +import com.alibaba.graphscope.common.metric.Metric; +import com.alibaba.graphscope.gremlin.Utils; + +import org.apache.commons.lang3.time.StopWatch; +import org.apache.tinkerpop.gremlin.server.GremlinServer; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +public class GremlinQPSMetric implements Metric { + private final ExecutorService executorService; + + public GremlinQPSMetric(GremlinServer server) { + this.executorService = + Utils.getFieldValue(GremlinServer.class, server, "gremlinExecutorService"); + } + + @Override + public Key getKey() { + return KeyFactory.GREMLIN_QPS; + } + + @Override + public Long getValue() { + try { + if (executorService instanceof RateLimitExecutor) { + long startCounter = ((RateLimitExecutor) executorService).getQueryCounter(); + StopWatch watch = StopWatch.createStarted(); + Thread.sleep(2000); + long endCounter = ((RateLimitExecutor) executorService).getQueryCounter(); + long elapsed = watch.getTime(TimeUnit.MILLISECONDS); + // the counter may be reset to 0, so we need to handle this case + startCounter = (endCounter >= startCounter) ? startCounter : 0; + return (endCounter - startCounter) * 1000 / elapsed; + } + return ValueFactory.INVALID_LONG; + } catch (InterruptedException t) { + throw new RuntimeException(t); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java index 8ac1f418e99a..64d780a507b3 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java @@ -200,7 +200,9 @@ protected void evalOpInternal( new MetricsCollector.Gremlin(evalOpTimer), queryHistogram, configs); - statusCallback.getQueryLogger().info("[compile]: query received"); + statusCallback + .getQueryLogger() + .info("[query][received]: query received from the gremlin client"); QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout()); GremlinExecutor.LifeCycle lifeCycle; switch (language) { @@ -361,7 +363,9 @@ protected GremlinExecutor.LifeCycle createLifeCycle( if (o != null && o instanceof Traversal) { applyStrategies((Traversal) o); } - statusCallback.getQueryLogger().info("[compile]: traversal compiled"); + statusCallback + .getQueryLogger() + .info("[query][compiled]: traversal compiled"); return o; }) .withResult( @@ -406,7 +410,7 @@ protected void processTraversal( return opCollection; }, Code.LOGICAL_PLAN_BUILD_FAILED); - queryLogger.info("[compile]: logical IR compiled"); + queryLogger.info("[query][compiled]: logical IR compiled"); StringBuilder irPlanStr = new StringBuilder(); PegasusClient.JobRequest physicalRequest = ClassUtils.callException( @@ -451,7 +455,7 @@ protected void processTraversal( return request; }, Code.PHYSICAL_PLAN_BUILD_FAILED); - queryLogger.info("[compile]: physical IR compiled"); + queryLogger.info("[query][compiled]: physical IR compiled"); Span outgoing; // if exist up trace, useUpTraceId as current traceId if (TraceId.isValid(queryLogger.getUpstreamId())) { @@ -478,6 +482,7 @@ protected void processTraversal( outgoing.setAttribute("query.plan", irPlanStr.toString()); this.rpcClient.submit( physicalRequest, resultProcessor, timeoutConfig.getChannelTimeoutMS()); + queryLogger.info("[query][submitted]: physical IR submitted"); // request results from remote engine service in blocking way resultProcessor.request(); } catch (Throwable t) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java index 1289c9caaef9..27d1228d3982 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/LifeCycleSupplier.java @@ -81,6 +81,7 @@ public GremlinExecutor.LifeCycle get() { b.put("graph.query.cache", queryCache); b.put("graph.planner", graphPlanner); b.put("graph.meta", meta); + b.put("graph.query.logger", statusCallback.getQueryLogger()); }) .withResult( o -> { @@ -95,7 +96,14 @@ public GremlinExecutor.LifeCycle get() { GraphPlanner.Summary summary = value.summary; statusCallback .getQueryLogger() - .debug("ir plan {}", summary.getPhysicalPlan().explain()); + .info( + "logical IR plan \n\n {} \n\n", + summary.getLogicalPlan().explain()); + statusCallback + .getQueryLogger() + .debug( + "physical IR plan {}", + summary.getPhysicalPlan().explain()); ResultSchema resultSchema = new ResultSchema(summary.getLogicalPlan()); GremlinResultProcessor listener = @@ -120,6 +128,9 @@ public GremlinExecutor.LifeCycle get() { listener, timeoutConfig, statusCallback.getQueryLogger()); + statusCallback + .getQueryLogger() + .info("[query][submitted]: physical IR submitted"); } // request results from remote engine in a blocking way listener.request(); diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java index 6398e88340b3..010f73837544 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/script/GremlinCalciteScriptEngineFactory.java @@ -20,6 +20,7 @@ import com.alibaba.graphscope.common.ir.meta.IrMeta; import com.alibaba.graphscope.common.ir.tools.GraphPlanner; import com.alibaba.graphscope.common.ir.tools.QueryCache; +import com.alibaba.graphscope.gremlin.plugin.QueryLogger; import org.apache.tinkerpop.gremlin.jsr223.AbstractGremlinScriptEngineFactory; import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine; @@ -72,8 +73,9 @@ public Object eval(String script, ScriptContext ctx) throws ScriptException { QueryCache queryCache = (QueryCache) globalBindings.get("graph.query.cache"); GraphPlanner graphPlanner = (GraphPlanner) globalBindings.get("graph.planner"); IrMeta irMeta = (IrMeta) globalBindings.get("graph.meta"); + QueryLogger queryLogger = (QueryLogger) globalBindings.get("graph.query.logger"); QueryCache.Key cacheKey = - queryCache.createKey(graphPlanner.instance(script, irMeta)); + queryCache.createKey(graphPlanner.instance(script, irMeta, queryLogger)); return queryCache.get(cacheKey); } catch (FrontendException e) { throw e; diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java index b75f83d89166..12f2dbb0584e 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/result/processor/AbstractResultProcessor.java @@ -108,7 +108,9 @@ public void request() { responseProcessor.process(responseStreamIterator.next()); } responseProcessor.finish(); - statusCallback.getQueryLogger().info("[compile]: process results success"); + statusCallback + .getQueryLogger() + .info("[query][response]: processed and sent all responses to the client"); } catch (Throwable t) { // if the exception is caused by InterruptedException, it means a timeout exception has // been thrown by gremlin executor diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java index 0f6524cab81c..0339d27f940d 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/resultx/GremlinResultProcessor.java @@ -92,6 +92,9 @@ public void request() { processRecord(recordStreamIterator.next()); } finishRecord(); + statusCallback + .getQueryLogger() + .info("[query][response]: processed and sent all responses to the client"); } catch (Throwable t) { // if the exception is caused by InterruptedException, it means a timeout exception has // been thrown by gremlin executor diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java index 8e3659e16f5c..f563c17e83a6 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrGremlinServer.java @@ -25,12 +25,15 @@ import com.alibaba.graphscope.common.ir.tools.QueryIdGenerator; import com.alibaba.graphscope.common.manager.IrMetaQueryCallback; import com.alibaba.graphscope.common.manager.RateLimitExecutor; +import com.alibaba.graphscope.common.metric.MetricsTool; import com.alibaba.graphscope.gremlin.Utils; import com.alibaba.graphscope.gremlin.auth.AuthManager; import com.alibaba.graphscope.gremlin.auth.AuthManagerReference; import com.alibaba.graphscope.gremlin.auth.DefaultAuthManager; import com.alibaba.graphscope.gremlin.integration.processor.IrTestOpProcessor; import com.alibaba.graphscope.gremlin.integration.result.GraphProperties; +import com.alibaba.graphscope.gremlin.metric.GremlinExecutorQueueMetric; +import com.alibaba.graphscope.gremlin.metric.GremlinQPSMetric; import com.alibaba.graphscope.gremlin.plugin.processor.IrOpLoader; import com.alibaba.graphscope.gremlin.plugin.processor.IrStandardOpProcessor; import com.alibaba.graphscope.gremlin.plugin.traversal.IrCustomizedTraversalSource; @@ -62,6 +65,7 @@ public class IrGremlinServer implements AutoCloseable { private final GraphTraversalSource g; private final QueryIdGenerator idGenerator; + private final MetricsTool metricsTool; public IrGremlinServer( Configs configs, @@ -71,7 +75,8 @@ public IrGremlinServer( ExecutionClient executionClient, ChannelFetcher channelFetcher, IrMetaQueryCallback metaQueryCallback, - GraphProperties testGraph) { + GraphProperties testGraph, + MetricsTool metricsTool) { this.configs = configs; this.idGenerator = idGenerator; this.queryCache = queryCache; @@ -91,6 +96,7 @@ public IrGremlinServer( this.settings.evaluationTimeout = FrontendConfig.QUERY_EXECUTION_TIMEOUT_MS.get(configs); this.graph = TinkerFactory.createModern(); this.g = this.graph.traversal(IrCustomizedTraversalSource.class); + this.metricsTool = metricsTool; } public void start() throws Exception { @@ -131,6 +137,9 @@ public void start() throws Exception { serverGremlinExecutor.getGraphManager().putTraversalSource("g", graph.traversal()); this.gremlinServer.start().join(); + this.metricsTool + .registerMetric(new GremlinExecutorQueueMetric(this.gremlinServer)) + .registerMetric(new GremlinQPSMetric(this.gremlinServer)); } private ExecutorService createRateLimitExecutor() { diff --git a/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java b/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java new file mode 100644 index 000000000000..f18e44225118 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/io/grpc/internal/RpcUtils.java @@ -0,0 +1,56 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package io.grpc.internal; + +import com.alibaba.graphscope.gremlin.Utils; + +import io.grpc.Channel; +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyUtils; + +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Set; + +public class RpcUtils { + public static @Nullable ManagedChannel getDelegateChannel(Channel channel) { + if (channel instanceof ForwardingManagedChannel) { + ManagedChannel delegate = + Utils.getFieldValue(ForwardingManagedChannel.class, channel, "delegate"); + return getDelegateChannel(delegate); + } + return (channel instanceof ManagedChannel) ? (ManagedChannel) channel : null; + } + + public static io.netty.channel.Channel getNettyChannel(ManagedChannel grpcChannel) { + if (grpcChannel instanceof ManagedChannelImpl) { + ManagedChannelImpl channelImpl = (ManagedChannelImpl) grpcChannel; + Set subChannels = + Utils.getFieldValue(ManagedChannelImpl.class, channelImpl, "subchannels"); + if (subChannels != null && !subChannels.isEmpty()) { + ClientTransport transport = subChannels.iterator().next().getTransport(); + while (transport instanceof ForwardingConnectionClientTransport) { + transport = ((ForwardingConnectionClientTransport) transport).delegate(); + } + return NettyUtils.getNettyChannel(transport); + } + } + return null; + } +} diff --git a/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java b/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java new file mode 100644 index 000000000000..55a70a8be074 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/io/grpc/netty/NettyUtils.java @@ -0,0 +1,37 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package io.grpc.netty; + +import com.alibaba.graphscope.gremlin.Utils; + +import io.grpc.internal.ClientTransport; +import io.netty.channel.Channel; + +import org.checkerframework.checker.nullness.qual.Nullable; + +public class NettyUtils { + public static @Nullable Channel getNettyChannel(ClientTransport transport) { + if (transport instanceof NettyClientTransport) { + NettyClientTransport nettyClientTransport = (NettyClientTransport) transport; + // Access the Netty Channel from NettyClientTransport + return Utils.getFieldValue(NettyClientTransport.class, nettyClientTransport, "channel"); + } + return null; + } +} diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java index 29cacd8232cc..1b7502e4ff49 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcChannel.java @@ -15,6 +15,7 @@ */ package com.alibaba.pegasus; +import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.opentelemetry.api.OpenTelemetry; @@ -28,9 +29,9 @@ public class RpcChannel { private static final Logger logger = LoggerFactory.getLogger(RpcChannel.class); - private final ManagedChannel channel; + private final Channel channel; - public RpcChannel(ManagedChannel channel) { + public RpcChannel(Channel channel) { this.channel = channel; } @@ -47,12 +48,17 @@ public RpcChannel(String host, int port, OpenTelemetry openTelemetry) { .build(); } - public ManagedChannel getChannel() { + public Channel getChannel() { return channel; } public void shutdown() throws InterruptedException { - this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + if (this.channel instanceof ManagedChannel) { + String name = channel.authority(); + ManagedChannel managedChannel = (ManagedChannel) this.channel; + managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + logger.info("rpc channel {} shutdown successfully", name); + } } public String toString() { diff --git a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java index 62939eac6953..e4062d19b03d 100644 --- a/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java +++ b/interactive_engine/executor/engine/pegasus/clients/java/client/src/main/java/com/alibaba/pegasus/RpcClient.java @@ -22,6 +22,7 @@ import com.alibaba.pegasus.service.protocol.PegasusClient.JobRequest; import com.alibaba.pegasus.service.protocol.PegasusClient.JobResponse; +import io.grpc.CallOptions; import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.trace.Span; @@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,6 +51,22 @@ public RpcClient(List channels) { .collect(Collectors.toList()); } + public RpcClient(List channels, Map options) { + this.channels = Objects.requireNonNull(channels); + this.serviceStubs = + channels.stream() + .map( + k -> { + JobServiceStub stub = JobServiceGrpc.newStub(k.getChannel()); + for (Map.Entry entry : + options.entrySet()) { + stub = stub.withOption(entry.getKey(), entry.getValue()); + } + return stub; + }) + .collect(Collectors.toList()); + } + public void submit(JobRequest jobRequest, ResultProcessor processor, long rpcTimeoutMS) { AtomicInteger counter = new AtomicInteger(this.channels.size()); AtomicBoolean finished = new AtomicBoolean(false);