From 3c47ad63518144b1b1a55f60a10f200c2d6e8a15 Mon Sep 17 00:00:00 2001 From: Stephan Schnabel Date: Wed, 9 Aug 2023 14:26:09 +0200 Subject: [PATCH] Add periodic log of missing deployments --- .../io/kokuwa/maven/k3s/mojo/ApplyMojo.java | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/kokuwa/maven/k3s/mojo/ApplyMojo.java b/src/main/java/io/kokuwa/maven/k3s/mojo/ApplyMojo.java index 21f7f09..ab71156 100644 --- a/src/main/java/io/kokuwa/maven/k3s/mojo/ApplyMojo.java +++ b/src/main/java/io/kokuwa/maven/k3s/mojo/ApplyMojo.java @@ -12,6 +12,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -80,9 +82,14 @@ public void execute() throws MojoExecutionException { // wait for service account, see https://github.com/kubernetes/kubernetes/issues/66689 - Await.await(getLog(), "k3s service account ready").until(() -> !getDocker() - .exec("kubectl", "get", "sa", "default", "--ignore-not-found", "--output=name") - .isEmpty()); + var serviceAccount = new String[] { "kubectl", "get", "sa", "default", "--ignore-not-found", "--output=name" }; + if (getDocker().exec(serviceAccount).isEmpty()) { + getLog().info(""); + getLog().info("No service account found, waiting for sa ..."); + Await.await(getLog(), "k3s service account ready").until(() -> !getDocker().exec(serviceAccount).isEmpty()); + getLog().info("Service account found, continue ..."); + getLog().info(""); + } // wait for node getting ready @@ -108,13 +115,37 @@ public void execute() throws MojoExecutionException { .entrySet().stream().parallel().flatMap(this::waitFor).collect(Collectors.toSet()); try { var success = true; - for (var future : Executors.newWorkStealingPool().invokeAll(tasks)) { - success &= future.get(); + var pool = Executors.newWorkStealingPool(); + var futures = tasks.stream().collect(Collectors.toMap(Entry::getKey, e -> pool.submit(e.getValue()))); + var missing = new AtomicReference<>(futures.keySet().stream().sorted().collect(Collectors.toList())); + pool.submit(() -> { + while (!futures.values().stream().allMatch(Future::isDone)) { + Thread.sleep(5000); + var newMissing = futures.entrySet().stream() + .filter(f -> !f.getValue().isDone()) + .map(Entry::getKey).sorted().collect(Collectors.toList()); + var oldMissing = missing.getAndSet(newMissing); + if (oldMissing.equals(newMissing)) { + getLog().debug("Still waiting for: " + missing); + } else { + getLog().info("Still waiting for: " + missing); + } + } + return null; + }); + + for (var future : futures.entrySet()) { + success &= future.getValue().get(); } + pool.shutdownNow(); + if (!success) { throw new MojoExecutionException("Failed to wait for resources, see previous log"); } - } catch (InterruptedException | ExecutionException e) { + + } catch (InterruptedException | + + ExecutionException e) { throw new MojoExecutionException("Failed to wait for resources", e); } } @@ -143,7 +174,7 @@ private Task apply() throws MojoExecutionException { return getDocker().execWithoutVerify(timeout, command); } - private Stream> waitFor(Entry> entry) { + private Stream>> waitFor(Entry> entry) { try { var kind = entry.getKey(); @@ -170,21 +201,22 @@ private Stream> waitFor(Entry> entry) { tmp.add(name); tmp.add("--namespace=" + namespace); tmp.add("--timeout=" + timeout.getSeconds() + "s"); - return (Callable) () -> { + var representation = "default".equals(namespace) ? name : namespace + "/" + name; + return Map.entry(representation, (Callable) () -> { try { - getLog().debug(kind + " " + namespace + "/" + name + " ... waiting"); + getLog().debug(kind + " " + representation + " ... waiting"); getDocker().exec(timeout.plusSeconds(10), tmp); - getLog().info(kind + " " + namespace + "/" + name + " ... ready"); + getLog().info(kind + " " + representation + " ... ready"); return true; } catch (MojoExecutionException e) { getDocker().exec("kubectl", "get", "--output=yaml", "--namespace=" + namespace, kind, name); return false; } - }; + }); }); } catch (MojoExecutionException e) { - return Stream.of(() -> false); + return Stream.of(Map.entry("exception", () -> false)); } }