Skip to content

Commit

Permalink
Merge pull request #334 from DongHoonYu96/feture-be-#333
Browse files Browse the repository at this point in the history
[BE] feat#333  updatePos socket batch 처리
  • Loading branch information
DongHoonYu96 authored Dec 23, 2024
2 parents 7309170 + 7ddb652 commit 6c0957e
Showing 1 changed file with 44 additions and 11 deletions.
55 changes: 44 additions & 11 deletions BE/src/game/redis/subscribers/player.subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import { MetricService } from '../../../metric/metric.service';

@Injectable()
export class PlayerSubscriber extends RedisSubscriber {
private positionUpdates: Map<string, any> = new Map(); // Map<gameId, {playerId, playerPosition}[]>
private positionUpdatesForDead: Map<string, any> = new Map(); // Map<gameId, {playerId, playerPosition}[]>
private positionUpdatesMetrics: Map<string, any> = new Map(); // Map<gameId, {startedAt}[]>

constructor(
@InjectRedis() redis: Redis,
private metricService: MetricService,
private metricService: MetricService
) {
super(redis);
}
Expand All @@ -28,25 +32,48 @@ export class PlayerSubscriber extends RedisSubscriber {
if (!playerId || message !== 'hset') {
return;
}
const key = `Player:${playerId}`;
const { changes, playerData } = await this.handlePlayerChanges(key, playerId, server);
const playerKey = REDIS_KEY.PLAYER(playerId);
const gameId = await this.redis.hget(playerKey, 'gameId');
const changes = await this.redis.get(`${playerKey}:Changes`);

const endedAt = process.hrtime(startedAt);
const delta = endedAt[0] * 1e9 + endedAt[1];
const executionTime = delta / 1e6;
if (!this.positionUpdatesMetrics.has(gameId)) {
this.positionUpdatesMetrics.set(gameId, []); // 빈 배열로 초기화
}
this.positionUpdatesMetrics.get(gameId).push(startedAt);

this.metricService.recordResponse(changes, 'success');
this.metricService.recordLatency(changes, 'response', executionTime);
await this.handlePlayerChanges(changes, playerId, server);
});

setInterval(() => {
// 배치 처리
this.positionUpdates.forEach((queue, gameId) => {
if (queue.length > 0) {
const batch = queue.splice(0, queue.length); //O(N)
server.to(gameId).emit(SocketEvents.UPDATE_POSITION, batch);
}
});

// 배치 처리에 관한 메트릭 측정
this.positionUpdatesMetrics.forEach((metrics) => {
if (metrics.length > 0) {
const batch = metrics.splice(0, metrics.length);
batch.forEach((startedAt) => {
const endedAt = process.hrtime(startedAt);
const delta = endedAt[0] * 1e9 + endedAt[1];
const executionTime = delta / 1e6;
this.metricService.recordLatency('Position', 'response', executionTime);
});
}
});
}, 100);
}

private extractPlayerId(channel: string): string | null {
const splitKey = channel.replace('__keyspace@0__:', '').split(':');
return splitKey.length === 2 ? splitKey[1] : null;
}

private async handlePlayerChanges(key: string, playerId: string, server: Namespace) {
const changes = await this.redis.get(`${key}:Changes`);
private async handlePlayerChanges(changes: string, playerId: string, server: Namespace) {
const playerKey = REDIS_KEY.PLAYER(playerId);
const playerData = await this.redis.hgetall(playerKey);
const result = { changes, playerData };
Expand Down Expand Up @@ -102,7 +129,13 @@ export class PlayerSubscriber extends RedisSubscriber {
const isAlivePlayer = await this.redis.hget(REDIS_KEY.PLAYER(playerId), 'isAlive');

if (isAlivePlayer === SurvivalStatus.ALIVE) {
server.to(gameId).emit(SocketEvents.UPDATE_POSITION, updateData);
// 1. Map에 배열을 만들고 set
if (!this.positionUpdates.has(gameId)) {
this.positionUpdates.set(gameId, []); // 빈 배열로 초기화
}
this.positionUpdates.get(gameId).push(updateData);

// server.to(gameId).emit(SocketEvents.UPDATE_POSITION, updateData);
} else if (isAlivePlayer === SurvivalStatus.DEAD) {
const players = await this.redis.smembers(REDIS_KEY.ROOM_PLAYERS(gameId));
const pipeline = this.redis.pipeline();
Expand Down

0 comments on commit 6c0957e

Please sign in to comment.