Skip to content

Commit

Permalink
fix(interactive): block service until graph schema is synced (#3280)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 authored Oct 11, 2023
1 parent d8b0185 commit caabbbd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 18 deletions.
5 changes: 2 additions & 3 deletions interactive_engine/assembly/src/conf/groot/logback.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<configuration scan="true" scanPeriod="30 seconds">
<define name="hostname" class="ch.qos.logback.core.property.CanonicalHostNamePropertyDefiner"/>
<property name="log_dir" value="${log.dir:-/var/log/graphscope}"/>
<property name="log_name" value="${log.name:-groot}"/>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
Expand All @@ -11,12 +10,12 @@
<totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n</pattern>
<pattern>[%d{ISO8601}][%p][%t][%c:%L] %m%n</pattern>
</encoder>
</appender>
<appender name="Metric" class="ch.qos.logback.core.rolling.RollingFileAppender">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
import io.grpc.netty.NettyServerBuilder;

import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class Frontend extends NodeBase {
private static final Logger logger = LoggerFactory.getLogger(Frontend.class);

private CuratorFramework curator;
private NodeDiscovery discovery;
Expand All @@ -63,6 +66,8 @@ public class Frontend extends NodeBase {
private ClientService clientService;
private AbstractService graphService;

private SnapshotCache snapshotCache;

public Frontend(Configs configs) {
super(configs);
configs = reConfig(configs);
Expand All @@ -75,13 +80,9 @@ public Frontend(Configs configs) {
}
NameResolver.Factory nameResolverFactory = new GrootNameResolverFactory(this.discovery);
this.channelManager = new ChannelManager(configs, nameResolverFactory);
SnapshotCache snapshotCache = new SnapshotCache();
this.metaService = new DefaultMetaService(configs);
MetricsCollector metricsCollector = new MetricsCollector(configs);
RoleClients<IngestorWriteClient> ingestorWriteClients =
new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new);
FrontendSnapshotService frontendSnapshotService =
new FrontendSnapshotService(snapshotCache);

snapshotCache = new SnapshotCache();

RoleClients<MetricsCollectClient> frontendMetricsCollectClients =
new RoleClients<>(
this.channelManager, RoleType.FRONTEND, MetricsCollectClient::new);
Expand All @@ -96,18 +97,21 @@ public Frontend(Configs configs) {
frontendMetricsCollectClients,
ingestorMetricsCollectClients,
storeMetricsCollectClients);

StoreIngestor storeIngestClients =
new StoreIngestClients(this.channelManager, RoleType.STORE, StoreIngestClient::new);
SchemaWriter schemaWriter =
new SchemaWriter(
new RoleClients<>(
this.channelManager, RoleType.COORDINATOR, SchemaClient::new));
DdlExecutors ddlExecutors = new DdlExecutors();

BatchDdlClient batchDdlClient =
new BatchDdlClient(ddlExecutors, snapshotCache, schemaWriter);
new BatchDdlClient(new DdlExecutors(), snapshotCache, schemaWriter);
StoreStateFetcher storeStateClients =
new StoreStateClients(this.channelManager, RoleType.STORE, StoreStateClient::new);

this.metaService = new DefaultMetaService(configs);

this.clientService =
new ClientService(
snapshotCache,
Expand All @@ -116,26 +120,35 @@ public Frontend(Configs configs) {
this.metaService,
batchDdlClient,
storeStateClients);
GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient);

FrontendSnapshotService frontendSnapshotService =
new FrontendSnapshotService(snapshotCache);

MetricsCollector metricsCollector = new MetricsCollector(configs);
MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector);
WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs);
this.rpcServer =
new RpcServer(
configs, localNodeProvider, frontendSnapshotService, metricsCollectService);

GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient);

EdgeIdGenerator edgeIdGenerator = new DefaultEdgeIdGenerator(configs, this.channelManager);
RoleClients<IngestorWriteClient> ingestorWriteClients =
new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new);
GraphWriter graphWriter =
new GraphWriter(
snapshotCache,
edgeIdGenerator,
this.metaService,
ingestorWriteClients,
metricsCollector);
WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs);
ClientWriteService clientWriteService =
new ClientWriteService(writeSessionGenerator, graphWriter);

RoleClients<BackupClient> backupClients =
new RoleClients<>(this.channelManager, RoleType.COORDINATOR, BackupClient::new);
ClientBackupService clientBackupService = new ClientBackupService(backupClients);
this.rpcServer =
new RpcServer(
configs, localNodeProvider, frontendSnapshotService, metricsCollectService);

this.serviceServer =
buildServiceServer(
configs,
Expand Down Expand Up @@ -182,6 +195,15 @@ public void start() {
}
this.discovery.start();
this.channelManager.start();

while (snapshotCache.getSnapshotWithSchema().getGraphDef() == null) {
try {
Thread.sleep(1000);
logger.info("Waiting for schema ready...");
} catch (InterruptedException e) {
throw new GrootException(e);
}
}
this.graphService.start();
try {
this.serviceServer.start();
Expand Down

0 comments on commit caabbbd

Please sign in to comment.