Skip to content

Commit

Permalink
Merge pull request #1944 from UKHomeOffice/DRTII-1673-no-hourly-pax-d…
Browse files Browse the repository at this point in the history
…ata-from-late-december-2024

Drtii 1673 no hourly pax data from late december 2024
  • Loading branch information
richbirch authored Oct 30, 2024
2 parents 45df0ea + 6b862d7 commit d10834e
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 136 deletions.
2 changes: 1 addition & 1 deletion client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ object DynamicRunnableDeskRecs extends DrtRunnableGraph {
}


private def crunchRequestsToDeskRecs(loadsProvider: (SDateLike, SDateLike, Terminal) => Future[Map[TQM, PassengersMinute]],
maxDesksProviders: Map[Terminal, TerminalDeskLimitsLike],
loadsToQueueMinutes: PassengersToQueueMinutes,
setUpdatedAtForDay: (Terminal, LocalDate, Long) => Future[Done],
)
(implicit executionContext: ExecutionContext, ac: AirportConfig): Flow[TerminalUpdateRequest, MinutesContainer[CrunchMinute, TQM], NotUsed] = {
def crunchRequestsToDeskRecs(loadsProvider: (SDateLike, SDateLike, Terminal) => Future[Map[TQM, PassengersMinute]],
maxDesksProviders: Map[Terminal, TerminalDeskLimitsLike],
loadsToQueueMinutes: PassengersToQueueMinutes,
setUpdatedAtForDay: (Terminal, LocalDate, Long) => Future[Done],
)
(implicit executionContext: ExecutionContext, ac: AirportConfig): Flow[TerminalUpdateRequest, MinutesContainer[CrunchMinute, TQM], NotUsed] = {
Flow[TerminalUpdateRequest]
.mapAsync(1) { request =>
val start = request.start.addMinutes(ac.crunchOffsetMinutes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ object DynamicRunnablePassengerLoads extends DrtRunnableGraph {
.wireTap { crWithPax =>
log.info(s"${crWithPax._1} crunch request - ${crWithPax._2.minutes.size} minutes of passenger loads with ${crWithPax._2.minutes.map(_.toMinute.passengers.size).sum} passengers")
val datesToUpdate = Set(crWithPax._1.start.toUtcDate, crWithPax._1.end.toUtcDate)
datesToUpdate.foreach(updateCapacity)
datesToUpdate.foreach { d =>
updateCapacity(d)
updateLiveView(crWithPax._2)
}
}
.via(Flow[(TerminalUpdateRequest, MinutesContainer[PassengersMinute, TQM])].map {
case (pr, paxMinutes) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import drt.shared._
import org.slf4j.{Logger, LoggerFactory}
import services.TryCrunchWholePax
import services.crunch.desklimits.TerminalDeskLimitsLike
import services.crunch.deskrecs
import services.graphstages.{DynamicWorkloadCalculator, FlightFilter, WorkloadCalculatorLike}
import uk.gov.homeoffice.drt.arrivals.{FlightsWithSplits, Splits}
import uk.gov.homeoffice.drt.ports.Queues._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,31 @@ import akka.stream.scaladsl.Source
import akka.testkit.TestProbe
import akka.{Done, NotUsed}
import controllers.ArrivalGenerator
import drt.shared.CrunchApi.{MinutesContainer, PassengersMinute}
import drt.shared.CrunchApi._
import drt.shared._
import manifests.passengers.{BestAvailableManifest, ManifestLike, ManifestPaxCount}
import manifests.passengers.{BestAvailableManifest, ManifestPaxCount}
import manifests.queues.SplitsCalculator
import manifests.{ManifestLookupLike, UniqueArrivalKey}
import passengersplits.parsing.VoyageManifestParser.{PassengerInfoJson, VoyageManifest, VoyageManifests}
import queueus._
import services.TryCrunchWholePax
import services.crunch.VoyageManifestGenerator.manifestForArrival
import services.crunch.desklimits.PortDeskLimits
import services.crunch.deskrecs.OptimiserMocks._
import services.crunch.{CrunchTestLike, MockEgatesProvider, TestDefaults, VoyageManifestGenerator}
import services.crunch.{CrunchTestLike, MockEgatesProvider, TestDefaults}
import services.graphstages.{CrunchMocks, FlightFilter}
import uk.gov.homeoffice.drt.actor.acking.AckingReceiver.StreamInitialized
import uk.gov.homeoffice.drt.actor.commands.TerminalUpdateRequest
import uk.gov.homeoffice.drt.arrivals.SplitStyle.Percentage
import uk.gov.homeoffice.drt.arrivals._
import uk.gov.homeoffice.drt.ports.Queues.{EGate, EeaDesk, Queue}
import uk.gov.homeoffice.drt.ports.SplitRatiosNs.SplitSources
import uk.gov.homeoffice.drt.ports.SplitRatiosNs.SplitSources.{ApiSplitsWithHistoricalEGateAndFTPercentages, Historical, TerminalAverage}
import uk.gov.homeoffice.drt.ports.Queues.{EeaDesk, Queue}
import uk.gov.homeoffice.drt.ports.Terminals.{T1, Terminal}
import uk.gov.homeoffice.drt.ports._
import uk.gov.homeoffice.drt.redlist.RedListUpdates
import uk.gov.homeoffice.drt.time.{LocalDate, SDate, SDateLike}

import scala.collection.SortedSet
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}


object OptimiserMocks {
class MockActor(somethingToReturn: List[Any]) extends Actor {
override def receive: Receive = {
case _ => sender() ! Source(somethingToReturn)
}
}

class MockSinkActor(probe: ActorRef) extends Actor {
override def receive: Receive = {
case StreamInitialized =>
Expand All @@ -53,61 +42,8 @@ object OptimiserMocks {
}
}

def getMockManifestLookupService(arrivalsWithMaybePax: Map[Arrival, Option[List[PassengerInfoJson]]],
portCode: PortCode): MockManifestLookupService =
MockManifestLookupService(arrivalsWithMaybePax.map { case (arrival, maybePax) =>
val key = UniqueArrivalKey(portCode, arrival.Origin, arrival.VoyageNumber, SDate(arrival.Scheduled))
val maybeManifest = maybePax.map(pax => BestAvailableManifest.historic(VoyageManifestGenerator.manifestForArrival(arrival, pax)))
(key, maybeManifest)
}, arrivalsWithMaybePax.map { case (arrival, maybePax) =>
val key = UniqueArrivalKey(portCode, arrival.Origin, arrival.VoyageNumber, SDate(arrival.Scheduled))
val maybeManifest = maybePax.map(pax => ManifestPaxCount(VoyageManifestGenerator.manifestForArrival(arrival, pax), SplitSources.Historical))
(key, maybeManifest)
}, portCode)

def mockFlightsProvider(flights: List[ApiFlightWithSplits]): TerminalUpdateRequest => Future[Source[List[ApiFlightWithSplits], NotUsed]] =
_ => Future.successful(Source(List(flights)))

def mockHistoricManifestsProviderNoop: Iterable[Arrival] => Source[ManifestLike, NotUsed] = {
_: Iterable[Arrival] => Source(List())
}

def mockHistoricManifestsPaxProviderNoop: Arrival => Future[Option[ManifestPaxCount]] = {
_: Arrival => Future.successful(None)
}

def mockLiveManifestsProvider(arrival: Arrival, maybePax: Option[List[PassengerInfoJson]]): TerminalUpdateRequest => Future[Source[VoyageManifests, NotUsed]] = {
val manifests = maybePax match {
case Some(pax) => VoyageManifests(Set(manifestForArrival(arrival, pax)))
case None => VoyageManifests(Set())
}

_ => Future.successful(Source(List(manifests)))
}

def mockHistoricManifestsProvider(arrivalsWithMaybePax: Map[Arrival, Option[List[PassengerInfoJson]]])
(implicit ec: ExecutionContext): Iterable[Arrival] => Source[ManifestLike, NotUsed] = {
val portCode = PortCode("STN")

val mockCacheLookup: Arrival => Future[Option[ManifestLike]] = _ => Future.successful(None)
val mockCacheStore: (Arrival, ManifestLike) => Future[Any] = (_: Arrival, _: ManifestLike) => Future.successful(StatusReply.Ack)

OptimisationProviders.historicManifestsProvider(
portCode,
getMockManifestLookupService(arrivalsWithMaybePax, portCode),
mockCacheLookup,
mockCacheStore,
)
}

def mockHistoricManifestsPaxProvider(arrivalsWithMaybePax: Map[Arrival, Option[List[PassengerInfoJson]]])
(implicit ec: ExecutionContext): Arrival => Future[Option[ManifestPaxCount]] = {
val portCode = PortCode("STN")
OptimisationProviders.historicManifestsPaxProvider(
portCode,
getMockManifestLookupService(arrivalsWithMaybePax, portCode)
)
}
}

case class MockManifestLookupService(bestAvailableManifests: Map[UniqueArrivalKey, Option[BestAvailableManifest]],
Expand All @@ -126,14 +62,14 @@ case class MockManifestLookupService(bestAvailableManifests: Map[UniqueArrivalKe
departurePort: PortCode,
voyageNumber: VoyageNumber,
scheduled: SDateLike,
): Future[(UniqueArrivalKey, Option[ManifestPaxCount])] = {
): Future[(UniqueArrivalKey, Option[ManifestPaxCount])] = {
val key = UniqueArrivalKey(arrivalPort, departurePort, voyageNumber, scheduled)
Future.successful((key, historicManifestsPax.get(key).flatten))
}
}

class RunnableDynamicDeskRecsSpec extends CrunchTestLike {
val airportConfig: AirportConfig = TestDefaults.airportConfigWithEgates
implicit val airportConfig: AirportConfig = TestDefaults.airportConfigWithEgates

val mockCrunch: TryCrunchWholePax = CrunchMocks.mockCrunchWholePax

Expand All @@ -152,33 +88,29 @@ class RunnableDynamicDeskRecsSpec extends CrunchTestLike {
val request = TerminalUpdateRequest(T1, SDate(flight.apiFlight.Scheduled).toLocalDate)
val sink = system.actorOf(Props(new MockSinkActor(probe.ref)))

val queueMinutesProducer = DynamicRunnablePassengerLoads.crunchRequestsToQueueMinutes(
arrivalsProvider = mockFlightsProvider(List(flight)),
portDesksAndWaitsProvider = desksAndWaitsProvider,
redListUpdatesProvider = () => Future.successful(RedListUpdates.empty),
dynamicQueueStatusProvider = DynamicQueueStatusProvider(airportConfig, MockEgatesProvider.portProvider(airportConfig)),
queuesByTerminal = airportConfig.queuesByTerminal,
updateLiveView = _ => Future.successful(StatusReply.Ack),
paxFeedSourceOrder = paxFeedSourceOrder,
terminalSplits = _ => Option(Splits(Set(
ApiPaxTypeAndQueueCount(PaxTypes.EeaMachineReadable, EeaDesk, 50, None, None),
ApiPaxTypeAndQueueCount(PaxTypes.EeaMachineReadable, EGate, 50, None, None),
), TerminalAverage, None, Percentage)),
updateCapacity = _ => Future.successful(Done),
val minute = SDate(flight.apiFlight.Scheduled).millisSinceEpoch
val maxDeskProviders = PortDeskLimits.flexed(airportConfig, MockEgatesProvider.terminalProvider(airportConfig))
val queueMinutesProducer = DynamicRunnableDeskRecs.crunchRequestsToDeskRecs(
loadsProvider = (_, _, _) => {
Future.successful(Map(TQM(T1, EeaDesk, minute) -> PassengersMinute(T1, EeaDesk, minute, Seq(50, 50, 50), None)))
},
maxDesksProviders = maxDeskProviders,
loadsToQueueMinutes = (_, mins, _, _, _) => Future.successful(DeskRecMinutes(mins.values.map(m => DeskRecMinute(m.terminal, m.queue, m.minute, m.passengers.size, m.passengers.sum, 0, 0, None)).toList)),
setUpdatedAtForDay = (_, _, _) => Future.successful(Done),
)

val crunchGraphSource = new SortedActorRefSource(TestProbe().ref, SortedSet.empty[TerminalUpdateRequest], "passenger-loads")

val (queue, _) = QueuedRequestProcessing.createGraph(crunchGraphSource, sink, queueMinutesProducer, "passenger-loads").run()
queue ! request

probe.fishForMessage(2.second) {
case container: MinutesContainer[PassengersMinute, TQM] =>
case container: MinutesContainer[CrunchMinute, TQM] =>
val tqPax = container.minutes
.groupBy(pm => (pm.toMinute.terminal, pm.toMinute.queue))
.map {
case (tq, mins) =>
(tq, mins.map(_.toMinute.passengers.size).sum)
(tq, mins.map(_.toMinute.paxLoad).sum)
}
.collect {
case (tq, pax) if pax > 0 => (tq, pax)
Expand All @@ -187,58 +119,19 @@ class RunnableDynamicDeskRecsSpec extends CrunchTestLike {
}
}

def manifestsByKey(manifest: VoyageManifest): Map[ArrivalKey, VoyageManifest] =
List(manifest)
.map { vm => vm.maybeKey.map(k => (k, vm)) }
.collect { case Some(k) => k }
.toMap

"Given an arrival with 100 pax " >> {

val arrival = ArrivalGenerator.live("BA0001", schDt = s"2021-06-01T12:00", origin = PortCode("JFK"), totalPax = Option(100))
.toArrival(LiveFeedSource)
.copy(PcpTime = Option(SDate("2021-06-01T11:30").millisSinceEpoch))

"When I provide no live and no historic manifests, terminal splits should be applied (50% desk, 50% egates)" >> {
val expected: Map[(Terminal, Queue), Int] = Map((T1, EGate) -> 50, (T1, EeaDesk) -> 50)
val expected: Map[(Terminal, Queue), Int] = Map((T1, EeaDesk) -> 3)
setupGraphAndCheckQueuePax(
flight = ApiFlightWithSplits(arrival, Set()),
expectedQueuePax = expected)

success
}

"When I provide only historic splits with 100% to eea desk, all pax should arrive at the eea desk " >> {
val expected: Map[(Terminal, Queue), Int] = Map((T1, EeaDesk) -> 100)
val historicSplits = Splits(Set(ApiPaxTypeAndQueueCount(PaxTypes.GBRNational, EeaDesk, 1, None, None)), Historical, None, Percentage)
setupGraphAndCheckQueuePax(
flight = ApiFlightWithSplits(arrival, Set(historicSplits)),
expectedQueuePax = expected
)

success
}
}

"validApiPercentage" >> {
val validApi = ApiFlightWithSplits(
ArrivalGenerator.live(totalPax = Option(100)).toArrival(LiveFeedSource),
Set(Splits(
Set(ApiPaxTypeAndQueueCount(PaxTypes.EeaMachineReadable, EeaDesk, 100, None, None)),
ApiSplitsWithHistoricalEGateAndFTPercentages, Option(EventTypes.DC))))
val invalidApi = ApiFlightWithSplits(
ArrivalGenerator.live(totalPax = Option(100)).toArrival(LiveFeedSource),
Set(Splits(
Set(ApiPaxTypeAndQueueCount(PaxTypes.EeaMachineReadable, EeaDesk, 50, None, None)),
ApiSplitsWithHistoricalEGateAndFTPercentages, Option(EventTypes.DC))))
"Given no flights, then validApiPercentage should give 100%" >> {
DynamicRunnablePassengerLoads.validApiPercentage(Seq()) === 100d
}
"Given 1 flight with live api splits, when it is valid, then validApiPercentage should give 100%" >> {
DynamicRunnablePassengerLoads.validApiPercentage(Seq(validApi)) === 100d
}
"Given 4 flights with live api splits, when 3 are categorised as valid, then validApiPercentage should give 75%" >> {
DynamicRunnablePassengerLoads.validApiPercentage(Seq(validApi, validApi, validApi, invalidApi)) === 75d
}
}
}
Loading

0 comments on commit d10834e

Please sign in to comment.