Skip to content

Commit

Permalink
feat: 스트리밍 기능 구현 및 테스트코드 작성
Browse files Browse the repository at this point in the history
  • Loading branch information
van1164 committed Apr 23, 2024
1 parent 7178b0f commit 8113f7e
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ class SecurityConfig(
csrf { disable() }
cors { }
authorizeRequests {
authorize("/api/v1/stream/verify", permitAll)
authorize("/api/v1/stream/live/**",permitAll)
authorize("/api/v1/stream/**",authenticated)
authorize("/api/v1/upload/**",authenticated)
authorize("/api/v1/stream/live/**",permitAll)
authorize("/**",permitAll)
}
oauth2Login {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import org.springframework.web.bind.annotation.ResponseBody
@Controller
@RequestMapping("/detail")
class DetailController(
val streamService : StreamService,
val videoService: VideoService
) {

@GetMapping("/{detail_id}")
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/KY/KoreanYoutube/domain/LiveStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ data class LiveStream(
),

@Column(name = "on_air")
val onAir : Boolean = true,
var onAir : Boolean = false,

@Id
@GeneratedValue(strategy = GenerationType.AUTO)
Expand Down
4 changes: 4 additions & 0 deletions src/main/kotlin/com/KY/KoreanYoutube/redis/RedisRepository.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,8 @@ class RedisRepository(private val redisTemplate: RedisTemplate<String, String>,
return mapper.readValue(redisTemplate.opsForValue().get(key), type)
}

fun removeRtmp(streamKey: String): String? {
return redisTemplate.opsForValue().getAndDelete(streamKey)
}

}
20 changes: 18 additions & 2 deletions src/main/kotlin/com/KY/KoreanYoutube/redis/RedisService.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.KY.KoreanYoutube.redis

import com.KY.KoreanYoutube.domain.User
import com.KY.KoreanYoutube.utils.RTMP_ING_PREFIX
import com.KY.KoreanYoutube.utils.RTMP_PREFIX
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.stereotype.Service
import java.time.Duration
Expand All @@ -13,11 +15,15 @@ class RedisService(
) {

fun saveRtmp(streamKey : String){
redisRepository.save(streamKey,"",Duration.ofMinutes(10))
redisRepository.save(RTMP_PREFIX + streamKey,"off",Duration.ofMinutes(10))
}

fun loadRtmp(streamKey: String): String? {
return redisRepository.load(streamKey,String::class.java)
return redisRepository.load(RTMP_PREFIX + streamKey,String::class.java)
}

fun loadRtmpAndRemove(streamKey: String): String? {
return redisRepository.removeRtmp(RTMP_PREFIX + streamKey)
}


Expand All @@ -29,4 +35,14 @@ class RedisService(
return redisRepository.load(jwt, User::class.java)
}

fun saveRtmpIng(key: String) {
redisRepository.save(RTMP_ING_PREFIX + key,"live",Duration.ofHours(12))
}

fun doneRtmpIng(streamKey: String) {
redisRepository.removeRtmp(RTMP_ING_PREFIX +streamKey)
}



}
29 changes: 13 additions & 16 deletions src/main/kotlin/com/KY/KoreanYoutube/stream/StreamController.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.KY.KoreanYoutube.stream

import com.KY.KoreanYoutube.dto.StreamDTO
import com.KY.KoreanYoutube.redis.RedisService
import com.KY.KoreanYoutube.security.PrincipalDetails
import mu.KotlinLogging
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.ServerSentEvent
Expand All @@ -15,7 +17,7 @@ import reactor.core.publisher.Flux
@Controller
@RequestMapping("/api/v1/stream")
class StreamController(
val streamService: StreamService
val streamService: StreamService,
) {
val logger = KotlinLogging.logger{}
@GetMapping("/streamPage")
Expand All @@ -36,25 +38,11 @@ class StreamController(
@PostMapping("/save_stream")
fun saveStream(
@AuthenticationPrincipal user : PrincipalDetails,
streamDTO : StreamDTO // 나중에 jwt로 변경 예정
streamDTO : StreamDTO
): ResponseEntity<Any> {
return streamService.saveStream(streamDTO,user.name)
}

@GetMapping("/stop")
fun streamStop(
@RequestParam("user_id") userId : String // 나중에 jwt로 변경 예정
){

}

// @GetMapping("/live/{user_id}")
// fun getStream(
// @PathVariable("user_id") userId : String // 나중에 jwt로 변경 예정
// ){
//
// }

@ResponseBody
@GetMapping("/live/{key}")
fun getm3u8(
Expand All @@ -75,4 +63,13 @@ class StreamController(
return streamService.getTsFile(key,fileName)
}

@GetMapping("/verify")
fun verify(@RequestParam name : String): ResponseEntity<HttpStatus> {
return streamService.verifyStream(name)
}

@GetMapping("/done")
fun done(@RequestParam name : String): ResponseEntity<HttpStatus> {
return streamService.doneStream(name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ import org.springframework.stereotype.Repository
@Repository
interface StreamRepository :JpaRepository<LiveStream,Long>{
fun findByOnAirIsTrueOrderByCreateDateDesc():List<LiveStream>

fun findFirstByStreamKey(streamKey : String):LiveStream?
}
101 changes: 49 additions & 52 deletions src/main/kotlin/com/KY/KoreanYoutube/stream/StreamService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import net.bramp.ffmpeg.job.FFmpegJob
import org.springframework.core.io.FileSystemResource
import org.springframework.data.domain.Sort
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpStatus
import org.springframework.http.HttpStatus.*
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.ServerSentEvent
Expand All @@ -23,6 +24,7 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.toMono
import java.io.File
import java.nio.file.Files
import java.nio.file.Path
Expand Down Expand Up @@ -54,7 +56,6 @@ class StreamService(
}
val streamKey = UUID.randomUUID().toString()
val stream = LiveStream(streamDTO,userId,streamKey)
user.onAir = true//나중에 요청으로 변경
streamRepository.save(stream)
return ResponseEntity(streamKey, OK)
} catch (e: Exception) {
Expand All @@ -63,14 +64,24 @@ class StreamService(

}

@Transactional("transactionManager")
fun startStream(key: String): Flux<ServerSentEvent<String>> {
val stream = checkNotNull(streamRepository.findFirstByStreamKey(key))
redisService.saveRtmp(key)
logger.info{"==========================startStream=========================="}
val m3u8Path = Paths.get(File.separatorChar+"tmp",File.separatorChar + "hls", File.separatorChar + key,File.separatorChar + "index.m3u8" )
userService.findByUserId(stream.userName)
.toMono()
.doOnNext {user ->
user.onAir = true
}
.flatMapMany {
checkStreamStart(key).subscribeOn(Schedulers.parallel())
}
.subscribe()

// logger.info{"==========================startStream=========================="}
// val m3u8Path = Paths.get(File.separatorChar+"tmp",File.separatorChar + "hls", File.separatorChar + key,File.separatorChar + "index.m3u8" )


//Flux.merge(checkStreamStart(m3u8Path),startStreamListen(streamPath, key, m3u8Path, filePath)).subscribeOn(Schedulers.parallel()).subscribe()
//Flux.merge(startStream,checkStreamStart).subscribe()
checkStreamStart(m3u8Path).subscribeOn(Schedulers.parallel()).subscribe()
logger.info { "middle point" }
return sink.asFlux().map { event ->
ServerSentEvent.builder<String>(event.message)
Expand All @@ -79,14 +90,15 @@ class StreamService(
}
}

private fun checkStreamStart(m3u8Path: Path): Flux<Boolean> {
private fun checkStreamStart(key: String): Flux<Boolean> {
logger.info { "checking" }
return Flux.interval(Duration.ofSeconds(1))
.take(600)
.map {
logger.info { m3u8Path }
if (m3u8Path.exists()) {
logger.info { key }
if (redisService.loadRtmp(key) ==null) {
logger.info { "finish" }
redisService.saveRtmpIng(key)
sink.tryEmitNext(Event("finish", "finish"))
return@map true
}
Expand All @@ -97,48 +109,6 @@ class StreamService(
.takeUntil{it == true}
}

private fun startStreamListen(
streamPath: Path,
key: String,
m3u8Path: String,
filePath: String
): Mono<FFmpegJob> {
return Mono.just(streamPath).doOnNext {
if (!streamPath.isDirectory()) {
Files.createDirectory(Paths.get("stream"))
}
}.map {
Paths.get("stream/$key")
}
.doOnNext { keyPath->
if (!keyPath.isDirectory()) {
Files.createDirectory(keyPath)
}
}.map{
val builder = FFmpegBuilder()
.addExtraArgs("-listen", "1")
.setInput("rtmp://localhost:1935/live/$key")
.addExtraArgs("-timeout", "600")
.addOutput(m3u8Path)
.addExtraArgs("-c", "copy")
.addExtraArgs("-bsf:v", "h264_mp4toannexb")
.addExtraArgs("-hls_segment_filename", "${filePath}/${key}_%04d.ts")
.addExtraArgs("-start_number", "0")
.addExtraArgs("-hls_time", "5")
.addExtraArgs("-hls_list_size", "0")
.addExtraArgs("-hls_base_url", "$HLS_BASE_URL$key/")
.addExtraArgs("-f", "hls")
.setStrict(FFmpegBuilder.Strict.EXPERIMENTAL).done()
logger.info { "job실행" }
FFmpegExecutor(ffmpeg, ffprobe).createJob(builder)
}
.doOnNext {job->
job.run()
}



}

fun getTsFile(key: String, fileName: String): ResponseEntity<Any> {
//val path = Paths.get("stream",File.separatorChar + key, File.separatorChar + fileName)
Expand All @@ -151,7 +121,7 @@ class StreamService(
val headers = HttpHeaders()
val resource = FileSystemResource(path)
headers.add("Content-Type", Files.probeContentType(path))
headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.getFilename() + "\"")
headers.add(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + resource.filename + "\"")
headers.add("Content-Transfer-Encoding", "binary;")
ResponseEntity.ok().headers(headers).body(resource)
}
Expand All @@ -162,4 +132,31 @@ class StreamService(
fun findAllOnAir(): List<LiveStream> {
return streamRepository.findByOnAirIsTrueOrderByCreateDateDesc()
}

fun verifyStream(streamKey: String) : ResponseEntity<HttpStatus> {
return if(redisService.loadRtmpAndRemove(streamKey) ==null){
ResponseEntity(BAD_REQUEST)
} else{
ResponseEntity(OK)
}
}


@Transactional("transactionManager")
fun doneStream(streamKey: String) : ResponseEntity<HttpStatus> {
try{
redisService.doneRtmpIng(streamKey)
val stream = checkNotNull(streamRepository.findFirstByStreamKey(streamKey))
stream.onAir = false
val user = checkNotNull(userService.findByUserId(stream.userName))
user.onAir = false
return ResponseEntity(HttpStatus.OK)
} catch (e : Exception){
return ResponseEntity(BAD_REQUEST)
}
}

fun findByStreamKey(streamKey : String): LiveStream? {
return streamRepository.findFirstByStreamKey(streamKey)
}
}
4 changes: 3 additions & 1 deletion src/main/kotlin/com/KY/KoreanYoutube/utils/Constant.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ package com.KY.KoreanYoutube.utils


const val BUCKET_NAME = "video-stream-spring"
const val s3URL = "https://video-stream-spring.s3.ap-northeast-2.amazonaws.com/"
const val s3URL = "https://video-stream-spring.s3.ap-northeast-2.amazonaws.com/"
const val RTMP_PREFIX = "RTMP::"
const val RTMP_ING_PREFIX = "RTMP_ING::"
1 change: 0 additions & 1 deletion src/main/resources/templates/streaming.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ <h3>연결되면 나옵니다.</h3>
body: formData,
headers: {"Authorization": "Bearer "+jwt}
}).then(resp => {
// 진행률 표시
console.log(resp.body)
resp.text().then(data => {
console.log(data)
Expand Down
40 changes: 39 additions & 1 deletion src/test/kotlin/com/KY/KoreanYoutube/LiveStreamTest.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,42 @@
package com.KY.KoreanYoutube

class LiveStreamTest {
import com.KY.KoreanYoutube.domain.User
import com.KY.KoreanYoutube.dto.StreamDTO
import com.KY.KoreanYoutube.security.OAuthProvider
import com.KY.KoreanYoutube.stream.StreamService
import com.KY.KoreanYoutube.user.UserService
import io.kotest.core.spec.style.AnnotationSpec
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.TestConstructor


@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class LiveStreamTest @Autowired constructor(
val streamService: StreamService,
val userService: UserService,
) {

@Test
fun liveStreamTest(){
userService.save(User("test_user","name","email", OAuthProvider.GOOGLE))
val streamKey = streamService.saveStream(StreamDTO("testTitle","des"),"test_user").body as String
streamService.startStream(streamKey).subscribe()
streamService.verifyStream(streamKey)
streamService.doneStream(streamKey)

val stream = streamService.findByStreamKey(streamKey)
assert (stream !=null)
assert (!stream!!.onAir)

val user = userService.findByUserId("test_user")
assert (user != null)
assert (!user!!.onAir)
}


}

0 comments on commit 8113f7e

Please sign in to comment.