Skip to content

Commit

Permalink
[grid] shutdown the server backend on stop
Browse files Browse the repository at this point in the history
  • Loading branch information
joerg1985 authored and jkim2492 committed Nov 17, 2024
1 parent 3dba8b2 commit 5d4790c
Show file tree
Hide file tree
Showing 15 changed files with 200 additions and 38 deletions.
18 changes: 16 additions & 2 deletions java/src/org/openqa/selenium/grid/TemplateGridServerCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.openqa.selenium.grid;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -47,7 +48,17 @@ public Server<?> asServer(Config initialConfig) {
Handlers handler = createHandlers(config);

return new NettyServer(
new BaseServerOptions(config), handler.httpHandler, handler.websocketHandler);
new BaseServerOptions(config), handler.httpHandler, handler.websocketHandler) {

@Override
public void stop() {
try {
handler.close();
} finally {
super.stop();
}
}
};
}

private static final String GRAPHQL = "/graphql";
Expand Down Expand Up @@ -77,7 +88,7 @@ protected static Routable baseRoute(String prefix, Route route) {

protected abstract Handlers createHandlers(Config config);

public static class Handlers {
public abstract static class Handlers implements Closeable {
public final HttpHandler httpHandler;
public final BiFunction<String, Consumer<Message>, Optional<Consumer<Message>>>
websocketHandler;
Expand All @@ -89,5 +100,8 @@ public Handlers(
this.websocketHandler =
websocketHandler == null ? (str, sink) -> Optional.empty() : websocketHandler;
}

@Override
public abstract void close();
}
}
12 changes: 11 additions & 1 deletion java/src/org/openqa/selenium/grid/commands/EventBusCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,17 @@ public Server<?> asServer(Config initialConfig) {
return httpResponse(false, "Status checking was interrupted");
}
}),
Route.get("/readyz").to(() -> req -> new HttpResponse().setStatus(HTTP_NO_CONTENT))));
Route.get("/readyz").to(() -> req -> new HttpResponse().setStatus(HTTP_NO_CONTENT)))) {

@Override
public void stop() {
try {
bus.close();
} finally {
super.stop();
}
}
};
}

@Override
Expand Down
19 changes: 12 additions & 7 deletions java/src/org/openqa/selenium/grid/commands/Hub.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.openqa.selenium.grid.TemplateGridServerCommand;
import org.openqa.selenium.grid.config.Config;
import org.openqa.selenium.grid.config.Role;
import org.openqa.selenium.grid.distributor.Distributor;
import org.openqa.selenium.grid.distributor.config.DistributorOptions;
import org.openqa.selenium.grid.distributor.local.LocalDistributor;
import org.openqa.selenium.grid.graphql.GraphqlHandler;
Expand All @@ -57,9 +56,7 @@
import org.openqa.selenium.grid.server.EventBusOptions;
import org.openqa.selenium.grid.server.NetworkOptions;
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue;
import org.openqa.selenium.grid.web.CombinedHandler;
Expand Down Expand Up @@ -120,7 +117,7 @@ protected Handlers createHandlers(Config config) {

CombinedHandler handler = new CombinedHandler();

SessionMap sessions = new LocalSessionMap(tracer, bus);
LocalSessionMap sessions = new LocalSessionMap(tracer, bus);
handler.addHandler(sessions);

BaseServerOptions serverOptions = new BaseServerOptions(config);
Expand All @@ -141,7 +138,7 @@ protected Handlers createHandlers(Config config) {

DistributorOptions distributorOptions = new DistributorOptions(config);
NewSessionQueueOptions newSessionRequestOptions = new NewSessionQueueOptions(config);
NewSessionQueue queue =
LocalNewSessionQueue queue =
new LocalNewSessionQueue(
tracer,
distributorOptions.getSlotMatcher(),
Expand All @@ -151,7 +148,7 @@ protected Handlers createHandlers(Config config) {
newSessionRequestOptions.getBatchSize());
handler.addHandler(queue);

Distributor distributor =
LocalDistributor distributor =
new LocalDistributor(
tracer,
bus,
Expand Down Expand Up @@ -212,7 +209,15 @@ protected Handlers createHandlers(Config config) {
// these checks
httpHandler = combine(httpHandler, Route.get("/readyz").to(() -> readinessCheck));

return new Handlers(httpHandler, new ProxyWebsocketsIntoGrid(clientFactory, sessions));
return new Handlers(httpHandler, new ProxyWebsocketsIntoGrid(clientFactory, sessions)) {
@Override
public void close() {
router.close();
distributor.close();
queue.close();
bus.close();
}
};
}

@Override
Expand Down
23 changes: 14 additions & 9 deletions java/src/org/openqa/selenium/grid/commands/Standalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.grid.sessionmap.SessionMap;
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue;
import org.openqa.selenium.grid.web.CombinedHandler;
Expand Down Expand Up @@ -145,7 +144,7 @@ protected Handlers createHandlers(Config config) {

DistributorOptions distributorOptions = new DistributorOptions(config);
NewSessionQueueOptions newSessionRequestOptions = new NewSessionQueueOptions(config);
NewSessionQueue queue =
LocalNewSessionQueue queue =
new LocalNewSessionQueue(
tracer,
distributorOptions.getSlotMatcher(),
Expand All @@ -155,7 +154,7 @@ protected Handlers createHandlers(Config config) {
newSessionRequestOptions.getBatchSize());
combinedHandler.addHandler(queue);

Distributor distributor =
LocalDistributor distributor =
new LocalDistributor(
tracer,
bus,
Expand All @@ -171,9 +170,8 @@ protected Handlers createHandlers(Config config) {
distributorOptions.getSlotMatcher());
combinedHandler.addHandler(distributor);

Routable router =
new Router(tracer, clientFactory, sessions, queue, distributor)
.with(networkOptions.getSpecComplianceChecks());
Router router = new Router(tracer, clientFactory, sessions, queue, distributor);
Routable routerWithSpecChecks = router.with(networkOptions.getSpecComplianceChecks());

HttpHandler readinessCheck =
req -> {
Expand All @@ -192,8 +190,8 @@ protected Handlers createHandlers(Config config) {

Routable appendRoute =
Stream.of(
baseRoute(subPath, combine(router)),
hubRoute(subPath, combine(router)),
baseRoute(subPath, combine(routerWithSpecChecks)),
hubRoute(subPath, combine(routerWithSpecChecks)),
graphqlRoute(subPath, () -> graphqlHandler))
.reduce(Route::combine)
.get();
Expand All @@ -218,7 +216,14 @@ protected Handlers createHandlers(Config config) {
httpHandler = combine(httpHandler, Route.get("/readyz").to(() -> readinessCheck));
Node node = createNode(config, bus, distributor, combinedHandler);

return new Handlers(httpHandler, new ProxyNodeWebsockets(clientFactory, node, subPath));
return new Handlers(httpHandler, new ProxyNodeWebsockets(clientFactory, node, subPath)) {
@Override
public void close() {
router.close();
distributor.close();
queue.close();
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.MediaType;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Set;
import java.util.logging.Level;
Expand Down Expand Up @@ -118,7 +121,18 @@ protected Handlers createHandlers(Config config) {
"message",
"Distributor is ready"))))),
get("/readyz").to(() -> readinessCheck)),
null);
null) {
@Override
public void close() {
if (distributor instanceof Closeable) {
try {
((Closeable) distributor).close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.openqa.selenium.grid.distributor.local;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.openqa.selenium.concurrent.ExecutorServices.shutdownGracefully;
import static org.openqa.selenium.grid.data.Availability.DOWN;
import static org.openqa.selenium.grid.data.Availability.DRAINING;
import static org.openqa.selenium.grid.data.Availability.UP;
Expand All @@ -34,6 +35,7 @@
import com.google.common.collect.ImmutableSet;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.Closeable;
import java.io.UncheckedIOException;
import java.net.URI;
import java.time.Duration;
Expand All @@ -45,7 +47,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -118,7 +119,7 @@
@ManagedService(
objectName = "org.seleniumhq.grid:type=Distributor,name=LocalDistributor",
description = "Grid 4 node distributor")
public class LocalDistributor extends Distributor implements AutoCloseable {
public class LocalDistributor extends Distributor implements Closeable {

private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName());

Expand Down Expand Up @@ -165,7 +166,7 @@ public class LocalDistributor extends Distributor implements AutoCloseable {
return thread;
});

private final Executor sessionCreatorExecutor;
private final ExecutorService sessionCreatorExecutor;

private final NewSessionQueue sessionQueue;

Expand Down Expand Up @@ -752,9 +753,10 @@ public int getIdleSlots() {
@Override
public void close() {
LOG.info("Shutting down Distributor executor service");
purgeDeadNodesService.shutdown();
nodeHealthCheckService.shutdown();
newSessionService.shutdown();
shutdownGracefully("Local Distributor - Purge Dead Nodes", purgeDeadNodesService);
shutdownGracefully("Local Distributor - Node Health Check", nodeHealthCheckService);
shutdownGracefully("Local Distributor - New Session Queue", newSessionService);
shutdownGracefully("Local Distributor - Session Creation", sessionCreatorExecutor);
}

private class NewSessionRunnable implements Runnable {
Expand Down
25 changes: 24 additions & 1 deletion java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import com.google.common.net.MediaType;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -172,7 +175,18 @@ protected Handlers createHandlers(Config config) {
Route httpHandler = Route.combine(node, get("/readyz").to(() -> readinessCheck));

return new Handlers(
httpHandler, new ProxyNodeWebsockets(clientFactory, node, nodeOptions.getGridSubPath()));
httpHandler, new ProxyNodeWebsockets(clientFactory, node, nodeOptions.getGridSubPath())) {
@Override
public void close() {
if (node instanceof Closeable) {
try {
((Closeable) node).close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
};
}

@Override
Expand Down Expand Up @@ -225,6 +239,15 @@ public NettyServer start() {

return this;
}

@Override
public void stop() {
try {
handler.close();
} finally {
super.stop();
}
}
};
}

Expand Down
27 changes: 26 additions & 1 deletion java/src/org/openqa/selenium/grid/node/local/LocalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.openqa.selenium.grid.node.local;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.openqa.selenium.concurrent.ExecutorServices.shutdownGracefully;
import static org.openqa.selenium.grid.data.Availability.DOWN;
import static org.openqa.selenium.grid.data.Availability.DRAINING;
import static org.openqa.selenium.grid.data.Availability.UP;
Expand All @@ -37,6 +38,7 @@
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
Expand Down Expand Up @@ -114,7 +116,7 @@
@ManagedService(
objectName = "org.seleniumhq.grid:type=Node,name=LocalNode",
description = "Node running the webdriver sessions.")
public class LocalNode extends Node {
public class LocalNode extends Node implements Closeable {

private static final Json JSON = new Json();
private static final Logger LOG = Logger.getLogger(LocalNode.class.getName());
Expand All @@ -139,6 +141,7 @@ public class LocalNode extends Node {
private final Cache<SessionId, UUID> sessionToDownloadsDir;
private final AtomicInteger pendingSessions = new AtomicInteger();
private final AtomicInteger sessionCount = new AtomicInteger();
private final Runnable shutdown;

protected LocalNode(
Tracer tracer,
Expand Down Expand Up @@ -301,6 +304,23 @@ protected LocalNode(
heartbeatPeriod.getSeconds(),
TimeUnit.SECONDS);

shutdown =
() -> {
if (heartbeatNodeService.isShutdown()) return;

shutdownGracefully(
"Local Node - Session Cleanup " + externalUri, sessionCleanupNodeService);
shutdownGracefully(
"UploadTempFile Cleanup Node " + externalUri, uploadTempFileCleanupNodeService);
shutdownGracefully(
"DownloadTempFile Cleanup Node " + externalUri, downloadTempFileCleanupNodeService);
shutdownGracefully("HeartBeat Node " + externalUri, heartbeatNodeService);

// ensure we do not leak running browsers
currentSessions.invalidateAll();
currentSessions.cleanUp();
};

Runtime.getRuntime()
.addShutdownHook(
new Thread(
Expand All @@ -311,6 +331,11 @@ protected LocalNode(
new JMXHelper().register(this);
}

@Override
public void close() {
shutdown.run();
}

private void stopTimedOutSession(RemovalNotification<SessionId, SessionSlot> notification) {
if (notification.getKey() != null && notification.getValue() != null) {
SessionSlot slot = notification.getValue();
Expand Down
Loading

0 comments on commit 5d4790c

Please sign in to comment.