-
Notifications
You must be signed in to change notification settings - Fork 93
/
Flow_scala.txt
223 lines (209 loc) · 12.6 KB
/
Flow_scala.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Merge, Source}
import akka.util.ByteString
import com.typesafe.scalalogging.LazyLogging
import scala.concurrent.{ExecutionContext, Future}
object Flow extends LazyLogging {
/**
 * Runs a chain of linked queries to completion and collects the processed results.
 *
 * The transformation is undertaken by a stream running from source to sink, in three stages:
 * <ol>
 * <li>Evaluate and link: `Source.unfoldAsync` is seeded with `query` and repeatedly invokes `evaluateAndLink`.
 * Each eventual `Some((nextQuery, payload))` emits `payload` downstream and feeds `nextQuery` back in as the next state.
 * Once the optional result proves to be `None`, the source is finished.</li>
 * <li>Process: each payload is passed to `processFunc` via `mapAsync` with a parallelism of 2.
 * The process func is the function which does something useful with the payloads that arrive.
 * An example might be to write the payload out to S3 or to transform it into some other format.</li>
 * <li>Sink: the results are folded into a single sequence by appending each element at the end.
 * The accumulator is a `Vector` rather than a `List` because appending to a `List` copies the whole
 * list on every element, which makes the fold quadratic in the number of results.</li>
 * </ol>
 * Each of the first two stages is followed by a `log` stage whose name identifies that stage.
 *
 * CONSIDER recasting this as a Flow.
 *
 * @param evaluateAndLink a function Query => Future[ Option[ (Query, Payload) ] ].
 * @param processFunc     a function Payload => Future[Result].
 * @param query           the initial Query value to be used.
 * @param materializer    (implicit) the materializer used to run the stream.
 * @tparam Query   the query type. An example of a Query type in the Clever domain might be a tuple of HttpRequest and AccountRequestPair.
 * @tparam Payload the payload type. An example of Payload is a JSON string (CONSIDER decoding the JSON).
 * @tparam Result  the result type. An example of Result type is a Status object.
 * @return a Future [ Seq [Result] ] which is a sequence of all of the results which were returned from the processFunc method.
 */
def flowLinkProcessFold[Query, Payload, Result](evaluateAndLink: Query => Future[Option[(Query, Payload)]], processFunc: Payload => Future[Result])(query: Query)(implicit materializer: ActorMaterializer): Future[Seq[Result]] =
  Source.unfoldAsync[Query, Payload](query)(evaluateAndLink)
    .log("flowLinkProcessFold: stream error in evaluate and link stage")
    .mapAsync(2)(processFunc)
    .log("flowLinkProcessFold: stream error in process stage")
    // Vector gives effectively constant-time append; Future is covariant, so Future[Vector[Result]] is a Future[Seq[Result]].
    .runFold[Vector[Result]](Vector.empty)(_ :+ _)
/**
 * Runs several chains of linked queries, merges their payloads into one stream, and collects the processed results.
 *
 * The transformation is undertaken by a stream running from source to sink, in four stages:
 * <ol>
 * <li>Evaluate and link: for each element of `queries`, `Source.unfoldAsync` is seeded with that query and repeatedly
 * invokes `evaluateAndLink`. Each eventual `Some((nextQuery, payload))` emits `payload` and feeds `nextQuery` back in
 * as the next state. Once the optional result proves to be `None`, that source is finished.</li>
 * <li>Merge: the sources resulting from the first stage are combined pairwise with `mergeSources`, starting from an
 * empty source, into a single stream, i.e. Source[Payload]. An empty `queries` therefore yields an empty stream.</li>
 * <li>Process: each payload of the merged stream is passed to `processFunc` via `mapAsync` with a parallelism of 2.
 * The process func is the function which does something useful with the payloads that arrive.
 * An example might be to write the payload out to S3 or to transform it into some other format.</li>
 * <li>Sink: the results are folded into a single sequence by appending each element at the end.
 * The accumulator is a `Vector` rather than a `List` because appending to a `List` copies the whole
 * list on every element, which makes the fold quadratic in the number of results.</li>
 * </ol>
 * Each of the first three stages is followed by a `log` stage whose name identifies that stage.
 *
 * CONSIDER recasting this as a Flow.
 *
 * @param evaluateAndLink a function Query => Future[ Option[ (Query, Payload) ] ].
 * @param processFunc     a function Payload => Future[Result].
 * @param queries         a sequence of initial Query values.
 * @param materializer    (implicit) the materializer used to run the stream.
 * @tparam Query   the query type. An example of a Query type in the Clever domain might be a tuple of HttpRequest and AccountRequestPair.
 * @tparam Payload the payload type. An example of Payload is a JSON string (CONSIDER decoding the JSON).
 * @tparam Result  the result type. An example of Result type is a Status object.
 * @return a Future [ Seq [Result] ] which is a sequence of all of the results which were returned from the processFunc method.
 */
def flowLinkMergeProcessFold[Query, Payload, Result]
(evaluateAndLink: Query => Future[Option[(Query, Payload)]], processFunc: Payload => Future[Result])
(queries: Seq[Query])
(implicit materializer: ActorMaterializer): Future[Seq[Result]] =
  queries
    // One unfolding source per initial query.
    .map(query => Source.unfoldAsync[Query, Payload](query)(evaluateAndLink).log("flowLinkMergeProcessFold: stream error in evaluate and link stage"))
    .foldLeft(Source.empty[Payload])(mergeSources)
    .log("flowLinkMergeProcessFold: stream error in merge stage")
    .mapAsync(2)(processFunc)
    .log("flowLinkMergeProcessFold: stream error in process stage")
    // Vector gives effectively constant-time append; Future is covariant, so Future[Vector[Result]] is a Future[Seq[Result]].
    .runFold[Vector[Result]](Vector.empty)(_ :+ _)
/**
 * Decodes an HttpResponse (in particular, its response.entity.dataBytes) into a Payload.
 *
 * If `predicate` accepts the response, all chunks of the entity's data bytes are concatenated into one ByteString
 * which is then passed to `converter`. Otherwise the entity bytes are discarded (they will not be read) and the
 * returned Future fails with a FlowException carrying the numeric status and its default message.
 *
 * @param predicate        the function to determine if we should attempt to convert the given HttpResponse into a Payload.
 * @param converter        the function which will convert a ByteString into a Payload.
 * @param r                an HttpResponse.
 * @param materializer     (implicit) used to run the entity's byte stream, or to discard it.
 * @param executionContext (implicit) used to apply `converter` to the collected bytes.
 * @tparam Payload the payload type.
 * @return a Future[Payload].
 */
def extractPayloadFromResponse[Payload](predicate: HttpResponse => Boolean, converter: ByteString => Payload)
                                       (r: HttpResponse)
                                       (implicit materializer: ActorMaterializer, executionContext: ExecutionContext): Future[Payload] = {
  val acceptable = predicate(r)
  if (!acceptable) {
    // Rejected response: drop the body and report the status to the caller.
    r.entity.discardBytes()
    Future.failed(FlowException(s"HttpResponse was not OK: ${r.status.intValue()} with message: ${r.status.defaultMessage()}"))
  } else {
    // Accepted response: gather every chunk of the body before converting.
    val collected: Future[ByteString] = r.entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
    collected.map(converter)
  }
}
/**
 * Determines whether an HttpResponse has a successful status, logging the outcome.
 *
 * A successful response is logged at info level; an unsuccessful one is logged at warn level
 * together with the response itself.
 *
 * @param r the response.
 * @return true if the status is success.
 */
def statusIsSuccess(r: HttpResponse): Boolean = {
  val ok = r.status.isSuccess()
  if (!ok) logger.warn(s"HttpResponse is not OK: $r")
  else logger.info("HttpResponse OK")
  ok
}
/**
 * Takes an HttpRequest/Friend pair, submits the request (using the submit function provided),
 * then invokes extractPayloadFromResponse to unpack the HttpResponse, using the unpack function and the
 * statusIsSuccess guard function.
 *
 * Finally, any failure has its exception transformed into a FlowException with added context
 * (the friend object and the request):
 * <ul>
 * <li>a FlowException keeps its message and cause and has the context appended to the message;</li>
 * <li>any other Throwable is wrapped in a new FlowException whose message is the throwable's string form
 * followed by the context, and whose cause is the original throwable (so nothing is lost).</li>
 * </ul>
 * Only failures of the returned Future are transformed: an exception thrown synchronously by `submit`
 * itself propagates to the caller unchanged.
 *
 * @param submit            a function which takes an HttpRequest and yields a Future[HttpResponse].
 * @param unpack            a function which takes a ByteString and unpacks it into a Payload.
 * @param requestWithFriend a tuple of HttpRequest and Friend.
 * @param materializer      (implicit) passed on to extractPayloadFromResponse.
 * @param executionContext  (implicit) used for the Future combinators and passed on to extractPayloadFromResponse.
 * @param system            (implicit) not referenced by this method's body; retained so existing callers still compile.
 * @tparam Payload the type of the payload object.
 * @tparam Friend  the type of the friend object.
 * @return a Future[Payload].
 */
def submitQueryAndExtractPayload[Payload, Friend](submit: HttpRequest => Future[HttpResponse])
                                                 (unpack: ByteString => Payload)
                                                 (requestWithFriend: (HttpRequest, Friend))
                                                 (implicit materializer: ActorMaterializer, executionContext: ExecutionContext, system: ActorSystem): Future[Payload] = {
  val (request, friend) = requestWithFriend

  // Built on demand so that the success path never renders friend/request as strings.
  def withContext(message: String): String =
    s"$message: with additional information: friend=$friend, request=$request"

  // This function must be total: a partial match here would turn every non-FlowException failure
  // into a scala.MatchError and hide the real cause.
  val improveException: Throwable => Throwable = {
    case FlowException(m, c) => FlowException(withContext(m), c)
    case other => FlowException(withContext(other.toString), other)
  }

  (submit(request) flatMap extractPayloadFromResponse(statusIsSuccess, unpack))
    .transform(identity[Payload], improveException)
}
/**
 * A method which is used as the evaluateAndLink function in the flows defined by flowLinkProcessFold
 * and flowLinkMergeProcessFold.
 * It invokes submitQueryAndExtractPayload and uses the payload obtained to "chain" a new request:
 * `feedback` is applied to the friend and the payload, and when it yields a next request that request is
 * paired with the (payload, friend) tuple.
 * The particular use case for this is Clever, where a request provides the address of the next batch of records.
 *
 * NOTE(review): when `feedback` yields None the whole result is None, so the payload that was just
 * fetched is not part of the result. If the final batch in a chain has no follow-up request, that
 * batch would appear to be dropped -- confirm that this is intended.
 *
 * @param submit            a function which takes an HttpRequest and yields a Future[HttpResponse].
 * @param feedback          a function which takes a Friend and a Payload and yields an optional pair of the next HttpRequest and a Friend.
 * @param unpack            a function which takes a ByteString and unpacks it into a Payload.
 * @param requestWithFriend a tuple of HttpRequest and Friend.
 * @param materializer      (implicit) passed on to submitQueryAndExtractPayload.
 * @param executionContext  (implicit) used to map over the Future and passed on to submitQueryAndExtractPayload.
 * @param system            (implicit) passed on to submitQueryAndExtractPayload.
 * @tparam Friend  the type of the friend object.
 * @tparam Payload the type of the payload object.
 * @return a Future [ Option [ ( (HttpRequest, Friend), (Payload, Friend) ) ] ].
 */
def submitQueryAndExtractPayloadWithChaining[Friend, Payload](submit: HttpRequest => Future[HttpResponse])
                                                             (feedback: (Friend, Payload) => Option[(HttpRequest, Friend)], unpack: ByteString => Payload)
                                                             (requestWithFriend: (HttpRequest, Friend))
                                                             (implicit materializer: ActorMaterializer, executionContext: ExecutionContext, system: ActorSystem): Future[Option[((HttpRequest, Friend), (Payload, Friend))]] = {
  val friend = requestWithFriend._2
  for (payload <- submitQueryAndExtractPayload(submit)(unpack)(requestWithFriend))
    // Pair the follow-up request (if there is one) with the payload just received.
    yield feedback(friend, payload).map(next => (next, (payload, friend)))
}
/**
 * The standard submission method for an HttpRequest: it delegates to `Http().singleRequest`.
 *
 * @param req    the HttpRequest.
 * @param system (implicit) the actor system from which the Http extension is obtained.
 * @return a Future of the HttpResponse.
 */
def submitStandard(req: HttpRequest)(implicit system: ActorSystem): Future[HttpResponse] = {
  val http = Http()
  http.singleRequest(req)
}
/**
 * Gets the next HttpRequest when requests are linked, as they are in the Clever datasets.
 *
 * The generator is applied to the friend and the payload; if it yields a request, that request is paired
 * with the same friend object.
 * Note the argument order: `generator` takes (friend, payload) whereas the second parameter list of this
 * method takes (payload, friend).
 *
 * @param generator a function to generate an optional HttpRequest based on the (Friend, Payload) pair.
 * @param payload   the payload object which has been decoded from a previous HttpResponse.
 * @param friend    a "friend" object which helps bind the various requests and responses together. A kind of identifier.
 * @tparam Friend  the type of a friend.
 * @tparam Payload the type of the payload.
 * @return an optional pair of HttpRequest and Friend. A None result simply means that we are at the end of the chain--no more requests are needed.
 */
def getNextRequest[Friend, Payload](generator: (Friend, Payload) => Option[HttpRequest])(payload: Payload, friend: Friend): Option[(HttpRequest, Friend)] =
  for (request <- generator(friend, payload)) yield (request, friend)
/**
 * Merges two sources together by combining them with `Source.combine` using a `Merge` stage as the strategy.
 *
 * @param s1 first Source[T].
 * @param s2 second Source[T].
 * @tparam T the underlying type of the sources.
 * @return a new Source[T] which includes the elements of both sources.
 */
def mergeSources[T](s1: Source[T, NotUsed], s2: Source[T, NotUsed]): Source[T, NotUsed] =
  Source.combine(s1, s2)(inputPorts => Merge[T](inputPorts))
/**
 * Zips two optional objects together: the result is defined only when both inputs are defined.
 * Not sure why this isn't in the Option object.
 *
 * TODO make private.
 *
 * @param to an Option[T].
 * @param uo an Option[U].
 * @tparam T the underlying type of to.
 * @tparam U the underlying type of uo.
 * @return Some((t, u)) if to is Some(t) and uo is Some(u); otherwise None.
 */
def zip[T, U](to: Option[T], uo: Option[U]): Option[(T, U)] =
  (to, uo) match {
    case (Some(t), Some(u)) => Some((t, u))
    case _ => None
  }
}
/**
 * The exception type used to report failures in the Flow methods.
 *
 * Being a case class, it can be matched on to recover the message and the cause,
 * for example in order to rebuild it with a more informative message.
 *
 * @param msg   the message, passed on to Exception.
 * @param cause the underlying cause, passed on to Exception; defaults to null, meaning no cause.
 */
case class FlowException(msg: String, cause: Throwable = null)
  extends Exception(msg, cause)