diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml index 9b5b2c7eb66b..b36fb2d8b02d 100644 --- a/interactive_engine/assembly/src/conf/groot/logback.xml +++ b/interactive_engine/assembly/src/conf/groot/logback.xml @@ -1,5 +1,4 @@ - @@ -11,12 +10,12 @@ 1GB - [%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n + [%d{ISO8601}][%p][%t][%c:%L] %m%n - [%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n + [%d{ISO8601}][%p][%t][%c:%L] %m%n diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 2a501a0e63ab..40347b0dba71 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -48,11 +48,14 @@ import io.grpc.netty.NettyServerBuilder; import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class Frontend extends NodeBase { + private static final Logger logger = LoggerFactory.getLogger(Frontend.class); private CuratorFramework curator; private NodeDiscovery discovery; @@ -63,6 +66,8 @@ public class Frontend extends NodeBase { private ClientService clientService; private AbstractService graphService; + private SnapshotCache snapshotCache; + public Frontend(Configs configs) { super(configs); configs = reConfig(configs); @@ -75,13 +80,9 @@ public Frontend(Configs configs) { } NameResolver.Factory nameResolverFactory = new GrootNameResolverFactory(this.discovery); this.channelManager = new ChannelManager(configs, nameResolverFactory); - SnapshotCache snapshotCache = new SnapshotCache(); - this.metaService = new DefaultMetaService(configs); - MetricsCollector metricsCollector = new MetricsCollector(configs); - RoleClients ingestorWriteClients = - new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new); - FrontendSnapshotService frontendSnapshotService = - new FrontendSnapshotService(snapshotCache); + + snapshotCache = new SnapshotCache(); + RoleClients frontendMetricsCollectClients = new RoleClients<>( this.channelManager, RoleType.FRONTEND, MetricsCollectClient::new); @@ -96,18 +97,21 @@ public Frontend(Configs configs) { frontendMetricsCollectClients, ingestorMetricsCollectClients, storeMetricsCollectClients); + StoreIngestor storeIngestClients = new StoreIngestClients(this.channelManager, RoleType.STORE, StoreIngestClient::new); SchemaWriter schemaWriter = new SchemaWriter( new RoleClients<>( this.channelManager, RoleType.COORDINATOR, SchemaClient::new)); - DdlExecutors ddlExecutors = new DdlExecutors(); + BatchDdlClient batchDdlClient = - new BatchDdlClient(ddlExecutors, snapshotCache, schemaWriter); + new BatchDdlClient(new DdlExecutors(), snapshotCache, schemaWriter); StoreStateFetcher storeStateClients = new StoreStateClients(this.channelManager, RoleType.STORE, StoreStateClient::new); + this.metaService = new DefaultMetaService(configs); + this.clientService = new ClientService( snapshotCache, @@ -116,10 +120,21 @@ public Frontend(Configs configs) { this.metaService, batchDdlClient, storeStateClients); - GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient); + + FrontendSnapshotService frontendSnapshotService = + new FrontendSnapshotService(snapshotCache); + + MetricsCollector metricsCollector = new MetricsCollector(configs); MetricsCollectService metricsCollectService = new MetricsCollectService(metricsCollector); - WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs); + this.rpcServer = + new RpcServer( + configs, localNodeProvider, frontendSnapshotService, metricsCollectService); + + GrootDdlService clientDdlService = new GrootDdlService(snapshotCache, batchDdlClient); + EdgeIdGenerator edgeIdGenerator = new DefaultEdgeIdGenerator(configs, this.channelManager); + RoleClients ingestorWriteClients = + new RoleClients<>(this.channelManager, RoleType.INGESTOR, IngestorWriteClient::new); GraphWriter graphWriter = new GraphWriter( snapshotCache, @@ -127,15 +142,13 @@ public Frontend(Configs configs) { this.metaService, ingestorWriteClients, metricsCollector); + WriteSessionGenerator writeSessionGenerator = new WriteSessionGenerator(configs); ClientWriteService clientWriteService = new ClientWriteService(writeSessionGenerator, graphWriter); + RoleClients backupClients = new RoleClients<>(this.channelManager, RoleType.COORDINATOR, BackupClient::new); ClientBackupService clientBackupService = new ClientBackupService(backupClients); - this.rpcServer = - new RpcServer( - configs, localNodeProvider, frontendSnapshotService, metricsCollectService); - this.serviceServer = buildServiceServer( configs, @@ -182,6 +195,15 @@ public void start() { } this.discovery.start(); this.channelManager.start(); + + while (snapshotCache.getSnapshotWithSchema().getGraphDef() == null) { + try { + Thread.sleep(1000); + logger.info("Waiting for schema ready..."); + } catch (InterruptedException e) { + throw new GrootException(e); + } + } this.graphService.start(); try { this.serviceServer.start();