Skip to content

Commit

Permalink
Merge pull request #5 from ruiwang-linkedin/master
Browse files Browse the repository at this point in the history
Added capability to NetworkClient (PartitionedNetworkClient), LoadBalancers and its Java Peers
  • Loading branch information
jhartman committed Aug 8, 2012
2 parents cd83d02 + 74f9a33 commit 76780e3
Show file tree
Hide file tree
Showing 42 changed files with 1,629 additions and 244 deletions.
787 changes: 781 additions & 6 deletions cluster/src/main/java/com/linkedin/norbert/protos/NorbertProtos.java

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions cluster/src/main/protobuf/norbert.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ message NorbertMessage {
required string message_name = 11;
optional bytes message = 12;
optional string error_message = 13;

message Header {
required string key = 1;
optional string value = 2;
}
repeated Header header = 14;
}

message Node {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,12 @@ final case class Node(id: Int, url: String, available: Boolean, partitionIds: Se
}

override def toString = "Node(%d,%s,[%s],%b,0x%08X)".format(id, url, partitionIds.mkString(","), available, if (capability.isEmpty) 0L else capability.get)

def isCapableOf(c: Option[Long]) : Boolean = {
(capability, c) match {
case (Some(nc), Some(rc)) => (nc & rc) == rc
case (None, Some(rc)) => rc == 0L
case _ => true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public Node nextNode()
{
return endpoints.iterator().next().getNode();
}

@java.lang.Override
public Node nextNode(Long capability)
{
return endpoints.iterator().next().getNode();
}
};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ object NorbertNetworkClientMain {
case Some(n) =>
val future = nc.sendRequestToNode(Ping(System.currentTimeMillis), n)
try {
val response = future.get(500, TimeUnit.MILLISECONDS).asInstanceOf[NorbertExampleProtos.PingResponse]
println("Ping took %dms".format(System.currentTimeMillis - response.getTimestamp))
val response = future.get(500, TimeUnit.MILLISECONDS).asInstanceOf[Pong]
println("Ping took %dms".format(System.currentTimeMillis - response.timestamp))
} catch {
case ex: TimeoutException => println("Ping timed out")
case ex: ExecutionException => println("Error: %s".format(ex.getCause))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
package com.linkedin.norbert
package network

import netty.NetworkServerConfig

import netty.{RequestContext => NettyRequestContext, NetworkServerConfig}
import org.jboss.netty.logging.{InternalLoggerFactory, Log4JLoggerFactory}
import com.google.protobuf.Message
import server.NetworkServer
import protos.NorbertExampleProtos
import cluster.ClusterClient
import norbertutils._
import server.{RequestContext, NetworkServer}
import network.NorbertNetworkServerMain.LogFilter
import protos.NorbertProtos.NorbertMessage


object NorbertNetworkServerMain {
InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory)
Expand All @@ -38,6 +43,7 @@ object NorbertNetworkServerMain {
val ns = NetworkServer(config)

ns.registerHandler(pingHandler)
ns.addFilters(List(new LogFilter))

ns.bind(args(2).toInt)

Expand All @@ -48,8 +54,31 @@ object NorbertNetworkServerMain {
})
}

private def pingHandler(ping: Ping): Ping = {
private def pingHandler(ping: Ping): Pong = {
println("Requested ping from client %d milliseconds ago (assuming synchronized clocks)".format(ping.timestamp - System.currentTimeMillis) )
Ping(System.currentTimeMillis)
Pong(System.currentTimeMillis)
}

class LogFilter extends netty.Filter {
val clock = SystemClock
def onRequest(request: Any, context: RequestContext)
{ context.attributes += ("START_TIMER" -> clock.getCurrentTime) }

def onResponse(response: Any, context: RequestContext)
{ val start: Long = context.attributes.getOrElse("START_TIMER", -1).asInstanceOf[Long]
println("server side time logging: " + (clock.getCurrentTime - start) + " ms.")
}

def onMessage(message: NorbertMessage, context: RequestContext) =
{ context.attributes += ("PRE_SERIALIZATION" -> clock.getCurrentTime) }

def postMessage(message: NorbertMessage, context: RequestContext) =
{
val start: Long = context.attributes.getOrElse("PRE_SERIALIZATION", -1).asInstanceOf[Long]
println("server side time logging including serialization: " + (clock.getCurrentTime - start) + " ms.")
}

def onError(error: Exception, context: RequestContext)
{}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.linkedin.norbert.network
import com.linkedin.norbert.protos.NorbertExampleProtos

object Ping {
implicit case object PingSerializer extends Serializer[Ping, Ping] {
implicit case object PingSerializer extends Serializer[Ping, Pong] {
def requestName = "ping"
def responseName = "pong"

Expand All @@ -29,12 +29,13 @@ object Ping {
Ping(NorbertExampleProtos.Ping.newBuilder.mergeFrom(bytes).build.getTimestamp)
}

def responseToBytes(message: Ping) =
requestToBytes(message)
def responseToBytes(message: Pong) =
NorbertExampleProtos.PingResponse.newBuilder.setTimestamp(message.timestamp).build.toByteArray

def responseFromBytes(bytes: Array[Byte]) =
requestFromBytes(bytes)
Pong(NorbertExampleProtos.PingResponse.newBuilder.mergeFrom(bytes).build.getTimestamp)
}
}

case class Ping(timestamp: Long)
case class Pong(timestamp: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public interface Node {
String getUrl();
Set<Integer> getPartitionIds();
boolean isAvailable();
boolean isCapableOf(Long c);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ object JavaNode {
if (node.partitionIds != null) {
node.partitionIds.foreach {id => s.add(id)}
}
JavaNode(node.id, node.url, node.available, s)
JavaNode(node.id, node.url, node.available, s, node.capability)
}
}
}

case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer]) extends Node {
case class JavaNode(@BeanProperty id: Int, @BeanProperty url: String, @BeanProperty available: Boolean, @BeanProperty partitionIds: java.util.Set[java.lang.Integer], capability: Option[Long]) extends Node {
def isAvailable = available
def isCapableOf(c: java.lang.Long) : Boolean =
capability match {
case Some(nc) => (nc & c.longValue) == c.longValue
case None => c.longValue == 0L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,15 @@ public class ConsistentHashPartitionedLoadBalancer<PartitionedId> implements Par

@Override
public Node nextNode(PartitionedId partitionedId)
{
return nextNode(partitionedId, 0L);
}

@Override
public Node nextNode(PartitionedId partitionedId, Long capability)
{
if(_fallThrough != null)
return _fallThrough.nextNode(partitionedId);
return _fallThrough.nextNode(partitionedId, capability);

// TODO: How do we choose which node to return if we don't want to throw Exception?
throw new UnsupportedOperationException();
Expand All @@ -113,18 +119,33 @@ public Set<Node> nodesForPartitionedId(PartitionedId partitionedId)
throw new UnsupportedOperationException();
}

@Override
public Set<Node> nodesForPartitionedId(PartitionedId partitionedId, Long capability)
{
if (_fallThrough != null)
return _fallThrough.nodesForPartitionedId(partitionedId, capability);

throw new UnsupportedOperationException();
}

@Override
public Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId partitionedId)
{
return nodesForOneReplica(partitionedId, 0L);
}

@Override
public Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId partitionedId, Long capability)
{
Map<Endpoint, Set<Integer>> replica = lookup(_routingMap, _hashFunction.hash(partitionedId.toString()));
Map<Node, Set<Integer>> results = new HashMap<Node, Set<Integer>>();

Set<Integer> unsatisfiedPartitions = new HashSet<Integer>();

// Attempt to filter out results that are not available
for(Map.Entry<Endpoint, Set<Integer>> entry : replica.entrySet())
{
if(entry.getKey().canServeRequests())
if(entry.getKey().canServeRequests() && entry.getKey().getNode().isCapableOf(capability))
{
results.put(entry.getKey().getNode(), entry.getValue());
}
Expand All @@ -133,15 +154,15 @@ public Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId partitionedId)
unsatisfiedPartitions.addAll(entry.getValue());
}
}


if(unsatisfiedPartitions.size() > 0)
{
Map<Node, Set<Integer>> resolved = _fallThrough.nodesForPartitions(partitionedId, unsatisfiedPartitions);
for(Map.Entry<Node, Set<Integer>> entry : resolved.entrySet())
Map<Node, Set<Integer>> resolved = _fallThrough.nodesForPartitions(partitionedId, unsatisfiedPartitions, capability);
for(Map.Entry<Node, Set<Integer>> entry : resolved.entrySet())
{
Set<Integer> partitions = results.get(entry.getKey());
if(partitions != null)
if(partitions != null)
{
partitions.addAll(entry.getValue());
}
Expand All @@ -157,8 +178,14 @@ public Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId partitionedId)

@Override
public Map<Node, Set<Integer>> nodesForPartitions(PartitionedId partitionedId, Set<Integer> partitions) {
Map<Node, Set<Integer>> entireReplica = nodesForOneReplica(partitionedId);

return nodesForPartitions(partitionedId, partitions, 0L);
}


@Override
public Map<Node, Set<Integer>> nodesForPartitions(PartitionedId partitionedId, Set<Integer> partitions, Long capability) {
Map<Node, Set<Integer>> entireReplica = nodesForOneReplica(partitionedId, capability);

Map<Node, Set<Integer>> result = new HashMap<Node, Set<Integer>>();
for(Map.Entry<Node, Set<Integer>> entry : entireReplica.entrySet())
{
Expand All @@ -169,7 +196,7 @@ public Map<Node, Set<Integer>> nodesForPartitions(PartitionedId partitionedId, S
if(partitions.contains(localPartition))
partitionsToUse.add(localPartition);
}

if(!localPartitions.isEmpty())
{
result.put(entry.getKey(), localPartitions);
Expand All @@ -178,6 +205,7 @@ public Map<Node, Set<Integer>> nodesForPartitions(PartitionedId partitionedId, S
return result;
}


private <K, V> V lookup(NavigableMap<K, V> ring, K key)
{
V result = ring.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,12 @@ public interface LoadBalancer {
* @return the <code>Node</code> to route the next request to or null if there are no <code>Node</code>s available
*/
Node nextNode();

/**
* Returns the next <code>Node</code> that fulfill the capability a request should be routed to.
*
* @param capability A Long that representing the minimal capability of the node that's serving the request
* @return the <code>Node</code> to route the next request to or null if there are no <code>Node</code>'s available
*/
Node nextNode(Long capability);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

import java.util.Map;
import java.util.Set;
import java.lang.Long;
import com.linkedin.norbert.javacompat.cluster.Node;

/**
* A <code>PartitionedLoadBalancer</code> handles calculating the next <code>Node</code> a message should be routed to
* based on a PartitionedId.
*
* Each of these API have two versions, one without capability long, one with it. The capability long is a piece of data
* held by Norbert node representing 64 different capability by setting each bit of the Long. If the API takes a capability
* parameter, load balancer will only return nodes satisfying the at least the capability requested.
*/
public interface PartitionedLoadBalancer<PartitionedId> {
/**
Expand All @@ -33,12 +38,28 @@ public interface PartitionedLoadBalancer<PartitionedId> {
*/
Node nextNode(PartitionedId id);

/**
* Returns the next <code>Node</code> a message should be routed to based on the PartitionId provided.
*
* @param id the id to be used to calculate partitioning information.
* @param capability the minimal capability required by client
*
* @return the <code>Node</code> to route the next message to
*/
Node nextNode(PartitionedId id, Long capability);

/**
* Returns all replica nodes for the same partitionedId
* @return the <code>Nodes</code> to multicast the next messages to each replica
*/
Set<Node> nodesForPartitionedId(PartitionedId id);

/**
* Returns all replica nodes for the same partitionedId
* @return the <code>Nodes</code> to multicast the next messages to each replica
*/
Set<Node> nodesForPartitionedId(PartitionedId id, Long capability);

/**
* Returns a set of nodes represents one replica of the cluster, this is used by the PartitionedNetworkClient to handle
* broadcast to one replica
Expand All @@ -47,11 +68,27 @@ public interface PartitionedLoadBalancer<PartitionedId> {
*/
Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId id);

/**
* Returns a set of nodes represents one replica of the cluster, this is used by the PartitionedNetworkClient to handle
* broadcast to one replica
*
* @return the set of <code>Nodes</code> to broadcast the next message to a replica to
*/
Map<Node, Set<Integer>> nodesForOneReplica(PartitionedId id, Long capability);

/**
* Calculates a mapping of nodes to partitions for broadcasting a partitioned request. Optionally uses a partitioned
* id for consistent hashing purposes
*
* @return the <code>Nodes</code> to broadcast the next message to a replica to
*/
Map<Node, Set<Integer>> nodesForPartitions(PartitionedId id, Set<Integer> partitions);

/**
* Calculates a mapping of nodes to partitions for broadcasting a partitioned request. Optionally uses a partitioned
* id for consistent hashing purposes
*
* @return the <code>Nodes</code> to broadcast the next message to a replica to
*/
Map<Node, Set<Integer>> nodesForPartitions(PartitionedId id, Set<Integer> partitions, Long capability);
}
Loading

0 comments on commit 76780e3

Please sign in to comment.