Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Feature/streaming take2 #79

Open
wants to merge 65 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
6c8055a
WIP - Client facing changes for Streaming
jedesah Apr 24, 2015
5f5d72c
Merge branch 'master' into feature/streaming_take2
jedesah Apr 28, 2015
3244392
Add collect method to Remote
jedesah Apr 28, 2015
b5135a8
Implement client side streaming (sending and receiving)
jedesah Apr 28, 2015
ae1e061
Update benchmark tests to reflect Response being a Process
jedesah Apr 29, 2015
6e94f83
Server side streaming
jedesah Apr 29, 2015
e7d96e6
Factor out code common to evaluate and evaluateStream
jedesah Apr 29, 2015
caba72b
Improve runWithoutContext argument list
jedesah Apr 29, 2015
cf4d346
Update test for new Streaming API
jedesah Apr 29, 2015
8432b80
Remove NonDeterminism instance from Response
jedesah Apr 29, 2015
3647f81
Fix Capabilities Test
jedesah Apr 29, 2015
7cb096e
Merge branch 'master' into feature/streaming_take2
jedesah Apr 29, 2015
a8f58e5
Fix compile errors due to merging of master with scodec upgrade
jedesah Apr 29, 2015
a733e94
Merge branch 'master' into feature/streaming_take2
jedesah Apr 29, 2015
bc2b26b
Fix minor compile error after merge
jedesah Apr 29, 2015
b93177d
Comment out a function that should not be available right now
jedesah Apr 30, 2015
1aba960
Fix SSL spec
jedesah Apr 30, 2015
2c09f7f
Fix issue where we were sending the Any tag to the server because of …
jedesah Apr 30, 2015
2ce6a9e
Remove TODO that is now fixed
jedesah Apr 30, 2015
a23b227
Fix stack overflow issue
jedesah Apr 30, 2015
be6d7bc
Add test for Streaming
jedesah May 1, 2015
57e88e5
Merge branch 'master' into feature/streaming_take2
jedesah May 4, 2015
35a6836
Minor refactoring
jedesah May 4, 2015
0930dfd
Merge branch 'master' into feature/streaming_take2
jedesah May 6, 2015
df9be48
Merge branch 'master' into feature/streaming_take2
jedesah May 6, 2015
12b7ed6
Add concept of Streaming to Response
jedesah May 6, 2015
661802e
Improve Streaming test
jedesah May 7, 2015
3469be6
Use EmptyStream as placeholder instead of null
jedesah May 7, 2015
1121089
Revert Response to original state and implement alternative way to id…
jedesah May 8, 2015
4673822
Merge branch 'master' into feature/streaming_take2
jedesah May 8, 2015
24bd9a1
Simple fix I don't understand to fix Streaming
jedesah May 8, 2015
aa199e8
Cleanup Streaming example
jedesah May 8, 2015
69d7d8c
Add knowledge of Streaming to the macro code generation layer
jedesah May 8, 2015
6745f04
Add documentation for collect
jedesah May 11, 2015
7561472
Remove outdated comment
jedesah May 11, 2015
ad8df7f
Delete unused param (revert of unintended change)
jedesah May 11, 2015
9bbb569
Add sealed to App
jedesah May 11, 2015
329de89
Fix runWithoutContext (now I remember..._
jedesah May 11, 2015
6a24514
Remote head() function from AugmentedProcess, use uncons instead
jedesah May 11, 2015
dacb2d8
Change Stream magic number to 255
jedesah May 11, 2015
ca57f81
Minor typing improvement around evalutate
jedesah May 11, 2015
f821d2d
Restore ResponseSpec to original state
jedesah May 11, 2015
8da4370
Add test for failed Stream
jedesah May 11, 2015
939c965
Improve test name
jedesah May 12, 2015
fdfe77c
Fix implementation of uncons
jedesah May 12, 2015
8d60190
Merge branch 'master' into feature/streaming_take2
jedesah May 20, 2015
c3fba37
Add concept of strict or stream fields
jedesah May 27, 2015
e013ee6
Fix String representation of Signature
jedesah May 27, 2015
42779bb
Merge branch 'master' into feature/streaming_take2
jedesah May 27, 2015
d483ae5
Fix bug in Signature
jedesah May 27, 2015
0907369
Get GenerationSpec tests to compile
jedesah May 28, 2015
4eebd5e
Fix tests
jedesah May 28, 2015
3987183
Add tests for Signature String methods
jedesah May 28, 2015
f430fd2
Add extra arity for declareStream
jedesah May 28, 2015
1478295
Enable support for Streaming arguments along side normal arguments in…
jedesah May 28, 2015
e30d763
Add Protocol level test for Streaming
jedesah May 28, 2015
e755771
Add very basic benchmarks for Streaming
jedesah May 28, 2015
f3443f7
Upgrade to scalaz 7.1.2
jedesah Jun 17, 2015
20a94f9
Remove question in comments and clean imports
jedesah Jun 17, 2015
bf0c549
Refactor monitoring on Server side
jedesah Jun 17, 2015
76457c5
Fix warning which uncovered error in test
jedesah Jun 18, 2015
31a1312
Fix compile error
jedesah Jul 8, 2015
e0f3aef
Add chat server example
jedesah Jul 8, 2015
d807f4b
Cleanup chat example
jedesah Jul 8, 2015
20c44af
Add TODO
jedesah Jul 30, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions benchmark/client/src/main/scala/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ object BenchmarkClientMain extends TestData with transformations {
new Test(results, server.BenchmarkClient.identityBig(toBigW(bigIn)).runWithoutContext(endpoint))
// new Test(results, BenchmarkClient.identityMedium(toMediumW(medIn)).runWithoutContext(endpoint))
// new Test(results, BenchmarkClient.identityLarge(toLargeW(largeIn)).runWithoutContext(endpoint))
// We are currently measuring how fast we can send receive streams of a single element.
// It might be more useful to measure how fast we can stream elements of a single stream
// new Test(results, server.BenchmarkClient.streamBig(localStreamToRemote(bigStream.map(toBigW))).run(endpoint))
// new Test(results, server.BenchmarkClient.streamMedium(medStream.map(toMediumW)).runWithoutContext(endpoint))
// new Test(results, server.BenchmarkClient.streamLarge(largeStream.map(toLargeW)).runWithoutContext(endpoint))
}
val threads = testers.map(new Thread(_))

Expand Down
9 changes: 6 additions & 3 deletions benchmark/protocol/src/main/scala/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ object protocol {
.codec[MediumW]
.codec[SmallW]
.codec[BigW]
.specify1("identityLarge", Field.strict[LargeW]("in"), Type[LargeW])
.specify1[MediumW, MediumW]("identityMedium", Field.strict[MediumW]("in"), Type[MediumW])
.specify1[BigW, BigW]("identityBig", Field.strict[BigW]("in"), Type[BigW])
.specify1("identityLarge", Field.strict[LargeW]("in"), Type.strict[LargeW])
.specify1("identityMedium", Field.strict[MediumW]("in"), Type.strict[MediumW])
.specify1("identityBig", Field.strict[BigW]("in"), Type.strict[BigW])
.specify1("streamLarge", Field.stream[LargeW]("in"), Type.stream[LargeW])
.specify1("streamMedium", Field.stream[MediumW]("in"), Type.stream[MediumW])
.specify1("streamBig", Field.stream[BigW]("in"), Type.stream[BigW])
}
9 changes: 9 additions & 0 deletions benchmark/protocol/src/main/scala/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package remotely
package example.benchmark

import scalaz.concurrent.Task
import scalaz.stream.Process

trait TestData {
val sm: Small = Small((for(i <- 1 to 10) yield i.toString -> i.toString).toMap, (for(i <- 1 to 10) yield i.toString).toList)

Expand All @@ -27,4 +30,10 @@ trait TestData {

val bigIn: Big = Big(1)

val bigStream: Process[Task, Big] = Process(bigIn)

val medStream: Process[Task, Medium] = Process(medIn)

val largeStream: Process[Task, Large] = Process(largeIn)

}
12 changes: 7 additions & 5 deletions benchmark/server/src/main/scala/ServerImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package example.benchmark
package server

import scalaz.concurrent._
import java.util.concurrent._
import scalaz.stream.Process
import java.util.concurrent.atomic.AtomicInteger

class BenchmarkServerImpl extends BenchmarkServer with transformations {
override def identityLarge = (large: LargeW) => Response[LargeW]((c: Response.Context) => Task.now{toLargeW(fromLargeW(large))})
override def identityMedium = (med: MediumW) => Response[MediumW]((c: Response.Context) => Task.now{toMediumW(fromMediumW(med))})
override def identityBig = (big: BigW) => Response[BigW]((c: Response.Context) => Task.now{toBigW(fromBigW(big))})

override def identityLarge = (large: LargeW) => Response.now(toLargeW(fromLargeW(large)))
override def identityMedium = (med: MediumW) => Response.now(toMediumW(fromMediumW(med)))
override def identityBig = (big: BigW) => Response.now(toBigW(fromBigW(big)))
override def streamLarge = (largeStream: Process[Task, LargeW]) => Response.now(largeStream.map(large => toLargeW(fromLargeW(large))))
override def streamMedium = (mediumStream: Process[Task, MediumW]) => Response.now(mediumStream.map(medium => toMediumW(fromMediumW(medium))))
override def streamBig = (bigStream: Process[Task, BigW]) => Response.now(bigStream.map(big => toBigW(fromBigW(big))))
}

object Main {
Expand Down
3 changes: 2 additions & 1 deletion core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ scalacOptions ++= Seq(
libraryDependencies ++= Seq(
"org.scodec" %% "scodec-core" % "1.7.0",
"org.scodec" %% "scodec-scalaz" % "1.0.0",
"org.scalaz" %% "scalaz-core" % "7.1.0",
"org.scalaz" %% "scalaz-core" % "7.1.2",
"org.scalaz" %% "scalaz-concurrent" % "7.1.2",
"org.scalaz.stream" %% "scalaz-stream" % "0.7a",
"org.apache.commons" % "commons-pool2" % "2.2",
"io.netty" % "netty-handler" % "4.0.25.Final",
Expand Down
8 changes: 2 additions & 6 deletions core/src/main/scala/Environment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,8 @@ case class Environment[H <: HList](codecs: Codecs[H], values: Values) {
this.populate(_ => v)

private def serverHandler(monitoring: Monitoring): Handler = { bytes =>
// we assume the input is a framed stream, and encode the response(s)
// as a framed stream as well
bytes pipe Process.await1[BitVector] /*server.Handler.deframe*/ evalMap { bs =>
Server.handle(this)(bs)(monitoring)
}
}
Server.handle(this)(bytes)(monitoring)
}

/**
* start a netty server listening to the given address
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/Gen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ private[remotely] object Gen extends MacrosCompatibility {
def liftSignature(c: Context)(signature: Signature): c.universe.Tree = {
import c.universe._
val s = signature
val t: Tree = q"_root_.remotely.Signature(${s.name}, List(..${s.params.map(liftField(c)(_))}), ${s.outType})"
val t: Tree = q"_root_.remotely.Signature(${s.name}, List(..${s.params.map(liftField(c)(_))}), ${liftType(c)(s.out)})"
t
}

def liftField(c: Context)(field: Field[Any]): c.universe.Tree = {
import c.universe._
q"_root_.remotely.Field(${field.name}, ${field.typeString})"
q"_root_.remotely.Field(${field.name}, ${liftType(c)(field.type_)})"
}

def liftType(c: Context)(type_ : Type[Any]): c.universe.Tree = {
import c.universe._
q"_root_.remotely.Type(${type_.name}, ${type_.isStream})"
}
}
9 changes: 5 additions & 4 deletions core/src/main/scala/GenServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ object GenServer extends MacrosCompatibility {

// Creates name/type pairs from the signatures in the protocol.
val signatures = p.signatures.signatures.map { s =>
val typ = parseType(c)(Signatures.wrapResponse(s.typeString))
(s.name, typ)
val typ = parseType(c)(s.wrapResponse)
(s.name, typ, s.out.isStream)
}

// Generates the method defs for the generated class.
val sigDefs = signatures.collect { case (n,t) if n != "describe" =>
val sigDefs = signatures.collect { case (n,t, isStream) if n != "describe" =>
genSig(n, t)
}

Expand Down Expand Up @@ -93,7 +93,8 @@ object GenServer extends MacrosCompatibility {

private def populateDeclarations(env: Values): Values =
${ signatures.foldLeft(q"env":c.Tree)((e,s) =>
q"$e.declare(${Literal(Constant(s._1))},${Ident(createTermName(c)(s._1))})"
if (s._3) q"$e.declareStream(${Literal(Constant(s._1))},${Ident(createTermName(c)(s._1))})"
else q"$e.declare(${Literal(Constant(s._1))},${Ident(createTermName(c)(s._1))})"
)}

..$body
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ package remotely
import java.net.InetSocketAddress
import java.net.SocketAddress
import scala.concurrent.duration._
import scalaz.\/
import scalaz.{\/-, \/}
import scalaz.concurrent.Task
import scalaz.stream.Process

/**
* A collection of callbacks that can be passed to `[[remotely.Environment#serve]]`
Expand All @@ -39,6 +41,11 @@ trait Monitoring { self =>
result: Throwable \/ A,
took: Duration): Unit

def handle[A](ctx: Response.Context, req: Remote[A], references: Iterable[String], result: Throwable \/ A, startTime: Long) = Task.delay {
val duration = Duration.fromNanos(System.nanoTime() - startTime)
handled(ctx, req, references, result, duration)
}

def negotiating(addr: Option[SocketAddress],
what: String,
error: Option[Throwable]): Unit
Expand All @@ -48,6 +55,10 @@ trait Monitoring { self =>
case _ => "<unknown addr>"
}

def sink[A](ctx: Response.Context, req: Remote[A], references: Iterable[String], startTime: Long) = Process.constant { a: A =>
handle(ctx, req, references, \/-(a), startTime)
}

/**
* Return a new `Monitoring` instance that send statistics
* to both `this` and `other`.
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ case class Protocol[H <: HList](codecs: Codecs[H], signatures: Signatures) {
this.copy(codecs = codecs.codec[A])

def specify0[O](name: String, out: Type[O])(implicit evidenceA : Selector[H, O]): Protocol[H] =
this.copy(signatures = signatures.specify0(name, out.name))
this.copy(signatures = signatures.specify0(name, out))

def specify1[A,O](name: String, in: Field[A], out: Type[O])(implicit evidenceA : Selector[H, A], evidenceB : Selector[H, O]): Protocol[H] =
this.copy(signatures = signatures.specify1(name, in, out.name))
this.copy(signatures = signatures.specify1(name, in, out))

def specify2[A,B,O](name: String, in1: Field[A], in2: Field[B], out: Type[O])(implicit evidenceA : Selector[H, A], evidenceB : Selector[H, B], evidenceC : Selector[H, O]): Protocol[H] =
this.copy(signatures = signatures.specify2(name, in1, in2, out.name))
this.copy(signatures = signatures.specify2(name, in1, in2, out))

def specify3[A,B,C,O](name: String, in1: Field[A], in2: Field[B], in3: Field[C], out: Type[O])(implicit evidenceA : Selector[H, A], evidenceB : Selector[H, B], evidenceC : Selector[H, C], evidenceD: Selector[H, O]): Protocol[H] =
this.copy(signatures = signatures.specify3(name, in1, in2, in3, out.name))
this.copy(signatures = signatures.specify3(name, in1, in2, in3, out))

def specify4[A,B,C,D,O](name: String, in1: Field[A], in2: Field[B], in3: Field[C], in4: Field[D], out: Type[O])(implicit evidenceA : Selector[H, A], evidenceB : Selector[H, B], evidenceC : Selector[H, C], evidenceD: Selector[H, D], evidenceE: Selector[H, O]): Protocol[H] =
this.copy(signatures = signatures.specify4(name, in1, in2, in3, in4, out.name))
this.copy(signatures = signatures.specify4(name, in1, in2, in3, in4, out))

def pretty: String =
"Protocol(\n" +
Expand Down
82 changes: 50 additions & 32 deletions core/src/main/scala/Remote.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.reflect.runtime.universe.TypeTag
import scodec.{Codec,Decoder,Encoder}
import scodec.bits.BitVector
import shapeless._
import scalaz.stream.Process

/**
* Represents a remote computation which yields a
Expand All @@ -38,6 +39,18 @@ sealed trait Remote[+A] {
def pretty: String = "Remote {\n " +
Remote.refs(this).mkString("\n ") + "\n " +
toString + "\n}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation

/**
* Builds a List by applying a partial function to all children of this Remote on which the function is defined.
*/
def collect[B](partial: PartialFunction[Remote[Any],B]): List[B] = {
val me = partial.lift(this).toList
val rest = this match {
case app: Remote.Ap[_] => (app.f :: app.args).map(_.collect(partial)).flatten
case _ => Nil
}
me ++ rest
}
}

object Remote {
Expand All @@ -52,6 +65,15 @@ object Remote {
def local[A:Encoder:TypeTag](a: A): Remote[A] =
Remote.Local(a, Some(Encoder[A]), Remote.toTag[A])

def localStream[A: Encoder:TypeTag](stream: Process[Task, A]): Remote[Process[Task,A]] =
Remote.LocalStream(stream, Some(Encoder[A]), Remote.toTag[A])

implicit class RunSyntaxForStreaming[A](self: Remote[Process[Task,A]]) {
/** Call `self.run(at, M).apply(ctx)` to get back a `Task[A]`. */
def run(at: Endpoint, ctx: Response.Context = Response.Context.empty, M: Monitoring = Monitoring.empty)(implicit A: TypeTag[A], C: Codec[A]): Task[Process[Task,A]] =
evaluateStream(at,M)(self).apply(ctx)
}

/** Provides the syntax `expr.run(endpoint)`, where `endpoint: Endpoint`. */
implicit class RunSyntax[A](self: Remote[A]) {
/**
Expand All @@ -66,8 +88,8 @@ object Remote {
run(at, M).apply(ctx)

/** Run this with an empty context */
def runWithoutContext(at: Endpoint)(implicit A: TypeTag[A], C: Codec[A]): Task[A] =
runWithContext(at, Response.Context.empty)
def runWithoutContext(at: Endpoint, M: Monitoring = Monitoring.empty)(implicit A: TypeTag[A], C: Codec[A]): Task[A] =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something weird

runWithContext(at, Response.Context.empty, M)
}
implicit class Ap1Syntax[A,B](self: Remote[A => B]) {
def apply(a: Remote[A]): Remote[B] =
Expand Down Expand Up @@ -95,64 +117,58 @@ object Remote {
override def toString = a.toString
}

private[remotely] case class LocalStream[A](
stream: Process[Task,A],
format: Option[Encoder[A]],
tag: String
) extends Remote[Process[Task,A]] {
override def toString = stream.toString
}

/**
* Reference to a remote value on the server.
*/
private[remotely] case class Ref[A](name: String) extends Remote[A] {
override def toString = name.takeWhile(_ != ':')
}

private[remotely] sealed abstract class Ap[A](val f: Remote[Any],val args: List[Remote[Any]]) extends Remote[A] {
override def toString = s"""$f({$args.mkString(","})"""
}

// we require a separate constructor for each function
// arity, since remote invocations must be fully saturated
private[remotely] case class Ap1[A,B](
f: Remote[A => B],
a: Remote[A]) extends Remote[B] {
override def toString = s"$f($a)"
}
override val f: Remote[A => B],
a: Remote[A]) extends Ap[B](f,List(a))

private[remotely] case class Ap2[A,B,C](
f: Remote[(A,B) => C],
override val f: Remote[(A,B) => C],
a: Remote[A],
b: Remote[B]) extends Remote[C] {
override def toString = s"$f($a, $b)"
}
b: Remote[B]) extends Ap[C](f,List(a,b))

private[remotely] case class Ap3[A,B,C,D](
f: Remote[(A,B,C) => D],
override val f: Remote[(A,B,C) => D],
a: Remote[A],
b: Remote[B],
c: Remote[C]) extends Remote[D] {
override def toString = s"$f($a, $b, $c)"
}
c: Remote[C]) extends Ap[D](f,List(a,b,c))

private[remotely] case class Ap4[A,B,C,D,E](
f: Remote[(A,B,C,D) => E],
override val f: Remote[(A,B,C,D) => E],
a: Remote[A],
b: Remote[B],
c: Remote[C],
d: Remote[D]) extends Remote[E] {
override def toString = s"$f($a, $b, $c, $d)"
}
d: Remote[D]) extends Ap[E](f,List(a,b,c,d))

/** Collect up all the `Ref` names referenced by `r`. */
def refs[A](r: Remote[A]): SortedSet[String] = r match {
case Local(a,e,t) => SortedSet.empty
def refs[A](r: Remote[A]): SortedSet[String] = (r collect {
case Ref(t) => SortedSet(t)
case Ap1(f,a) => refs(f).union(refs(a))
case Ap2(f,a,b) => refs(f).union(refs(b)).union(refs(b))
case Ap3(f,a,b,c) => refs(f).union(refs(b)).union(refs(b)).union(refs(c))
case Ap4(f,a,b,c,d) => refs(f).union(refs(b)).union(refs(b)).union(refs(c)).union(refs(d))
}
}).fold(SortedSet.empty[String])(_.union(_))

/** Collect up all the formats referenced by `r`. */
def formats[A](r: Remote[A]): SortedSet[String] = r match {
def formats[A](r: Remote[A]): SortedSet[String] = r.collect {
case Local(a,e,t) => SortedSet(t)
case Ref(t) => SortedSet.empty
case Ap1(f,a) => formats(f).union(formats(a))
case Ap2(f,a,b) => formats(f).union(formats(b)).union(formats(b))
case Ap3(f,a,b,c) => formats(f).union(formats(b)).union(formats(b)).union(formats(c))
case Ap4(f,a,b,c,d) => formats(f).union(formats(b)).union(formats(b)).union(formats(c)).union(formats(d))
}
}.fold(SortedSet.empty[String])(_.union(_))

def toTag[A:TypeTag]: String = {
val tt = typeTag[A]
Expand All @@ -175,6 +191,8 @@ object Remote {

/** Implicitly promote a local value to a `Remote[A]`. */
implicit def localToRemote[A:Encoder:TypeTag](a: A): Remote[A] = local(a)

implicit def localStreamToRemote[A: Encoder:TypeTag](stream: Process[Task, A]): Remote[Process[Task,A]] = localStream(stream)
}
}

Loading