Skip to content

Commit

Permalink
Dynamically add or remove gateway node according to GW annotation
Browse files Browse the repository at this point in the history
Change-Id: Ic7ac799eda0c1d028e934cc1bfd07af34e714e18
(cherry picked from commit 91358d6)
  • Loading branch information
gunine committed Mar 22, 2021
1 parent 9793ec4 commit 517597a
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
Expand Down Expand Up @@ -118,23 +120,28 @@ public class KubevirtFloatingIpHandler {
private ApplicationId appId;
private NodeId localNodeId;

private final InternalRouterEventListener kubevirtRouterlistener =
private final InternalRouterEventListener kubevirtRouterListener =
new InternalRouterEventListener();

private final InternalNodeEventListener kubevirtNodeListener =
new InternalNodeEventListener();

@Activate
protected void activate() {
appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
localNodeId = clusterService.getLocalNode().id();
leadershipService.runForLeadership(appId.name());
kubevirtRouterService.addListener(kubevirtRouterlistener);
kubevirtRouterService.addListener(kubevirtRouterListener);
kubevirtNodeService.addListener(kubevirtNodeListener);

log.info("Started");
}

@Deactivate
protected void deactivate() {
leadershipService.withdraw(appId.name());
kubevirtRouterService.removeListener(kubevirtRouterlistener);
kubevirtRouterService.removeListener(kubevirtRouterListener);
kubevirtNodeService.removeListener(kubevirtNodeListener);

eventExecutor.shutdown();

Expand Down Expand Up @@ -204,6 +211,7 @@ private void setFloatingIpArpResponseRules(KubevirtRouter router,
PRE_FLAT_TABLE,
install);
}

private KubevirtPort getKubevirtPort(KubevirtFloatingIp floatingIp) {

return kubevirtPortService.ports().stream()
Expand Down Expand Up @@ -352,7 +360,6 @@ private boolean isRelevantHelper() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}


@Override
public void event(KubevirtRouterEvent event) {
switch (event.type()) {
Expand Down Expand Up @@ -385,4 +392,44 @@ private void processFloatingIpDisassociation(KubevirtRouter router, KubevirtFloa
setFloatingIpRules(router, floatingIp, false);
}
}

private class InternalNodeEventListener implements KubevirtNodeListener {

private boolean isRelevantHelper() {
return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
}

@Override
public void event(KubevirtNodeEvent event) {
switch (event.type()) {
case KUBEVIRT_NODE_COMPLETE:
eventExecutor.execute(() -> processNodeCompletion(event.subject()));
break;
case KUBEVIRT_NODE_REMOVED:
case KUBEVIRT_NODE_INCOMPLETE:
default:
break;
}
}

private void processNodeCompletion(KubevirtNode node) {
if (!isRelevantHelper()) {
return;
}

kubevirtRouterService.floatingIps().forEach(fip -> {
KubevirtRouter router = kubevirtRouterService.router(fip.routerName());
if (router != null) {
KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
if (electedGw == null) {
return;
}

if (electedGw.hostname().equals(node.hostname())) {
setFloatingIpRules(router, fip, true);
}
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void setDefaultGatewayRuleToWorkerNodeWhenNodeCreated(KubevirtNode node,
return;
}

setDefaulGatewayRuleToWorkerNodeTunBridge(router, network, electedGw.intgBridge(), node, true);
setDefaultGatewayRuleToWorkerNodeTunBridge(router, network, electedGw.intgBridge(), node, true);
}

private void setDefaultRulesForTenantNetwork(KubevirtNode node, KubevirtNetwork network) {
Expand Down Expand Up @@ -428,7 +428,7 @@ private void initGatewayNodeForInternalNetwork(KubevirtNetwork network,
setGatewayIcmpRuleForTenantInternalNetwork(router, network, TENANT_ICMP_TABLE,
electedGateway.intgBridge(),
network.tenantDeviceId(node.hostname()), install);
setDefaulGatewayRuleToWorkerNodeTunBridge(router, network,
setDefaultGatewayRuleToWorkerNodeTunBridge(router, network,
electedGateway.intgBridge(), node, install);
});
break;
Expand All @@ -446,11 +446,11 @@ private void initGatewayNodeForInternalNetwork(KubevirtNetwork network,
}
}

private void setDefaulGatewayRuleToWorkerNodeTunBridge(KubevirtRouter router,
KubevirtNetwork network,
DeviceId gwDeviceId,
KubevirtNode workerNode,
boolean install) {
private void setDefaultGatewayRuleToWorkerNodeTunBridge(KubevirtRouter router,
KubevirtNetwork network,
DeviceId gwDeviceId,
KubevirtNode workerNode,
boolean install) {
MacAddress routerMacAddress = getRouterMacAddress(router);

if (routerMacAddress == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
import org.onosproject.kubevirtnode.api.KubevirtNode;
import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
import org.onosproject.kubevirtnode.api.KubevirtNodeService;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
Expand Down Expand Up @@ -140,6 +142,9 @@ public class KubevirtRoutingSnatHandler {
private final InternalRouterEventListener kubevirtRouterlistener =
new InternalRouterEventListener();

private final InternalNodeEventListener kubevirtNodeListener =
new InternalNodeEventListener();

private ApplicationId appId;
private NodeId localNodeId;

Expand All @@ -151,13 +156,15 @@ protected void activate() {

kubevirtPortService.addListener(kubevirtPortListener);
kubevirtRouterService.addListener(kubevirtRouterlistener);
kubevirtNodeService.addListener(kubevirtNodeListener);

log.info("Started");
}

@Deactivate
protected void deactivate() {
leadershipService.withdraw(appId.name());
kubevirtNodeService.removeListener(kubevirtNodeListener);
kubevirtPortService.removeListener(kubevirtPortListener);
kubevirtRouterService.removeListener(kubevirtRouterlistener);

Expand Down Expand Up @@ -720,4 +727,12 @@ private void processPortDeletion(KubevirtPort kubevirtPort) {
}
}
}

private class InternalNodeEventListener implements KubevirtNodeListener {

@Override
public void event(KubevirtNodeEvent event) {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -601,13 +601,21 @@ private void provisionPhysicalInterfaces(KubevirtNode node) {
log.info("Creating physnet bridge {}", bridgeName);
log.info("Creating patch ports for physnet {}", bridgeName);
} else {
// in case physical bridge exists, but patch port is missing on br-int,
// we will add patch port to connect br-int with physical bridge
// in case physical bridge exists, but patch port is missing,
// we will add patch port to connect br-physnet with physical bridge
if (!hasPhyPatchPort(node, patchPortName)) {
createPhysicalPatchPorts(node, pi);

log.info("Creating patch ports for physnet {}", bridgeName);
}

// in case physical bridge exists, but physnet interface is missing,
// we will add the physnet interface to connect br-physnet to the external
if (!hasPhyIntf(node, pi.intf())) {
attachPhysicalPort(node, pi);

log.info("Attaching external ports for physnet {}", bridgeName);
}
}
});
}
Expand Down Expand Up @@ -979,6 +987,9 @@ public void event(KubevirtNodeEvent event) {
if (!isRelevantHelper()) {
return;
}
if (event.subject() == null) {
return;
}
bootstrapNode(event.subject());
});
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
Expand Down Expand Up @@ -203,18 +204,29 @@ private void processModification(Node node) {
log.trace("Process node {} updating event from API server.",
node.getMetadata().getName());

KubevirtNode original = buildKubevirtNode(node);
KubevirtNode existing = kubevirtNodeAdminService.node(node.getMetadata().getName());

if (existing != null) {
KubevirtNode kubevirtNode = buildKubevirtNode(node);
// if a master node is annotated as a gateway node, we simply add
// the node into the cluster
if (original.type() == GATEWAY && existing == null) {
kubevirtNodeAdminService.createNode(original);
}

// if a gateway annotation removed from the master node, we simply remove
// the node from the cluster
if (original.type() == MASTER && existing != null && existing.type() == GATEWAY) {
kubevirtNodeAdminService.removeNode(original.hostname());
}

if (existing != null) {
// we update the kubevirt node and re-run bootstrapping,
// only if the updated node has different phyInts and data IP
// if the updated node has different phyInts and data IP
// this means we assume that the node's hostname, type and mgmt IP
// are immutable
if (!kubevirtNode.phyIntfs().equals(existing.phyIntfs()) ||
!kubevirtNode.dataIp().equals(existing.dataIp())) {
kubevirtNodeAdminService.updateNode(kubevirtNode.updateState(INIT));
if (!original.phyIntfs().equals(existing.phyIntfs()) ||
!original.dataIp().equals(existing.dataIp())) {
kubevirtNodeAdminService.updateNode(original.updateState(INIT));
}
}
}
Expand Down

0 comments on commit 517597a

Please sign in to comment.