Skip to content

Commit

Permalink
finagle-memcached: Implement compressing cache using lz4 compression
Browse files Browse the repository at this point in the history
**Problem** Make lz4 compression easily available via supported memcache
clients. The desired effect is to drastically reduce the size of blobs stored in
memcache, with minimal CPU cost to the calling services.

**Solution** The hypothesis is that we can drastically reduce the size of our
typical memcache payloads, at very low CPU cost, by using a throughput-oriented
compressor like lz4. This will drastically reduce the network utilization of
caches, improving performance in high-load situations like site failovers. This
will also improve the storage utilization of cache, which will reduce eviction
(and perhaps increase cache hit rates of calling services). For larger blobs
that span >1 packet, this might also reduce cache tail latencies, by reducing
the number of packets needed to transmit an object and thereby reducing the
chances of any one of them being delayed or needing a retransmit.

By using a few bits in the memcache protocol's flags field, this can be
implemented in a way that allows for a transparent upgrade. New blobs written to
the store will be compressed - existing blobs will be unmodified. The bits in
the flags field will be used to signal whether the blob is compressed.
Transparent upgrades will make adoption of this significantly easier, as the
existing cached data in the cluster can be used while the transition occurs.

**Result** Roughly 30% size reduction for the larger items, and 15-20% from
p40 onwards.

Differential Revision: https://phabricator.twitter.biz/D1130236
  • Loading branch information
ctutika authored and jenkins committed Mar 20, 2024
1 parent 454148f commit 17b8d32
Show file tree
Hide file tree
Showing 26 changed files with 1,575 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

New Features
~~~~~~~~~~~~

* finagle-memcached: Implement compressing cache using lz4 compression filter. ``PHAB_ID=D1130236``
* finagle-core: Implement Toggle aware SimpleFilter. ``PHAB_ID=D1130236``

Bug Fixes
~~~~~~~~~~

Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,8 @@ lazy val finagleMemcached = Project(
util("hashing"),
util("zk-test") % "test",
"com.twitter" %% "bijection-core" % "0.9.7",
"org.apache.thrift" % "libthrift" % libthriftVersion
"org.apache.thrift" % "libthrift" % libthriftVersion,
lz4Lib
),
libraryDependencies ++= jacksonLibs
).dependsOn(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.twitter.finagle.filter

import com.twitter.finagle.Service
import com.twitter.finagle.SimpleFilter
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.toggle.Toggle
import com.twitter.util.Future

/**
 * A [[SimpleFilter]] wrapper gated by a [[Toggle]].
 *
 * The toggle is evaluated on every request. When it is enabled the request is handed to
 * `underlyingFilter` (which receives the downstream `service`); otherwise the request is
 * sent directly to `service`, bypassing `underlyingFilter` entirely.
 */
class ToggleAwareSimpleFilter[Req, Rep](underlyingFilter: SimpleFilter[Req, Rep], toggle: Toggle)
    extends SimpleFilter[Req, Rep] {

  def apply(request: Req, service: Service[Req, Rep]): Future[Rep] =
    if (!ToggleEnabled(toggle)) service(request)
    else underlyingFilter(request, service)
}

/**
 * Evaluates a [[Toggle]] against the hash code of the current [[ServerInfo]] id.
 */
private object ToggleEnabled {
  def apply(toggle: Toggle): Boolean = {
    // The server id is looked up on every call rather than cached.
    val serverIdHash = ServerInfo().id.hashCode
    toggle.isEnabled(serverIdHash)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.twitter.finagle.filter

import com.twitter.conversions.DurationOps.RichDuration
import com.twitter.finagle.CoreToggles
import com.twitter.finagle.Service
import com.twitter.finagle.SimpleFilter
import com.twitter.util.Await
import com.twitter.util.Awaitable
import com.twitter.util.Future
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.funsuite.AnyFunSuite

class ToggleAwareSimpleFilterTest extends AnyFunSuite with MockitoSugar {

  // Blocks for the result with a bounded timeout so a future that never completes fails the test.
  def await[T](awaitable: Awaitable[T]): T = Await.result(awaitable, 5.seconds)

  /**
   * Wires a [[ToggleAwareSimpleFilter]] around a stub filter and a stub service. The two stubs
   * answer with distinct strings so each test can tell which of them handled the request.
   */
  trait Fixture {
    val underlyingFilter = new SimpleFilter[Long, String] {
      override def apply(request: Long, service: Service[Long, String]): Future[String] =
        Future.value("underlying filter")
    }

    val service = new Service[Long, String] {
      override def apply(request: Long): Future[String] =
        Future.value("service")
    }

    val toggleKey = "com.twitter.finagle.filter.TestToggleAwareUnderlying"
    val toggle = CoreToggles(toggleKey)
    val filter = new ToggleAwareSimpleFilter[Long, String](underlyingFilter, toggle)
  }

  test("calls underlying filter when toggle is enabled") {
    new Fixture {
      // Override the toggle fraction to 1 for the duration of the block.
      com.twitter.finagle.toggle.flag.overrides.let(toggleKey, 1) {
        val response = await(filter(0L, service))
        assert(response == "underlying filter")
      }
    }
  }

  test("calls actual service when toggle is disabled") {
    new Fixture {
      // Override the toggle fraction to 0 for the duration of the block.
      com.twitter.finagle.toggle.flag.overrides.let(toggleKey, 0) {
        val response = await(filter(0L, service))
        assert(response == "service")
      }
    }
  }
}
1 change: 1 addition & 0 deletions finagle-memcached/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ target(
tags = ["bazel-compatible"],
dependencies = [
"finagle/finagle-memcached/src/main/java",
"finagle/finagle-memcached/src/main/resources",
"finagle/finagle-memcached/src/main/scala",
],
)
Expand Down
8 changes: 8 additions & 0 deletions finagle-memcached/src/main/resources/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Resource target for finagle-memcached/src/main/resources.
resources(
sources = [
# Entries prefixed with "!" are exclusions: compiled Python files and BUILD files.
"!**/*.pyc",
"!BUILD*",
# Everything else under this directory is included.
"**/*",
],
tags = ["bazel-compatible"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"toggles": [
{
"id": "com.twitter.finagle.filter.CompressingMemcached",
"description": "Enable compressing filter for memcached values",
"fraction": 1.0
}
]
}
2 changes: 2 additions & 0 deletions finagle-memcached/src/main/scala/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ scala_library(
"3rdparty/jvm/io/netty:netty4-handler-proxy",
"3rdparty/jvm/org/apache/thrift:libthrift",
"3rdparty/jvm/org/apache/zookeeper:zookeeper-client",
"3rdparty/jvm/org/lz4:lz4-java",
"finagle/finagle-core/src/main",
"finagle/finagle-memcached/src/main/java:pants-workaround",
"finagle/finagle-memcached/src/main/resources",
"finagle/finagle-netty4",
"finagle/finagle-partitioning",
"finagle/finagle-serversets",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import com.twitter.finagle.loadbalancer.Balancers
import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.memcached.exp.LocalMemcached
import com.twitter.finagle.memcached._
import com.twitter.finagle.memcached.compressing.WithCompressionScheme
import com.twitter.finagle.memcached.compressing.scheme.CompressionScheme
import com.twitter.finagle.memcached.partitioning.MemcachedPartitioningService
import com.twitter.finagle.memcached.protocol.text.server.ServerTransport
import com.twitter.finagle.memcached.protocol.text.transport.MemcachedNetty4ClientPipelineInit
Expand Down Expand Up @@ -184,6 +186,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
.replace(StackClient.Role.protoTracing, MemcachedTracingFilter.memcachedTracingModule)
// we place this module at a point where the endpoint is resolved in the stack.
.insertBefore(ClientDestTracingFilter.role, MemcachedTracingFilter.shardIdTracingModule)
.prepend(CompressingMemcachedFilter.memcachedCompressingModule)

/**
* The memcached client should be using fixed hosts that do not change
Expand Down Expand Up @@ -219,6 +222,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
params: Stack.Params = Client.params)
extends PushStackClient[Command, Response, Client]
with WithPartitioningStrategy[Client]
with WithCompressionScheme[Client]
with MemcachedRichClient {

protected type In = Response
Expand Down Expand Up @@ -358,6 +362,9 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
// Re-declared with `Client` as the return type; simply delegates to the parent implementation.
override def configured[P](psp: (P, Stack.Param[P])): Client = super.configured(psp)
// Re-declared with `Client` as the return type; simply delegates to the parent implementation.
override def filtered(filter: Filter[Command, Response, Command, Response]): Client =
super.filtered(filter)

// Re-declared with `Client` as the return type; simply delegates to the parent implementation.
override def withCompressionScheme(scheme: CompressionScheme): Client =
super.withCompressionScheme(scheme)
}

def client: Memcached.Client = Client()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package com.twitter.finagle.memcached

import com.twitter.finagle._
import com.twitter.finagle.memcached.compressing.CompressionProvider
import com.twitter.finagle.memcached.compressing.param.CompressionParam
import com.twitter.finagle.memcached.compressing.scheme.CompressionScheme
import com.twitter.finagle.memcached.compressing.scheme.Lz4
import com.twitter.finagle.memcached.compressing.scheme.MemcachedCompression.FlagsAndBuf
import com.twitter.finagle.memcached.compressing.scheme.Uncompressed
import com.twitter.finagle.memcached.protocol.Add
import com.twitter.finagle.memcached.protocol.Cas
import com.twitter.finagle.memcached.protocol.Command
import com.twitter.finagle.memcached.protocol.NonStorageCommand
import com.twitter.finagle.memcached.protocol.Response
import com.twitter.finagle.memcached.protocol.RetrievalCommand
import com.twitter.finagle.memcached.protocol.Set
import com.twitter.finagle.memcached.protocol.StorageCommand
import com.twitter.finagle.memcached.protocol.Value
import com.twitter.finagle.memcached.protocol.Values
import com.twitter.finagle.memcached.protocol.ValuesAndErrors
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.toggle.Toggle
import com.twitter.io.Buf
import com.twitter.util.Future
import com.twitter.util.Return
import com.twitter.util.Throw
import scala.collection.mutable

private[finagle] object CompressingMemcachedFilter {

  /**
   * Stack module that places a [[CompressingMemcachedFilter]] in front of the next
   * [[ServiceFactory]]. The filter is built from the [[CompressionParam]] scheme and the
   * stats receiver found in the stack params.
   */
  def memcachedCompressingModule: Stackable[ServiceFactory[Command, Response]] =
    new Stack.Module2[CompressionParam, param.Stats, ServiceFactory[Command, Response]] {
      override val role: Stack.Role = Stack.Role("MemcachedCompressing")
      override val description: String = "Memcached filter with compression logic"

      override def make(
        _compressionParam: CompressionParam,
        _stats: param.Stats,
        next: ServiceFactory[Command, Response]
      ): ServiceFactory[Command, Response] =
        _compressionParam.scheme match {
          // Both schemes install the filter; only the scheme handed to it differs.
          case scheme @ (Uncompressed | Lz4) =>
            new CompressingMemcachedFilter(scheme, _stats.statsReceiver).andThen(next)
        }
    }
}

/**
 * Memcached client filter that compresses values on the way in and decompresses them on the
 * way out.
 *
 * - `Set`, `Add` and `Cas` commands have their value run through the compressor and their
 *   flags rewritten via `CompressionScheme.flagsWithCompression` before being forwarded. This
 *   is skipped (the command is forwarded untouched) when the scheme is [[Uncompressed]] or the
 *   `com.twitter.finagle.filter.CompressingMemcached` toggle is disabled for this server id.
 * - Any other storage command reaching the compression path fails with
 *   `UnsupportedOperationException`.
 * - Responses to retrieval commands are always passed through the decompressor, regardless of
 *   the toggle, together with the flags stored alongside each value.
 *
 * @param compressionScheme scheme used to select the compressor/decompressor
 * @param statsReceiver     stats receiver handed to the [[CompressionProvider]]
 */
private[finagle] final class CompressingMemcachedFilter(
  compressionScheme: CompressionScheme,
  statsReceiver: StatsReceiver)
    extends SimpleFilter[Command, Response] {
  private final val compressionFactory =
    CompressionProvider(compressionScheme, statsReceiver)

  private val toggle: Toggle = Toggles("com.twitter.finagle.filter.CompressingMemcached")

  // Captured once at construction; its id hash is what the toggle is evaluated against.
  private val serverInfo = ServerInfo()

  /**
   * Routes the command: storage commands are (conditionally) compressed, non-storage commands
   * go through the decompression path, and anything else is forwarded unchanged.
   */
  override def apply(command: Command, service: Service[Command, Response]): Future[Response] = {
    command match {
      case storageCommand: StorageCommand =>
        if (compressionScheme == Uncompressed || !toggle.isEnabled(serverInfo.id.hashCode)) {
          service(storageCommand)
        } else {
          service(compress(storageCommand))
        }
      case nonStorageCommand: NonStorageCommand =>
        decompress(nonStorageCommand, service)
      case otherCommand => service(otherCommand)
    }
  }

  /** Compresses `value` and merges the resulting compression flags into `flags`. */
  private def compressedBufAndFlags(value: Buf, flags: Int): FlagsAndBuf = {
    val (compressionFlags, compressedBuf) = compressionFactory.compressor(value)
    (CompressionScheme.flagsWithCompression(flags, compressionFlags), compressedBuf)
  }

  /**
   * Returns a copy of `command` carrying the compressed value and updated flags.
   *
   * @throws UnsupportedOperationException for any storage command other than
   *                                       `Set`, `Add` or `Cas`
   */
  def compress(command: StorageCommand): StorageCommand = {
    command match {
      case Set(key, flags, expiry, value) =>
        val (flagsWithCompression, compressedBuf) =
          compressedBufAndFlags(value, flags)

        Set(key, flagsWithCompression, expiry, compressedBuf)

      case Add(key, flags, expiry, value) =>
        val (flagsWithCompression, compressedBuf) =
          compressedBufAndFlags(value, flags)

        Add(key, flagsWithCompression, expiry, compressedBuf)

      case Cas(key, flags, expiry, value, casUnique) =>
        val (flagsWithCompression, compressedBuf) =
          compressedBufAndFlags(value, flags)

        Cas(key, flagsWithCompression, expiry, compressedBuf, casUnique)

      case unsupportedStorageCommand => unsupported(unsupportedStorageCommand.name)
    }
  }

  /**
   * Forwards `command` to `service`; for retrieval commands the values in a `Values` or
   * `ValuesAndErrors` response are decompressed. Other responses and other commands pass
   * through untouched.
   */
  def decompress(
    command: NonStorageCommand,
    service: Service[Command, Response]
  ): Future[Response] = {
    command match {
      case retrievalCommand: RetrievalCommand =>
        service(retrievalCommand).map {
          case values: Values =>
            Values(decompressValues(values.values))
          case valuesAndErrors: ValuesAndErrors =>
            decompressValuesAndErrors(valuesAndErrors)
          case retrievalResponse => retrievalResponse
        }
      case otherNonStorageCommand => service(otherNonStorageCommand)
    }
  }

  /**
   * Decompresses every value, preserving order. The first decompression failure is rethrown,
   * so a single bad value fails the whole batch.
   */
  private[finagle] def decompressValues(
    values: Seq[Value]
  ): Seq[Value] = {
    val decompressedValuesList = mutable.ArrayBuffer[Value]()

    values.foreach { value =>
      val flagsInt = flagsFromValues(value)

      compressionFactory.decompressor((flagsInt, value.value)) match {
        case Throw(ex) => throw ex
        case Return(uncompressedValue) =>
          decompressedValuesList.append(value.copy(value = uncompressedValue))
      }
    }

    decompressedValuesList.toSeq
  }

  /**
   * Decompresses every value. A value that cannot be decompressed is dropped from `values`
   * and recorded under its key in `errors` instead of failing the whole response.
   */
  private[finagle] def decompressValuesAndErrors(
    valueAndErrors: ValuesAndErrors
  ): ValuesAndErrors = {
    val decompressedValuesList = mutable.ArrayBuffer[Value]()
    val failuresList = mutable.ListBuffer[(Buf, Throwable)]()

    valueAndErrors.values.foreach { value =>
      try {
        val flagsInt = flagsFromValues(value)

        compressionFactory.decompressor((flagsInt, value.value)) match {
          case Return(decompressedValue) =>
            decompressedValuesList.append(value.copy(value = decompressedValue))
          case Throw(ex) => failuresList.append(value.key -> ex)
        }
      } catch {
        // Only non-fatal failures are attributed to the key. Fatal errors (OutOfMemoryError,
        // InterruptedException, LinkageError, ...) must propagate rather than be reported as
        // a per-key cache failure.
        case scala.util.control.NonFatal(ex) => failuresList.append(value.key -> ex)
      }
    }

    valueAndErrors.copy(
      values = decompressedValuesList.toSeq,
      errors = valueAndErrors.errors ++ failuresList.toMap)
  }

  /**
   * Parses the flags attached to `value`, defaulting to 0 when there are none.
   */
  private def flagsFromValues(value: Value): Int = {
    // The flags value is stored as a literal string, from the memcache text protocol.
    // NOTE(review): `toInt` throws NumberFormatException for text that is not a signed 32-bit
    // integer -- confirm the server can never hand back flags outside that range.
    value.flags.flatMap(Buf.Utf8.unapply(_)) match {
      case Some(s) => s.toInt
      case None => 0
    }
  }

  private def unsupported(command: String) = throw new UnsupportedOperationException(
    s"$command is unsupported for compressing cache")
}
Loading

0 comments on commit 17b8d32

Please sign in to comment.