Redo replication doco for Queue Enterprise
JerryShea committed Mar 16, 2023
1 parent d2cde07 commit 4dbe181
Showing 3 changed files with 21 additions and 218 deletions.
15 changes: 2 additions & 13 deletions ReadMe.adoc
@@ -64,19 +64,8 @@ best use of the product, and can also deliver
projects using a mix of our resources and
your own.

=== Replication Environment Example
The following diagram shows an example of Chronicle Map replication over three servers (or sites).
Chronicle Map Replication is part of Chronicle Map (Enterprise Edition); a commercially supported
version of our successful open source Chronicle Map.

.Three-Way Replication
image::docs\images\Image2_.png[750,650]

Replication is multi-master, lock-free, redundant, deterministic, and eventually consistent.

The writer can optionally wait for replication to occur across nodes or regions.

NOTE: See <<docs/CM_Replication.adoc#,Chronicle Map Replication>> for more information.
=== Replication
See <<docs/CM_Replication.adoc#,Chronicle Map Replication>> for more information.

'''
== Documentation
12 changes: 5 additions & 7 deletions docs/CM_Features.adoc
@@ -18,16 +18,14 @@ applications, such as trading and financial market applications.
- **High concurrency**: Write queries scale well up to the number of hardware execution threads in
the server. Read queries never block each other.
- **Persistence to disk** - (Optional)
- **Multi-master replication** - (Optional, commercial functionality) - Eventually-consistent, fully-redundant, asynchronous replication across
servers; "last write wins" strategy by default, with support for a custom https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type[state-based CRDT] strategy.
- **Replication** - (Optional, commercial functionality) - replication from one to N other servers across LAN/WAN

== Unique features
- Multiple processes can access a Chronicle Map concurrently. At the same time,
the data store is *in-process* for each of the accessing processes. Out-of-process approach to IPC
is simply incompatible with Chronicle Map's median latency target of < 1 μs.

- Replication *without logs*, with constant footprint cost, guarantees progress even if the network
doesn't sustain write rates.
- Replication

NOTE: See <<CM_Replication.adoc#,Chronicle Map Replication>> for more information.

@@ -70,7 +68,7 @@ queries*.
- **Durability** - no; Chronicle Map can be persisted to disk, but with no guarantee as to how frequently this
happens. This is under the control of the OS. All data is guaranteed to be written to disk when the Map is closed.
- **Clustering** and **replication** for Chronicle
Map is provided by https://chronicle.software/products/map[Chronicle Map Enterprise].
Map is provided by https://chronicle.software/queue-enterprise/[Chronicle Queue Enterprise].

== What is the data structure of Chronicle Map?
Simply put, a Chronicle Map data store is a big chunk of shared memory (optionally mapped to disk).
@@ -83,7 +81,7 @@ It is split into independent segments; each segment has:

- a lock in shared memory (implemented via CAS loops) for managing concurrent access.
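
For intuition, a toy sketch of a CAS-loop lock in the spirit of those per-segment locks; this is illustrative only (the `CasSpinLock` name is invented, and the real locks operate on off-heap shared memory so that multiple processes can coordinate):

[source, java]
----
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Toy CAS-loop lock within a single JVM, for illustration only.
class CasSpinLock {
    private static final VarHandle STATE;

    static {
        try {
            STATE = MethodHandles.lookup()
                    .findVarHandle(CasSpinLock.class, "state", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile int state; // 0 = unlocked, 1 = locked

    void lock() {
        // spin until the CAS atomically flips 0 -> 1
        while (!STATE.compareAndSet(this, 0, 1))
            Thread.onSpinWait();
    }

    void unlock() {
        STATE.setVolatile(this, 0);
    }
}
----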

See https://github.com/OpenHFT/Chronicle-Map/blob/master/spec[ Chronicle Map data store design overview] for more information.
See https://github.com/OpenHFT/Chronicle-Map/blob/master/spec[Chronicle Map data store design overview] for more information.

== Chronicle Map is not

@@ -115,7 +113,7 @@ Please contact us at mailto:[email protected][[email protected]] i
|Open source
|Remote calls
|Commercially available
|Eventually-consistent replication (100% redundancy)
|Replication
|Commercially available
|Synchronous replication
|Commercially available
212 changes: 14 additions & 198 deletions docs/CM_Replication.adoc
@@ -9,221 +9,37 @@ Neil Clifford
toc::[]

== Enterprise Edition
Chronicle Map Replication is part of Chronicle Map (Enterprise Edition); a commercially supported version of our successful open source Chronicle Map. Extended features include:
Chronicle Map Replication is part of Chronicle Queue Enterprise, which is commercially supported. Extended features include:

- *Replication* to ensure real-time backup of all your map data.
- *Resilience* support for robust fail-over and disaster recovery environments.

In addition, you will be fully supported by our technical experts.

For more information on Chronicle Map (Enterprise Edition), please contact mailto:sales@chronicle.software[sales@chronicle.software].
For more information see https://chronicle.software/queue-enterprise/[Chronicle Queue Enterprise].

== Replication

Chronicle Map Replication is a multi-master replicated data store. Chronicle Map supports replication over TCP/IP.
//This URL does not exist anymore
//image::http://openhft.net/wp-content/uploads/2014/07/Chronicle-Map-TCP-Replication_simple_02.jpg[TCP/IP Replication]
Chronicle Queue Enterprise supports replication of Chronicle Maps over TCP/IP.

== TCP Background
TCP/IP is a reliable protocol. This means that, unless you have a network failure or hardware outage, the data is guaranteed to arrive. TCP/IP provides point-to-point connectivity. For example, if a message needed to reach 100 hosts, it would have to be sent 100 times.

== TCP/IP Throttling
We are careful not to swamp your network with too much TCP/IP traffic. We do this by providing a throttled version of TCP replication. This works because Chronicle Map only broadcasts the latest update of each entry.
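
As a toy illustration of why broadcasting only the latest update makes throttling safe (the names here are invented, not the actual implementation): repeated writes to a key coalesce into a single dirty-key entry, so traffic is bounded regardless of the local write rate.

[source, java]
----
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Toy sketch: a throttled sender needs only the set of keys that have
// changed since the last batch; intermediate values are never queued.
class CoalescingSender {
    private final Set<String> dirtyKeys = ConcurrentHashMap.newKeySet();

    void onLocalUpdate(String key) {
        dirtyKeys.add(key); // idempotent: N writes coalesce into one entry
    }

    void replicateBatch(Map<String, String> store, BiConsumer<String, String> send) {
        for (String key : dirtyKeys) {
            dirtyKeys.remove(key);
            send.accept(key, store.get(key)); // only the latest value is sent
        }
    }
}
----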
TCP/IP is a reliable protocol. This means that, unless you have a network failure or hardware outage, the data is guaranteed to arrive.

== How Chronicle Map Replication works
Chronicle Map provides multi-master hash-map replication. This means that each remote map mirrors its changes over to other remote maps. No map is considered to be the master store of data. Each map uses timestamps to reconcile changes.

We refer to an instance of a remote map as a **node**. Each node can be connected to up to 128 other nodes.

The data that is stored locally in each node becomes eventually consistent. So changes made to one node, for example by calling `put()`, will be replicated over to the other nodes at some unspecified time in the future.

To achieve a high level of performance and throughput, a call to `put()` will not block.

With `ConcurrentHashMap`, it is typical to check the return code of some methods to obtain the old value; for example, `remove()`.

Due to the loose coupling and lock-free nature of this multi-master implementation, the return value is only the old value on the node's local data store. In other
words, the nodes are only concurrent locally. Another node, performing exactly the same operation, may return a different value. However, reconciliation will ensure that all the maps
will become eventually consistent.
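
As a toy illustration (plain `HashMap` instances stand in for two nodes whose stores have temporarily diverged; this is not the Chronicle Map API):

[source, java]
----
import java.util.HashMap;
import java.util.Map;

public class LocalReturnValueDemo {
    public static void main(String[] args) {
        Map<String, String> node1 = new HashMap<>();
        Map<String, String> node2 = new HashMap<>();
        node1.put("EUR/USD", "1.16");
        node2.put("EUR/USD", "1.17"); // not yet reconciled

        // The same put() performed on each node returns a *different*
        // old value: the return value reflects only the local store.
        System.out.println(node1.put("EUR/USD", "1.18")); // 1.16
        System.out.println(node2.put("EUR/USD", "1.18")); // 1.17
    }
}
----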

== Reconciliation
If two or more nodes receive a change to their maps for the same key, but with different values (say, by a user of the maps calling `put(key, value)`), then initially each node will update its local store, and each local store will hold a different value.

The aim of multi-master replication is
to provide eventual consistency across the nodes. Using multi-master replication, whenever a node is changed, it will notify the other nodes of its change; we refer to this notification as an event.

The event will hold a timestamp indicating the time the change occurred. It will also hold the state transition; in this case it was a `put` with a key and value.

Eventual consistency is achieved by looking at the timestamp from the remote node. If, for a given key, the remote node's timestamp is newer than the local node's timestamp, then the event from the remote node will be applied to the local node; otherwise, the event will be ignored.

Since none of the nodes is a primary, each node holds information about the other nodes. For a specific node, its own identifier is referred to as the 'localIdentifier'. The identifiers of other nodes are the 'remoteIdentifiers'.

On an update, or insert, of a key/value, the node pushes the information about the change to the remote nodes. The nodes use non-blocking Java NIO I/O, and all replication is done on a single thread.

However, there is a specific edge case. If two nodes update their map at precisely the same time with different values, we have to deterministically resolve which update wins. This is because eventual
consistency mandates that both nodes should end up holding the same data locally.

Although it is rare that two remote
nodes receive an update to their maps at exactly the same time, for the same key, we have to handle this edge case. We cannot therefore rely on timestamps alone to reconcile
the updates.

Typically, the update with the newest timestamp should win, but in this example both timestamps are the same, and the decision made to one node should be identical to the decision made to the other. This dilemma is resolved by using a node identifier. The node identifier is a unique
'byte' value that is assigned to each node. When the timestamps are the same, the remote node with the smaller identifier will be preferred.
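
A minimal, self-contained sketch of this decision rule (illustrative only; the names are invented and this is not Chronicle Map's internal code):

[source, java]
----
// The newer timestamp wins; on an exact tie, the event from the node
// with the smaller identifier is preferred, so both nodes make the
// same deterministic choice.
public final class Reconciliation {

    public static boolean remoteWins(long localTimestamp, byte localIdentifier,
                                     long remoteTimestamp, byte remoteIdentifier) {
        if (remoteTimestamp != localTimestamp)
            return remoteTimestamp > localTimestamp;
        return remoteIdentifier < localIdentifier;
    }

    public static void main(String[] args) {
        // Same timestamp: the update from node 1 beats the one from node 2.
        System.out.println(remoteWins(1000L, (byte) 2, 1000L, (byte) 1)); // true
        System.out.println(remoteWins(1000L, (byte) 1, 1000L, (byte) 2)); // false
    }
}
----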

== Multiple Processes on the same server with Replication

On a single server, if a number of Java processes each create an instance of a Chronicle Map that binds to the same underlying file, then those processes exchange data via shared memory, rather than by TCP or UDP replication.

If an instance of Chronicle Map, which is not performing TCP Replication, is updated, then this update can be picked up by another instance of Chronicle Map. This other Chronicle Map instance could be TCP replicated. In such an example, the TCP replicated Chronicle Map instance would then push the update to the remote nodes.

Likewise, if the TCP replicated Chronicle Map instance received an update from a remote node, then this update would be immediately available to all the instances of Chronicle Map on the server.
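
A sketch of the same-server case using the open-source API (the file path and sizing values are illustrative). Two JVMs that call `createPersistedTo` with the same file share data via memory-mapping:

[source, java]
----
import java.io.File;
import java.io.IOException;

import net.openhft.chronicle.map.ChronicleMap;

// Run this in two JVMs on the same server: both processes map the same
// file, so an update made by one is visible to the other via shared
// memory, without TCP or UDP replication being involved.
public class SharedMapDemo {
    public static void main(String[] args) throws IOException {
        File file = new File("/tmp/shared-map.dat"); // illustrative path
        try (ChronicleMap<Integer, CharSequence> map = ChronicleMap
                .of(Integer.class, CharSequence.class)
                .averageValueSize(64)
                .entries(10_000)
                .createPersistedTo(file)) {
            map.put(1, "hello from pid " + ProcessHandle.current().pid());
            System.out.println(map.get(1));
        }
    }
}
----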

== Identifier for Replication
If you are only replicating your Chronicle Map instances on the same server, then you do not have to set up TCP and UDP replication. You also do not have to set the identifiers; as the identifiers are only used for the resolution of conflicts amongst remote servers.

If however, you wish to replicate data between two or more servers, then all of the Chronicle Map instances, including those not actively participating in TCP or UDP replication, must have their identifiers set.

The identifier must be unique to each server. Each Chronicle Map on the same server must have
the same identifier. The reason that all Chronicle Map instances must have the identifier set is that
the memory is laid out slightly differently when using replication; so even if a map is not actively performing TCP or UDP replication itself, if it wishes to replicate with one that is, it must have its memory laid out in the same way to be compatible.

If the identifiers are not set up uniquely, then updates will be ignored. For example,
a Chronicle Map instance that is set up with an identifier equal to '1' will ignore all events which contain a remote identifier of '1'. In other words, Chronicle Map replication ignores updates which have originated from itself. This avoids circular conditions involving events.

When setting up the identifier you can use values from `1` to `127`.

The identifier is set up on the builder as follows:

```java
TcpTransportAndNetworkConfig tcpConfig = ...
map = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(identifier, tcpConfig)
    .create();
```

== Configuration

Configuration of map nodes is done either programmatically, or through YAML configuration files.

The following example uses a basic `yaml` configuration file to define clustered replication for the map named `fx`:

[source, yaml]
....
!MapReplicationCfg {
  cluster: {
    host1: {
      hostId: 1,
      connectUri: hostport1,
    },
    host2: {
      hostId: 2,
      connectUri: hostport2,
    },
    host3: {
      hostId: 3,
      connectUri: hostport3,
    }
  },
  maps: {
    fx: {
      entries: 10000,
      keyClass: !type String,
      valueClass: !type software.chronicle.enterprise.map.ValueObject,
      averageKeySize: 64,
      averageValueSize: 128,
      mapFileDataDirectory: data/$hostId/,
      mapLogDirectory: logs/$hostId/,
      enableReplicationLogging: true
    }
  }
}
....

Below is an example that uses this configuration file to start up the cluster and insert entries into different maps, verifying that
all maps are eventually in sync:

[source, java]
....
try (ReplicatedMap clusterOnHost1 = createCluster(CLUSTER_YAML, 1);
     ReplicatedMap clusterOnHost3 = createCluster(CLUSTER_YAML, 3);
     ReplicatedMap clusterOnHost2 = createCluster(CLUSTER_YAML, 2)) {

    final ChronicleMap<String, ValueObject> mapOnHost1 = clusterOnHost1.getReplicatedMap("fx");
    final ChronicleMap<String, ValueObject> mapOnHost2 = clusterOnHost2.getReplicatedMap("fx");
    final ChronicleMap<String, ValueObject> mapOnHost3 = clusterOnHost3.getReplicatedMap("fx");

    mapOnHost1.put("USD/GBP", new ValueObject("BATS", System.currentTimeMillis(), 0.767957));
    mapOnHost2.put("GBP/USD", new ValueObject("BATS", System.currentTimeMillis(), 1.30216));
    mapOnHost3.put("EUR/USD", new ValueObject("LXN", System.currentTimeMillis(), 1.16337));

    Jvm.pause(500L);

    printMap("one", mapOnHost1);
    printMap("two", mapOnHost2);
    printMap("three", mapOnHost3);
}
....

This example is available in the Chronicle Map Enterprise Demo; for access, contact [email protected]

=== Implementing Custom Serializers

In Chronicle Map, custom serializers can be applied to `ChronicleMapBuilder` using the `valueMarshaller()` method.
For example:

[source, java]
----
ChronicleMapBuilder<Integer, MyDto> builder = ChronicleMapBuilder.of(Integer.class, MyDto.class)
        .entries(10)
        .averageValueSize(128)
        .actualSegments(1)
        .valueMarshaller(new MarshallableReaderWriter<>(MyDto.class));
----

However, `ReplicatedMapCfg` is not able to express `valueMarshaller()` directly; it is possible to do this in a subclass, for example:

[source, java]
----
public class MarshallableReplicatedMapCfg<K, V extends Marshallable> extends ReplicatedMapCfg<K, V> {

    @Override
    public ChronicleMapBuilder<K, V> mapBuilder(byte localHostId) {
        ChronicleMapBuilder<K, V> builder = super.mapBuilder(localHostId);
        builder.valueMarshaller(new MarshallableReaderWriter<V>(valueClass()));
        return builder;
    }
}
----

This class should then be registered as an alias in the Java code, and finally specified explicitly in the YAML file:

[source, java]
----
ClassAliasPool.CLASS_ALIASES.addAlias(MarshallableReplicatedMapCfg.class);
----

[source, yaml]
----
valueclass.map: !MarshallableReplicatedMapCfg {
...
}
----
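
For completeness, a sketch of what the value type might look like (an assumption for illustration; `SelfDescribingMarshallable` from Chronicle Wire provides a field-driven `Marshallable` implementation):

[source, java]
----
import net.openhft.chronicle.wire.SelfDescribingMarshallable;

// Hypothetical value type for the examples above: extending
// SelfDescribingMarshallable makes MyDto Marshallable, so it can be
// read and written by MarshallableReaderWriter.
public class MyDto extends SelfDescribingMarshallable {
    private String symbol;
    private long timestamp;
    private double price;
}
----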

== Replication event logging

Chronicle Map Enterprise can be configured to log all replication events to a Chronicle Queue for auditing purposes.
An underlying Chronicle Queue is used to persist changes to the Map, and this is replicated using Chronicle Queue Enterprise
replication to one or more other Chronicle Maps. One underlying queue can be shared by multiple Maps, if required.
Replication never blocks Map gets or puts.

Currently, a map can be configured to log all *outgoing* events that it sends to remote peers.
Maps are configured either as a source or as a sink, with the source acting as the origin of changes, and the sinks following.
Replication occurs from source to sink; a slow consumer, or a dead sink, will not impact replication to other sinks or
the source Map's performance.

If a node goes offline, then once it rejoins the cluster it will handshake and start synchronising itself with the cluster.
The application can be notified when this process completes and the sink is up to date and replicating "live".
A new node can easily be created by connecting to the cluster; the new sink Map will then become hydrated from the cluster.

The source can be changed. If, for example, the master host or data centre changes, it typically takes only milliseconds
to reconfigure the cluster with a new source and restart replication.

The example below shows the message flow for a map with a single remote peer receiving replication events:

[source, yaml]
....
.. header omitted

== encoded replication update sent to remote peer
targetHostId: 2
replicatedEntry: !!binary AYChq5LqwKXqFAFOEwAAAAAAAAD/////////fw==
....

'''
<<CM_Features.adoc#,Back to Features>>
