Skip to content

Commit

Permalink
Route ice event messages over RabbitMQ to allow running as multi-inst…
Browse files Browse the repository at this point in the history
…ance
  • Loading branch information
Brutus5000 committed Nov 29, 2024
1 parent 5738df4 commit 2aa4c7a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 6 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
implementation("io.quarkus:quarkus-config-yaml")
implementation("io.quarkus:quarkus-scheduler")
implementation("io.quarkus:quarkus-resteasy-reactive-jackson")
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-rabbitmq")
implementation("io.quarkus:quarkus-container-image-docker")
implementation("io.quarkus:quarkus-hibernate-orm-panache-kotlin")
implementation("io.quarkus:quarkus-jdbc-mariadb")
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,12 @@ services:
MARIADB_PASSWORD: banana
ports:
- "3306:3306"

rabbitmq:
image: rabbitmq:3.12-management
environment:
RABBITMQ_DEFAULT_VHOST: /faf-core
RABBITMQ_DEFAULT_USER: faf-icebreaker
RABBITMQ_DEFAULT_PASS: banana
ports:
- "5672:5672"
25 changes: 20 additions & 5 deletions src/main/kotlin/com/faforever/icebreaker/service/SessionService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,23 @@ import com.faforever.icebreaker.persistence.IceSessionEntity
import com.faforever.icebreaker.persistence.IceSessionRepository
import com.faforever.icebreaker.security.CurrentUserService
import com.faforever.icebreaker.util.AsyncRunner
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.convertValue
import io.quarkus.scheduler.Scheduled
import io.quarkus.security.ForbiddenException
import io.quarkus.security.UnauthorizedException
import io.quarkus.security.identity.SecurityIdentity
import io.smallrye.jwt.build.Jwt
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.helpers.MultiEmitterProcessor
import io.vertx.core.json.JsonObject
import jakarta.enterprise.inject.Instance
import jakarta.inject.Singleton
import jakarta.transaction.Transactional
import org.eclipse.microprofile.jwt.JsonWebToken
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant
Expand All @@ -30,10 +36,13 @@ class SessionService(
private val iceSessionRepository: IceSessionRepository,
private val securityIdentity: SecurityIdentity,
private val currentUserService: CurrentUserService,
private val objectMapper: ObjectMapper,
@Channel("events-out")
private val rabbitmqEventEmitter: Emitter<EventMessage>,
) {
private val activeSessionHandlers = sessionHandlers.filter { it.active }
private val eventEmitter = MultiEmitterProcessor.create<EventMessage>()
private val eventBroadcast: Multi<EventMessage> = eventEmitter.toMulti().broadcast().toAllSubscribers()
private val localEventEmitter = MultiEmitterProcessor.create<EventMessage>()
private val localEventBroadcast: Multi<EventMessage> = localEventEmitter.toMulti().broadcast().toAllSubscribers()

fun buildToken(gameId: Long): String {
val userId =
Expand Down Expand Up @@ -125,9 +134,9 @@ class SessionService(

fun listenForEventMessages(gameId: Long): Multi<EventMessage> {
val userId = currentUserService.getCurrentUserId()
eventEmitter.emit(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!))
rabbitmqEventEmitter.send(ConnectedMessage(gameId = gameId, senderId = currentUserService.getCurrentUserId()!!))

return eventBroadcast.filter {
return localEventBroadcast.filter {
it.gameId == gameId && (it.recipientId == userId || (it.recipientId == null && it.senderId != userId))
}
}
Expand All @@ -143,6 +152,12 @@ class SessionService(
"current user id $currentUserId from endpoint does not match sourceId ${candidatesMessage.senderId} in candidateMessage"
}

eventEmitter.emit(candidatesMessage)
rabbitmqEventEmitter.send(candidatesMessage)
}

@Incoming("events-in")
fun onEventMessage(eventMessage: JsonObject) {
val parsedMessage = objectMapper.convertValue<EventMessage>(eventMessage.map)
localEventEmitter.emit(parsedMessage)
}
}
25 changes: 24 additions & 1 deletion src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ quarkus:
# (also requires the CustomTenantResolver)
self-tenant:
# There is no .well-known/openid-configuration
auth-server-url: ${SELF_URL:https://ice.faforever.com}
discovery-enabled: false
token:
# Hard coded JWT settings, as there is no JWKS
Expand Down Expand Up @@ -61,6 +60,30 @@ smallrye:
jwt:
sign:
key: ${JWT_PRIVATE_KEY_PATH}
mp:
messaging:
incoming:
events-in:
connector: smallrye-rabbitmq
virtual-host: ${RABBITMQ_VHOST:/faf-core}
queue:
name: events.${HOSTNAME:local}
auto-delete: true
exclusive: true
exchange:
name: ice
outgoing:
events-out:
connector: smallrye-rabbitmq
virtual-host: ${RABBITMQ_VHOST:/faf-core}
exchange:
name: ice

rabbitmq-host: ${RABBITMQ_HOST:localhost}
rabbitmq-port: ${RABBITMQ_PORT:5672}
rabbitmq-username: ${RABBITMQ_USER:faf-icebreaker}
rabbitmq-password: ${RABBITMQ_PASSWORD:banana}

"%dev":
xirsys:
enabled: ${XIRSYS_ENABLED:false}
Expand Down

0 comments on commit 2aa4c7a

Please sign in to comment.