Skip to content

Commit

Permalink
Merge "[518434] Fixes index listener notification order for fall-thro…
Browse files Browse the repository at this point in the history
…ugh" into 1.6-maintenance
  • Loading branch information
bergmanngabor authored and Gerrit Code Review @ Eclipse.org committed Jul 24, 2017
2 parents a71e103 + f55d4c9 commit 2cc7814
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.matchers.tuple.TupleMask;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
import org.eclipse.viatra.query.runtime.rete.network.Direction;
import org.eclipse.viatra.query.runtime.rete.network.Node;
import org.eclipse.viatra.query.runtime.rete.network.ReteContainer;
Expand All @@ -28,13 +30,17 @@
* node). Do not attach parents directly!
*
* @author Gabor Bergmann
* @noimplement Rely on the provided implementations
* @noreference Use only via standard Node and Indexer interfaces
* @noinstantiate This class is not intended to be instantiated by clients.
*/
public abstract class IdentityIndexer extends SpecializedProjectionIndexer {

protected abstract Collection<Tuple> getTuples();

public IdentityIndexer(ReteContainer reteContainer, int tupleWidth, Supplier parent, Node activeNode) {
super(reteContainer, TupleMask.identity(tupleWidth), parent, activeNode);
public IdentityIndexer(ReteContainer reteContainer, int tupleWidth, Supplier parent,
Node activeNode, List<ListenerSubscription> sharedSubscriptionList) {
super(reteContainer, TupleMask.identity(tupleWidth), parent, activeNode, sharedSubscriptionList);
}

public Collection<Tuple> get(Tuple signature) {
Expand All @@ -60,8 +66,9 @@ public Iterator<Tuple> iterator() {
return getTuples().iterator();
}

public void propagate(Direction direction, Tuple updateElement) {
propagate(direction, updateElement, updateElement, true);
@Override
public void propagateToListener(IndexerListener listener, Direction direction, Tuple updateElement) {
listener.notifyIndexerUpdate(direction, updateElement, updateElement, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
package org.eclipse.viatra.query.runtime.rete.index;

import java.util.Collection;
import java.util.List;

import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
import org.eclipse.viatra.query.runtime.rete.network.Receiver;
import org.eclipse.viatra.query.runtime.rete.network.ReteContainer;
import org.eclipse.viatra.query.runtime.rete.network.Supplier;
Expand All @@ -23,6 +25,9 @@
* space. Can only exist in connection with a memory, and must be operated by another node. Do not attach parents
* directly!
*
* @noimplement Rely on the provided implementations
* @noreference Use only via standard Node and Indexer interfaces
* @noinstantiate This class is not intended to be instantiated by clients.
* @author Gabor Bergmann
*/

Expand All @@ -38,9 +43,9 @@ public class MemoryIdentityIndexer extends IdentityIndexer {
* @param parent
* the parent node that owns the memory
*/
public MemoryIdentityIndexer(ReteContainer reteContainer, int tupleWidth, Collection<Tuple> memory,
Supplier parent, Receiver activeNode) {
super(reteContainer, tupleWidth, parent, activeNode);
public MemoryIdentityIndexer(ReteContainer reteContainer, int tupleWidth, Collection<Tuple> memory, Supplier parent,
Receiver activeNode, List<ListenerSubscription> sharedSubscriptionList) {
super(reteContainer, tupleWidth, parent, activeNode, sharedSubscriptionList);
this.memory = memory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
package org.eclipse.viatra.query.runtime.rete.index;

import java.util.Collection;
import java.util.List;

import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
import org.eclipse.viatra.query.runtime.rete.network.Receiver;
import org.eclipse.viatra.query.runtime.rete.network.ReteContainer;
import org.eclipse.viatra.query.runtime.rete.network.Supplier;
Expand All @@ -24,6 +26,9 @@
* directly!
*
* @author Gabor Bergmann
* @noimplement Rely on the provided implementations
* @noreference Use only via standard Node and Indexer interfaces
* @noinstantiate This class is not intended to be instantiated by clients.
*/
public class MemoryNullIndexer extends NullIndexer {

Expand All @@ -39,8 +44,8 @@ public class MemoryNullIndexer extends NullIndexer {
* the parent node that owns the memory
*/
public MemoryNullIndexer(ReteContainer reteContainer, int tupleWidth, Collection<Tuple> memory, Supplier parent,
Receiver activeNode) {
super(reteContainer, tupleWidth, parent, activeNode);
Receiver activeNode, List<ListenerSubscription> sharedSubscriptionList) {
super(reteContainer, tupleWidth, parent, activeNode, sharedSubscriptionList);
this.memory = memory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.eclipse.viatra.query.runtime.matchers.tuple.FlatTuple;
import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.matchers.tuple.TupleMask;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
import org.eclipse.viatra.query.runtime.rete.network.Direction;
import org.eclipse.viatra.query.runtime.rete.network.Node;
import org.eclipse.viatra.query.runtime.rete.network.ReteContainer;
Expand All @@ -29,6 +31,9 @@
* active node). Do not attach parents directly!
*
* @author Gabor Bergmann
* @noimplement Rely on the provided implementations
* @noreference Use only via standard Node and Indexer interfaces
* @noinstantiate This class is not intended to be instantiated by clients.
*/
public abstract class NullIndexer extends SpecializedProjectionIndexer {

Expand All @@ -39,8 +44,9 @@ public abstract class NullIndexer extends SpecializedProjectionIndexer {
static Collection<Tuple> nullSingleton = Collections.singleton(nullSignature);
static Collection<Tuple> emptySet = Collections.emptySet();

public NullIndexer(ReteContainer reteContainer, int tupleWidth, Supplier parent, Node activeNode) {
super(reteContainer, TupleMask.linear(0, tupleWidth), parent, activeNode);
public NullIndexer(ReteContainer reteContainer, int tupleWidth, Supplier parent,
Node activeNode, List<ListenerSubscription> sharedSubscriptionList) {
super(reteContainer, TupleMask.linear(0, tupleWidth), parent, activeNode, sharedSubscriptionList);
}

public Collection<Tuple> get(Tuple signature) {
Expand Down Expand Up @@ -69,10 +75,11 @@ public Iterator<Tuple> iterator() {
return getTuples().iterator();
}

public void propagate(Direction direction, Tuple updateElement) {
@Override
public void propagateToListener(IndexerListener listener, Direction direction, Tuple updateElement) {
boolean radical = (direction == Direction.REVOKE && isEmpty())
|| (direction == Direction.INSERT && isSingleElement());
propagate(direction, updateElement, nullSignature, radical);
listener.notifyIndexerUpdate(direction, updateElement, nullSignature, radical);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,119 @@

package org.eclipse.viatra.query.runtime.rete.index;

import java.util.List;
import java.util.Objects;

import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.matchers.tuple.TupleMask;
import org.eclipse.viatra.query.runtime.rete.network.Direction;
import org.eclipse.viatra.query.runtime.rete.network.Node;
import org.eclipse.viatra.query.runtime.rete.network.ReteContainer;
import org.eclipse.viatra.query.runtime.rete.network.Supplier;

/**
* A specialized projection indexer that can be memory-less (relying on an external source of information).
*
* @author Gabor Bergmann
* <p> All specialized projection indexers of a single node will share the same listener list, so
* that notification order is maintained (see Bug 518434).
*
* @author Gabor Bergmann
* @noimplement Rely on the provided implementations
* @noreference Use only via standard Node and Indexer interfaces
* @noinstantiate This class is not intended to be instantiated by clients.
*/
public abstract class SpecializedProjectionIndexer extends StandardIndexer implements ProjectionIndexer {

protected Node activeNode;
protected List<ListenerSubscription> sharedSubscriptionList;

public SpecializedProjectionIndexer(ReteContainer reteContainer, TupleMask mask, Supplier parent, Node activeNode) {
/**
* @since 1.7
*/
public SpecializedProjectionIndexer(ReteContainer reteContainer, TupleMask mask, Supplier parent,
Node activeNode, List<ListenerSubscription> sharedSubscriptionList) {
super(reteContainer, mask);
this.parent = parent;
this.activeNode = activeNode;
this.sharedSubscriptionList = sharedSubscriptionList;
}

@Override
public Node getActiveNode() {
return activeNode;
}

@Override
protected void propagate(Direction direction, Tuple updateElement, Tuple signature, boolean change) {
throw new UnsupportedOperationException();
}

@Override
public void attachListener(IndexerListener listener) {
super.attachListener(listener);
ListenerSubscription subscription = new ListenerSubscription(this, listener);
// See Bug 518434
// Must add to the first position, so that the later listeners are notified earlier.
// Thus if the beta node added as listener is also an indirect descendant of the same indexer on its opposite slot,
// then the beta node is connected later than its ancestor's listener, therefore it will be notified earlier,
// eliminating duplicate insertions and lost deletions that would result from fall-through update propagation
sharedSubscriptionList.add(0, subscription);
}
@Override
public void detachListener(IndexerListener listener) {
super.detachListener(listener);
ListenerSubscription subscription = new ListenerSubscription(this, listener);
sharedSubscriptionList.remove(subscription);
}

/**
* @since 1.7
*/
public abstract void propagateToListener(IndexerListener listener, Direction direction, Tuple updateElement);

/**
* Infrastructure to share subscriptions between specialized indexers of the same parent node.
*
* @author Gabor Bergmann
* @since 1.7
*/
public static class ListenerSubscription {
protected SpecializedProjectionIndexer indexer;
protected IndexerListener listener;

public ListenerSubscription(SpecializedProjectionIndexer indexer, IndexerListener listener) {
super();
this.indexer = indexer;
this.listener = listener;
}

/**
* Call this from parent node.
*/
public void propagate(Direction direction, Tuple updateElement) {
indexer.propagateToListener(listener, direction, updateElement);
}

@Override
public int hashCode() {
return Objects.hash(indexer, listener);
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ListenerSubscription other = (ListenerSubscription) obj;
return Objects.equals(listener, other.listener) &&
Objects.equals(indexer, other.indexer);
}


}


}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ public Supplier getParent() {
}

public void attachListener(IndexerListener listener) {
listeners.add(listener);
// See Bug 518434
// Must add to the first position, so that the later listeners are notified earlier.
// Thus if the beta node added as listener is also an indirect descendant of the same indexer on its opposite slot,
// then the beta node is connected later than its ancestor's listener, therefore it will be notified earlier,
// eliminating duplicate insertions and lost deletions that would result from fall-through update propagation
listeners.add(0, listener);
reteContainer.getTracker().registerDependency(this, listener.getOwner());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.eclipse.viatra.query.runtime.matchers.context.IPosetComparator;
import org.eclipse.viatra.query.runtime.matchers.tuple.Tuple;
import org.eclipse.viatra.query.runtime.matchers.tuple.TupleMask;
import org.eclipse.viatra.query.runtime.rete.index.MemoryIdentityIndexer;
import org.eclipse.viatra.query.runtime.rete.index.MemoryNullIndexer;
import org.eclipse.viatra.query.runtime.rete.index.ProjectionIndexer;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer;
import org.eclipse.viatra.query.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
import org.eclipse.viatra.query.runtime.rete.network.DefaultMailbox;
import org.eclipse.viatra.query.runtime.rete.network.Direction;
import org.eclipse.viatra.query.runtime.rete.network.Mailbox;
Expand Down Expand Up @@ -69,6 +72,11 @@ public class UniquenessEnforcerNode extends StandardNode
protected final Mailbox mailbox;
private final TupleMask nullMask;
private final TupleMask identityMask;

/**
* @since 1.7
*/
protected final List<ListenerSubscription> specializedListeners;

public UniquenessEnforcerNode(ReteContainer reteContainer, int tupleWidth) {
this(reteContainer, tupleWidth, false);
Expand Down Expand Up @@ -104,6 +112,7 @@ public UniquenessEnforcerNode(ReteContainer reteContainer, int tupleWidth, boole
TupleMask coreMask, TupleMask posetMask, IPosetComparator posetComparator) {
super(reteContainer);
this.parents = new ArrayList<Supplier>();
this.specializedListeners = new ArrayList<ListenerSubscription>();
this.memory = new TupleMemory();
this.rederivableMemory = new TupleMemory();
this.tupleWidth = tupleWidth;
Expand Down Expand Up @@ -280,14 +289,16 @@ public void rederiveOne() {
* @since 1.6
*/
protected void propagate(Direction direction, Tuple update) {
propagateUpdate(direction, update);

if (memoryIdentityIndexer != null) {
memoryIdentityIndexer.propagate(direction, update);
}
if (memoryNullIndexer != null) {
memoryNullIndexer.propagate(direction, update);
// See Bug 518434
// trivial (non-active) indexers must be updated before other listeners
// so that if they are joined against each other, trivial indexers lookups
// will be consistent with their notifications;
// also, their subscriptions must share a single order
for (ListenerSubscription subscription : specializedListeners) {
subscription.propagate(direction, update);
}

propagateUpdate(direction, update);
}

@Override
Expand Down Expand Up @@ -316,15 +327,15 @@ public void pullInto(Collection<Tuple> collector) {

public MemoryNullIndexer getNullIndexer() {
if (memoryNullIndexer == null) {
memoryNullIndexer = new MemoryNullIndexer(reteContainer, tupleWidth, memory, this, this);
memoryNullIndexer = new MemoryNullIndexer(reteContainer, tupleWidth, memory, this, this, specializedListeners);
reteContainer.getTracker().registerDependency(this, memoryNullIndexer);
}
return memoryNullIndexer;
}

public MemoryIdentityIndexer getIdentityIndexer() {
if (memoryIdentityIndexer == null) {
memoryIdentityIndexer = new MemoryIdentityIndexer(reteContainer, tupleWidth, memory, this, this);
memoryIdentityIndexer = new MemoryIdentityIndexer(reteContainer, tupleWidth, memory, this, this, specializedListeners);
reteContainer.getTracker().registerDependency(this, memoryIdentityIndexer);
}
return memoryIdentityIndexer;
Expand Down

0 comments on commit 2cc7814

Please sign in to comment.