Skip to content

Commit

Permalink
Merge pull request #6 from ruiwang-linkedin/master
Browse files Browse the repository at this point in the history
fixing @author from scala; complete client filter work
  • Loading branch information
jhartman committed Aug 13, 2012
2 parents 76780e3 + f2b36f7 commit 63d4127
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ package com.linkedin.norbert
package network


import netty.{RequestContext => NettyRequestContext, NetworkServerConfig}
import server.{RequestContext, NetworkServer}
import netty.{RequestContext => NettyRequestContext, NetworkServerConfig, NettyServerFilter}
import org.jboss.netty.logging.{InternalLoggerFactory, Log4JLoggerFactory}
import com.google.protobuf.Message
import protos.NorbertExampleProtos
import cluster.ClusterClient
import norbertutils._
import server.{RequestContext, NetworkServer}
import network.NorbertNetworkServerMain.LogFilter
import protos.NorbertProtos.NorbertMessage

Expand Down Expand Up @@ -59,7 +59,7 @@ object NorbertNetworkServerMain {
Pong(System.currentTimeMillis)
}

class LogFilter extends netty.Filter {
class LogFilter extends NettyServerFilter {
val clock = SystemClock
def onRequest(request: Any, context: RequestContext)
{ context.attributes += ("START_TIMER" -> clock.getCurrentTime) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.linkedin.norbert.network

import java.util.UUID
import com.linkedin.norbert.cluster.{ClusterException, Node}
import scala.collection.mutable.Map

object Request {
def apply[RequestMsg, ResponseMsg](message: RequestMsg, node: Node,
Expand All @@ -31,13 +32,16 @@ class Request[RequestMsg, ResponseMsg](val message: RequestMsg, val node: Node,
val callback: Either[Throwable, ResponseMsg] => Unit, val retryAttempt: Int = 0) {
val id = UUID.randomUUID
val timestamp = System.currentTimeMillis
val headers : Map[String, String] = Map.empty[String, String]

def name: String = {
inputSerializer.requestName
}

def requestBytes: Array[Byte] = outputSerializer.requestToBytes(message)

def addHeader(key: String, value: String) = headers += (key -> value)

def onFailure(exception: Throwable) {
callback(Left(exception))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.linkedin.norbert
package network
package client

/**
* Currently only handling one way IC
*/

trait Filter {
def onRequest[RequestMsg, ResponseMsg](request: Request[RequestMsg, ResponseMsg]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.Future
import cluster._
import logging.Logging
import jmx.JMX
import client.Filter

trait BaseNetworkClient extends Logging {
this: ClusterClientComponent with ClusterIoClientComponent =>
Expand All @@ -36,6 +37,8 @@ trait BaseNetworkClient extends Logging {
protected val shutdownSwitch = new AtomicBoolean

private var listenerKey: ClusterListenerKey = _

private var filters : List[Filter] = List.empty[Filter]

def start {
if (startedSwitch.compareAndSet(false, true)) {
Expand Down Expand Up @@ -144,8 +147,11 @@ trait BaseNetworkClient extends Logging {

protected def updateLoadBalancer(nodes: Set[Endpoint]): Unit

def addFilters(clientFilters: List[Filter]) = { filters = clientFilters }

protected def doSendRequest[RequestMsg, ResponseMsg](requestCtx: Request[RequestMsg, ResponseMsg])
(implicit is: InputSerializer[RequestMsg, ResponseMsg], os: OutputSerializer[RequestMsg, ResponseMsg]): Unit = {
filters.foreach { filter => filter.onRequest(requestCtx) }
clusterIoClient.sendMessage(requestCtx.node, requestCtx)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ class ClientChannelHandler(clientName: Option[String],
message.setRequestIdMsb(request.id.getMostSignificantBits)
message.setRequestIdLsb(request.id.getLeastSignificantBits)
message.setMessageName(request.name)
request.headers.foreach { header =>
message.addHeader(NorbertProtos.NorbertMessage.Header.newBuilder.setKey(header._1).setValue(header._2).build)
}
message.setMessage(ProtoUtils.byteArrayToByteString(request.requestBytes, avoidByteStringCopy))

super.writeRequested(ctx, new DownstreamMessageEvent(e.getChannel, e.getFuture, message.build, e.getRemoteAddress))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import client.loadbalancer.{LoadBalancerFactoryComponent, LoadBalancerFactory}
import cluster.{ClusterClient, ClusterClientComponent}
import protos.NorbertProtos
import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
import client.{ThreadPoolResponseHandler, ResponseHandlerComponent, NetworkClient, NetworkClientConfig}
import client.{ThreadPoolResponseHandler, ResponseHandlerComponent, NetworkClient, NetworkClientConfig, NetworkClientComponent}
import common.{CompositeCanServeRequestStrategy, SimpleBackoffStrategy, BaseNetworkClient}
import java.util.{Map => JMap}
import jmx.JMX
Expand Down Expand Up @@ -129,4 +129,4 @@ abstract class BaseNettyNetworkClient(clientConfig: NetworkClientConfig) extends
class NettyNetworkClient(clientConfig: NetworkClientConfig, val loadBalancerFactory: LoadBalancerFactory) extends BaseNettyNetworkClient(clientConfig) with NetworkClient with LoadBalancerFactoryComponent

class NettyPartitionedNetworkClient[PartitionedId](clientConfig: NetworkClientConfig, val loadBalancerFactory: PartitionedLoadBalancerFactory[PartitionedId]) extends BaseNettyNetworkClient(clientConfig)
with PartitionedNetworkClient[PartitionedId] with PartitionedLoadBalancerFactoryComponent[PartitionedId]
with PartitionedNetworkClient[PartitionedId] with PartitionedLoadBalancerFactoryComponent[PartitionedId]
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package com.linkedin.norbert
package network
package netty
/**
* @auther: rwang
* @date: 7/27/12
* @version: $Revision$
*/

import server.{Filter => SFilter, RequestContext => SRequestContext}
import server.{Filter, RequestContext => SRequestContext}
import protos.NorbertProtos.NorbertMessage

trait Filter extends SFilter {
trait NettyServerFilter extends Filter {
def onMessage(message: NorbertMessage, context: SRequestContext) : Unit
def postMessage(message: NorbertMessage, context: SRequestContext) : Unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package netty
import org.jboss.netty.channel.group.ChannelGroup
import org.jboss.netty.channel._
import server.{MessageExecutor, MessageHandlerRegistry, RequestContext => NorbertRequestContext}
import common.CachedNetworkStatistics
import protos.NorbertProtos
import logging.Logging
import java.util.UUID
Expand All @@ -29,7 +30,6 @@ import jmx.{FinishedRequestTimeTracker, JMX}
import java.lang.String
import com.google.protobuf.{ByteString}
import norbertutils._
import common.CachedNetworkStatistics
import util.ProtoUtils


Expand Down Expand Up @@ -65,15 +65,23 @@ class ServerFilterChannelHandler(messageExecutor: MessageExecutor) extends Simpl
override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
if (e.isInstanceOf[MessageEvent]) {
val (context, norbertMessage) = e.asInstanceOf[MessageEvent].getMessage.asInstanceOf[(RequestContext, NorbertProtos.NorbertMessage)]
messageExecutor.filters.foreach { filter => filter.asInstanceOf[Filter].onMessage(norbertMessage, context) }
messageExecutor.filters.foreach { filter =>
filter match {
case f : NettyServerFilter => f.onMessage(norbertMessage, context)
}
}
}
super.handleUpstream(ctx, e)
}

override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent) {
if (e.isInstanceOf[MessageEvent]) {
val (context, norbertMessage) = e.asInstanceOf[MessageEvent].getMessage.asInstanceOf[(RequestContext, NorbertProtos.NorbertMessage)]
messageExecutor.filters.foreach { filter => filter.asInstanceOf[Filter].postMessage(norbertMessage, context) }
messageExecutor.filters.foreach { filter =>
filter match {
case f :NettyServerFilter => f.postMessage(norbertMessage, context)
}
}
}
super.handleDownstream(ctx, e)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.linkedin.norbert.network.server

trait Filter {
def onRequest(request: Any, context: RequestContext): Unit
def onResponse(response: Any, context: RequestContext): Unit
def onError(error: Exception, context: RequestContext): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import actors.DaemonActor
import java.util.concurrent.atomic.AtomicInteger
import norbertutils.{SystemClock, NamedPoolThreadFactory}
import java.util.concurrent._
import common.{CachedNetworkStatistics}
import cluster.ClusterClientComponent
import scala.collection.mutable.MutableList
import common.CachedNetworkStatistics

/**
* A component which submits incoming messages to their associated message handler.
Expand All @@ -35,12 +35,6 @@ trait MessageExecutorComponent {
val messageExecutor: MessageExecutor
}

trait Filter {
def onRequest(request: Any, context: RequestContext): Unit
def onResponse(response: Any, context: RequestContext): Unit
def onError(error: Exception, context: RequestContext): Unit
}

trait MessageExecutor {
def executeMessage[RequestMsg, ResponseMsg](request: RequestMsg, responseHandler: (Either[Exception, ResponseMsg]) => Unit)
(implicit is: InputSerializer[RequestMsg, ResponseMsg]) : Unit = executeMessage(request, responseHandler, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@ package com.linkedin.norbert.network.server

import scala.collection.mutable.Map

/**
* @auther: rwang
* @date: 7/26/12
* @version: $Revision$
*/
trait RequestContext
{
val attributes : Map[String, Any] = Map.empty[String, Any]
trait RequestContext {
val attributes: Map[String, Any] = Map.empty[String, Any]
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.specs.Specification
import org.specs.mock.Mockito
import client.NetworkClient
import client.loadbalancer.{LoadBalancerFactory, LoadBalancer, LoadBalancerFactoryComponent}
import server.{MessageExecutorComponent, MessageExecutor, Filter, RequestContext}
import server._
import cluster.{Node, ClusterClientComponent, ClusterClient}
import com.google.protobuf.Message
import scala.collection.mutable.MutableList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package server
import org.specs.Specification
import org.specs.mock.Mockito
import org.specs.util.WaitFor
import common.SampleMessage
import scala.collection.mutable.MutableList
import common.SampleMessage

class MessageExecutorSpec extends Specification with Mockito with WaitFor with SampleMessage {
val messageHandlerRegistry = mock[MessageHandlerRegistry]
Expand Down

0 comments on commit 63d4127

Please sign in to comment.