From c44ccc7dd03f06d81ddd60791f87aae634cc85ab Mon Sep 17 00:00:00 2001 From: pierventre Date: Mon, 22 Mar 2021 22:17:21 +0100 Subject: [PATCH] [AETHER-1324] Force push pipeline config Introduces a distributed set to "remember" the devices already configured by the cluster. In this way, we can force push the pipeline after a new installation even though the pipeline results to be the same. Change-Id: I9d04b04828daf8d1e6944b5a8e07d580e978ee69 (cherry picked from commit ce191fe9983698b3988bee8e6e8c41382ab1fce3) --- .../pi/impl/PiPipeconfWatchdogManager.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java index 21df7781d84..103d6cf9c92 100644 --- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java +++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java @@ -42,10 +42,14 @@ import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent; import org.onosproject.net.pi.service.PiPipeconfWatchdogListener; import org.onosproject.net.pi.service.PiPipeconfWatchdogService; +import org.onosproject.store.primitives.DefaultDistributedSet; import org.onosproject.store.serializers.KryoNamespaces; +import org.onosproject.store.service.DistributedPrimitive; +import org.onosproject.store.service.DistributedSet; import org.onosproject.store.service.EventuallyConsistentMap; import org.onosproject.store.service.EventuallyConsistentMapEvent; import org.onosproject.store.service.EventuallyConsistentMapListener; +import org.onosproject.store.service.Serializer; import org.onosproject.store.service.StorageService; import org.onosproject.store.service.WallClockTimestamp; import org.osgi.service.component.ComponentContext; @@ -129,11 +133,20 @@ public class PiPipeconfWatchdogManager private EventuallyConsistentMap statusMap; private Map localStatusMap; + // Configured devices by this cluster. We use a set to keep track of all devices for which + // we have pushed the forwarding pipeline config at least once. This guarantees that device + // pipelines are wiped out/reset at least once when starting the cluster, minimizing the risk + // of any stale state from previous runs affecting control operations. Another effect of this + // approach is that the default entries mirror will get populated even though the pipeline results + // to be the same across different ONOS installations. + private static final String CONFIGURED_DEVICES = "onos-pipeconf-configured-set"; + private DistributedSet configuredDevices; + @Activate public void activate() { eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry); localStatusMap = Maps.newConcurrentMap(); - // Init distributed status map. + // Init distributed status map and configured devices set KryoNamespace.Builder serializer = KryoNamespace.newBuilder() .register(KryoNamespaces.API) .register(PipelineStatus.class); @@ -142,6 +155,12 @@ public void activate() { .withSerializer(serializer) .withTimestampProvider((k, v) -> new WallClockTimestamp()).build(); statusMap.addListener(new StatusMapListener()); + // Init the set of the configured devices + configuredDevices = new DefaultDistributedSet<>(storageService.setBuilder() + .withName(CONFIGURED_DEVICES) + .withSerializer(Serializer.using(KryoNamespaces.API)) + .build(), + DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS); // Register component configurable properties. componentConfigService.registerProperties(getClass()); // Start periodic watchdog task. @@ -230,6 +249,7 @@ private void filterAndTriggerTasks(Iterable devices) { final boolean success = doSetPipeconfIfRequired(device, pipeconf); if (success) { signalStatusReady(device.id()); + signalStatusConfigured(device.id()); } else { signalStatusUnknown(device.id()); } @@ -253,7 +273,8 @@ private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) { if (!handshaker.hasConnection()) { return false; } - if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) { + if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf)) && + configuredDevices.contains(device.id())) { log.debug("Pipeconf {} already configured on {}", pipeconf.id(), device.id()); return true; @@ -281,6 +302,14 @@ private void signalStatusReady(DeviceId deviceId) { statusMap.put(deviceId, PipelineStatus.READY); } + private void signalStatusUnconfigured(DeviceId deviceId) { + configuredDevices.remove(deviceId); + } + + private void signalStatusConfigured(DeviceId deviceId) { + configuredDevices.add(deviceId); + } + private boolean isLocalMaster(Device device) { if (mastershipService.isLocalMaster(device.id())) { return true; @@ -354,6 +383,7 @@ public void event(DeviceEvent event) { case DEVICE_REMOVED: case DEVICE_SUSPENDED: signalStatusUnknown(device.id()); + signalStatusUnconfigured(device.id()); break; case PORT_ADDED: case PORT_UPDATED: