Skip to content

Commit

Permalink
[AETHER-1324] Force push pipeline config
Browse files Browse the repository at this point in the history
Introduces a distributed set to "remember" the devices already configured by the cluster.
In this way, we can force push the pipeline after a new installation even though the
pipeline results to be the same.

Change-Id: I9d04b04828daf8d1e6944b5a8e07d580e978ee69
(cherry picked from commit ce191fe)
  • Loading branch information
pierventre committed Mar 24, 2021
1 parent 4cb120b commit c44ccc7
Showing 1 changed file with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
import org.onosproject.store.primitives.DefaultDistributedSet;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.osgi.service.component.ComponentContext;
Expand Down Expand Up @@ -129,11 +133,20 @@ public class PiPipeconfWatchdogManager
private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
private Map<DeviceId, PipelineStatus> localStatusMap;

// Configured devices by this cluster. We use a set to keep track of all devices for which
// we have pushed the forwarding pipeline config at least once. This guarantees that device
// pipelines are wiped out/reset at least once when starting the cluster, minimizing the risk
// of any stale state from previous runs affecting control operations. Another effect of this
// approach is that the default entries mirror will get populated even though the pipeline results
// to be the same across different ONOS installations.
private static final String CONFIGURED_DEVICES = "onos-pipeconf-configured-set";
private DistributedSet<DeviceId> configuredDevices;

@Activate
public void activate() {
eventDispatcher.addSink(PiPipeconfWatchdogEvent.class, listenerRegistry);
localStatusMap = Maps.newConcurrentMap();
// Init distributed status map.
// Init distributed status map and configured devices set
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(PipelineStatus.class);
Expand All @@ -142,6 +155,12 @@ public void activate() {
.withSerializer(serializer)
.withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
statusMap.addListener(new StatusMapListener());
// Init the set of the configured devices
configuredDevices = new DefaultDistributedSet<>(storageService.<DeviceId>setBuilder()
.withName(CONFIGURED_DEVICES)
.withSerializer(Serializer.using(KryoNamespaces.API))
.build(),
DistributedPrimitive.DEFAULT_OPERATION_TIMEOUT_MILLIS);
// Register component configurable properties.
componentConfigService.registerProperties(getClass());
// Start periodic watchdog task.
Expand Down Expand Up @@ -230,6 +249,7 @@ private void filterAndTriggerTasks(Iterable<Device> devices) {
final boolean success = doSetPipeconfIfRequired(device, pipeconf);
if (success) {
signalStatusReady(device.id());
signalStatusConfigured(device.id());
} else {
signalStatusUnknown(device.id());
}
Expand All @@ -253,7 +273,8 @@ private boolean doSetPipeconfIfRequired(Device device, PiPipeconf pipeconf) {
if (!handshaker.hasConnection()) {
return false;
}
if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf))) {
if (Futures.getUnchecked(pipelineProg.isPipeconfSet(pipeconf)) &&
configuredDevices.contains(device.id())) {
log.debug("Pipeconf {} already configured on {}",
pipeconf.id(), device.id());
return true;
Expand Down Expand Up @@ -281,6 +302,14 @@ private void signalStatusReady(DeviceId deviceId) {
statusMap.put(deviceId, PipelineStatus.READY);
}

private void signalStatusUnconfigured(DeviceId deviceId) {
configuredDevices.remove(deviceId);
}

private void signalStatusConfigured(DeviceId deviceId) {
configuredDevices.add(deviceId);
}

private boolean isLocalMaster(Device device) {
if (mastershipService.isLocalMaster(device.id())) {
return true;
Expand Down Expand Up @@ -354,6 +383,7 @@ public void event(DeviceEvent event) {
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
signalStatusUnknown(device.id());
signalStatusUnconfigured(device.id());
break;
case PORT_ADDED:
case PORT_UPDATED:
Expand Down

0 comments on commit c44ccc7

Please sign in to comment.