diff --git a/user/src/main/kotlin/com/van1164/user/UserRepository.kt b/user/src/main/kotlin/com/van1164/user/UserRepository.kt new file mode 100644 index 0000000..1031ff9 --- /dev/null +++ b/user/src/main/kotlin/com/van1164/user/UserRepository.kt @@ -0,0 +1,11 @@ +package com.van1164.user + +import com.van1164.common.domain.UserR2dbc +import org.springframework.data.r2dbc.repository.R2dbcRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Mono + +@Repository +interface UserRepository : R2dbcRepository { + fun findFirstByUserId(userId: String): Mono +} \ No newline at end of file diff --git a/user/src/main/kotlin/com/van1164/user/UserService.kt b/user/src/main/kotlin/com/van1164/user/UserService.kt new file mode 100644 index 0000000..a3993b8 --- /dev/null +++ b/user/src/main/kotlin/com/van1164/user/UserService.kt @@ -0,0 +1,22 @@ +package com.van1164.user + +import com.van1164.common.domain.UserR2dbc +import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories +import org.springframework.stereotype.Service +import reactor.core.publisher.Mono + + +@Service +@EnableR2dbcRepositories(basePackageClasses = [UserRepository::class]) +class UserService( + val userRepository: UserRepository +) { + + fun findByUserId(userId : String): Mono { + return userRepository.findFirstByUserId(userId) + } + + fun save(user: UserR2dbc): Mono { + return userRepository.save(user) + } +} \ No newline at end of file diff --git a/util/src/main/kotlin/com/van1164/common/s3/S3UploadComponent.kt b/util/src/main/kotlin/com/van1164/common/s3/S3UploadComponent.kt index 3a3eaf8..7339a87 100644 --- a/util/src/main/kotlin/com/van1164/common/s3/S3UploadComponent.kt +++ b/util/src/main/kotlin/com/van1164/common/s3/S3UploadComponent.kt @@ -9,6 +9,7 @@ import org.springframework.core.io.buffer.DataBufferUtils import org.springframework.http.codec.multipart.FilePart import org.springframework.stereotype.Component import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.toMono import java.io.File @Component @@ -57,17 +58,25 @@ class S3UploadComponent( s3Utils.delete("$videoUUID.part$i") } - fun uploadM3U8(m3u8Path: String, outputUUID: String) { + fun uploadM3U8(m3u8Path: String, outputUUID: String): Mono { println("Upload M3U8") - val m3u8File = File(m3u8Path) - s3Utils.put(key = "$outputUUID/$m3u8Path", file = m3u8File) - fileUtils.delete(m3u8File) + return File(m3u8Path).toMono() + .doOnNext { m3U8File -> + s3Utils.put(key = "$outputUUID/$m3u8Path",file=m3U8File) + } + .doOnNext {m3U8File -> + fileUtils.delete(m3U8File) + } + } - fun uploadThumbnail(thumbNailPath: String) { - val thumbNailFile = File(thumbNailPath) - s3Utils.put("thumb/$thumbNailPath", thumbNailFile) - fileUtils.delete(thumbNailFile) + fun uploadThumbnail(thumbNailPath: String): Mono { + return File(thumbNailPath).toMono() + .doOnNext { thumbNailFile -> + s3Utils.put("thumb/$thumbNailPath", thumbNailFile) + }.doOnNext{thumbNailFile-> + fileUtils.delete(thumbNailFile) + } } diff --git a/util/src/main/kotlin/com/van1164/common/util/S3Utils.kt b/util/src/main/kotlin/com/van1164/common/util/S3Utils.kt index 8135bc3..4691511 100644 --- a/util/src/main/kotlin/com/van1164/common/util/S3Utils.kt +++ b/util/src/main/kotlin/com/van1164/common/util/S3Utils.kt @@ -10,7 +10,9 @@ import org.springframework.http.codec.multipart.FilePart import org.springframework.stereotype.Component import org.springframework.web.multipart.MultipartFile import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.toMono import java.io.File +import java.io.FileInputStream import java.io.InputStream @Component(value = "s3Utils") @@ -67,6 +69,29 @@ class S3Utils( } } + fun put( + key: String, + videoInputStream: FileInputStream + ): Mono { + return videoInputStream.toMono() + .map { + val request = PutObjectRequest( + bucketName, + key, + videoInputStream, + ObjectMetadata() + ) + request.requestClientOptions.readLimit = 80000 + request + } + .doOnNext {request-> + amazonS3.putObject(request) + } + .map { + true + } + } + fun get(key : String): S3Object { return amazonS3.getObject( diff --git a/util/src/main/kotlin/com/van1164/common/util/Utils.kt b/util/src/main/kotlin/com/van1164/common/util/Utils.kt index e5e9aa4..ae0bc1b 100644 --- a/util/src/main/kotlin/com/van1164/common/util/Utils.kt +++ b/util/src/main/kotlin/com/van1164/common/util/Utils.kt @@ -1,8 +1,25 @@ package com.van1164.common.util import mu.KotlinLogging +import java.util.UUID object Utils { val logger = KotlinLogging.logger { } + + fun createFilePath(fileUUID : String, extension : String): String { + return "$fileUUID.$extension" + } + + fun createImagePath(): String { + return UUID.randomUUID().toString() + ".jpg" + } + + fun createVideoPath(): String { + return UUID.randomUUID().toString() + ".mp4" + } + + fun createM3U8Path() : String{ + return UUID.randomUUID().toString() + ".m3u8" + } } \ No newline at end of file diff --git a/video/src/main/kotlin/com/van1164/video/VideoController.kt b/video/src/main/kotlin/com/van1164/video/VideoController.kt index e066ef9..890b918 100644 --- a/video/src/main/kotlin/com/van1164/video/VideoController.kt +++ b/video/src/main/kotlin/com/van1164/video/VideoController.kt @@ -40,26 +40,13 @@ class VideoController( @RequestPart chunkNumber : String, @RequestPart totalChunk : String, @RequestPart fileUUID : String): Mono> { - println("FFFFFFFFFFFFFFFFFFFFFFFF$chunkNumber") - logger.info { "TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT" } return Mono.just(UploadVideoPartDTO(title,chunkNumber.toInt(),totalChunk.toInt(),fileUUID)) - .doOnNext { - logger.info{ "여기에 있어요!!!"} - logger.info { video.filename() } - } .flatMap {videoData-> videoService.uploadVideoPart(video, videoData) } - .doOnError { - logger.error { "에러" } - } .onErrorReturn ( ResponseEntity.badRequest().build() ) -// val videoData = UploadVideoPartDTO(title,chunkNumber,totalChunk,fileUUID) -// return videoService.uploadVideoPart(video, videoData) - - } @ResponseBody @GetMapping("/videoPartLast/{id}/{totalChunk}", produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) @@ -77,17 +64,12 @@ class VideoController( @AuthenticationPrincipal user : PrincipalDetails, @RequestBody uploadVideoDataDTO: UploadVideoDataDTO ): Mono> { - println(user) - logger.info { uploadVideoDataDTO.title } - logger.info { uploadVideoDataDTO.fileUUID } - logger.info { "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" } return videoService.saveVideoData(uploadVideoDataDTO.title,uploadVideoDataDTO.fileUUID,user.name) } //@PreAuthorize("isAuthenticated()") @GetMapping("/uploadPage") fun uploadPage() : Mono { - logger.info { "업로드 페이지" } return Mono.just("uploadPage") } diff --git a/video/src/main/kotlin/com/van1164/video/VideoService.kt b/video/src/main/kotlin/com/van1164/video/VideoService.kt index 57a0d62..5fe5a1a 100644 --- a/video/src/main/kotlin/com/van1164/video/VideoService.kt +++ b/video/src/main/kotlin/com/van1164/video/VideoService.kt @@ -23,14 +23,16 @@ import reactor.core.publisher.Mono import reactor.core.publisher.Sinks import reactor.core.scheduler.Schedulers import com.van1164.common.s3.S3UploadComponent +import com.van1164.common.util.Utils import org.springframework.http.codec.multipart.FilePart +import reactor.kotlin.core.publisher.toMono import java.io.File import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths import java.nio.file.StandardOpenOption -import java.util.* -import java.util.concurrent.CompletableFuture +import reactor.kotlin.core.util.function.* +import reactor.util.function.Tuple2 @Service class VideoService( @@ -45,37 +47,28 @@ class VideoService( val sink = Sinks.many().multicast().onBackpressureBuffer() @Transactional fun uploadVideoPartLast(fileUUID : String, totalChunk : Int): Flux> { - val stopWatch = StopWatch() - val inputFilePath = Paths.get(UUID.randomUUID().toString() + ".mp4") - val m3u8Path = "$fileUUID.m3u8" - val thumbNailPath = UUID.randomUUID().toString() + ".jpg" - - stopWatch.start("mp4로 만드는데 걸린 시간") + val inputFilePath = Paths.get(Utils.createVideoPath()) + val m3u8Path = Utils.createFilePath(fileUUID,"m3u8") + val thumbNailPath = Utils.createImagePath() runBlocking { Files.createFile(inputFilePath) } val videoFlux = videoMergeFlux(totalChunk, fileUUID, inputFilePath) - stopWatch.stop() val deleteChunkFileFlux = deleteChunkFileFlux(totalChunk, fileUUID) val thumbNailFlux = - Mono.zip(Mono.just(inputFilePath),Mono.just(thumbNailPath), Mono.just(fileUUID)) - .flatMap{ - Mono.just(it).doOnNext { - createThumbNail(it.t1,it.t2) - } - .flatMap { - saveThumbnailData(it.t3,it.t2) - } - .subscribeOn(Schedulers.parallel()) - } + createThumbNail(inputFilePath,thumbNailPath) + .flatMap { + saveThumbnailData(fileUUID,thumbNailPath) + } + .subscribeOn(Schedulers.parallel()) + val mp4ToHlsFlux = - Mono.zip(Mono.just(inputFilePath),Mono.just(m3u8Path),Mono.just(fileUUID)) - .flatMap{ Mono.just(mp4ToHls(it.t1,it.t2,it.t3)) - .subscribeOn(Schedulers.parallel())} + mp4ToHls(inputFilePath,m3u8Path,fileUUID) + .subscribeOn(Schedulers.parallel()) .doFirst { sink.tryEmitNext(EventDTO("ing","파일 변환 처리중...")) } Flux.concat(videoFlux,Flux.merge(deleteChunkFileFlux,thumbNailFlux,mp4ToHlsFlux)) @@ -93,83 +86,73 @@ class VideoService( } private fun deleteChunkFileFlux(totalChunk: Int, fileUUID: String): Flux { - return Flux.range(0, totalChunk).flatMap { - Flux.just( - s3Repository.deletePart(fileUUID, it) - ) - }.doOnComplete { - sink.tryEmitNext(EventDTO("ing", "썸네일 생성중..")) - }.subscribeOn(Schedulers.boundedElastic()) + return Flux.range(0, totalChunk) + .flatMap { + Flux.just( + s3Repository.deletePart(fileUUID, it) + ) + }.doOnComplete { + sink.tryEmitNext(EventDTO("ing", "썸네일 생성중..")) + }.subscribeOn(Schedulers.boundedElastic()) } private fun videoMergeFlux( totalChunk: Int, fileUUID: String, inputFilePath: Path - ) = Flux.range(0, totalChunk) - .publishOn(Schedulers.boundedElastic()) - .flatMapSequential { - getPartByteArray(fileUUID, it) - } - .doFirst { - sink.tryEmitNext(EventDTO("ing", "파일 업로드 완료하는 중..")) - } - .doOnNext { videoPart -> - logger.info { "파일write" } - Mono.fromCallable { - Files.write(inputFilePath, videoPart, StandardOpenOption.APPEND) - }.subscribeOn(Schedulers.boundedElastic()).subscribe() - } + ): Flux { + return Flux.range(0, totalChunk) + .publishOn(Schedulers.boundedElastic()) + .flatMapSequential { + getPartByteArray(fileUUID, it) + } + .doFirst { + sink.tryEmitNext(EventDTO("ing", "파일 업로드 완료하는 중..")) + } + .flatMap { videoPart -> + Mono.fromCallable { + Files.write(inputFilePath, videoPart, StandardOpenOption.APPEND) + }.subscribeOn(Schedulers.boundedElastic()) + } + } private fun getPartByteArray(fileUUID: String, it: Int): Flux { - val flux = s3Repository.getPartByteArray( + return s3Repository.getPartByteArray( bucketUrl, fileUUID, it - ) - return if (flux != null) { - Flux.just(flux) - } else { - Flux.empty() - } + )?.let{Flux.just(it)} ?:let{Flux.empty()} } private fun mp4ToHls( inputFilePath: Path, m3u8Path: String, outputUUID: String - ) { - logger.info("hls시작") - val stopWatch = StopWatch() - stopWatch.start("mp4를 hls로 바꾸고 업로드하는 데 걸린 시간") + ): Mono { //mp4 to ts - - - mp4ToM3U8(inputFilePath, m3u8Path, outputUUID) - - + return mp4ToM3U8(inputFilePath, m3u8Path, outputUUID) + .flatMap { + uploadVideoTs(outputUUID) + } + .flatMap { + s3Repository.uploadM3U8(m3u8Path, outputUUID) + } // 여러 TS들을 S3에 업로드 - uploadVideoTs(outputUUID) - s3Repository.uploadM3U8(m3u8Path, outputUUID) - stopWatch.stop() - println(stopWatch.prettyPrint()) + } fun createThumbNail( inputFilePath: Path, thumbNailPath: String - ) { - logger.info("썸네일") - val stopWatch = StopWatch() - stopWatch.start("썸네일 만들고 업로드하는 데 걸린 시간") + ): Mono { //thumbnail by ffmpeg + return extractThumbnail(inputFilePath.toString(), thumbNailPath) + .flatMap{ + //uploadThumbnail + s3Repository.uploadThumbnail(thumbNailPath) + } - extractThumbnail(inputFilePath.toString(), thumbNailPath) - //uploadThumbnail - s3Repository.uploadThumbnail(thumbNailPath) - stopWatch.stop() - println(stopWatch.prettyPrint()) } @Transactional @@ -183,31 +166,23 @@ class VideoService( } } - private fun uploadVideoTs(outputUUID: String) { - val futures = mutableListOf>() - var count = 0 - while (true) { - val tsPath = outputUUID + "_" + count.toString().padStart(3, '0') + ".ts" - val tsFile = File(tsPath) - if (tsFile.isFile) { - futures.add( - CompletableFuture.runAsync { - s3Repository.uploadVideoTs("$outputUUID/$tsPath", tsFile) - tsFile.delete() - } - ) - count++ - } else { - break + private fun uploadVideoTs(outputUUID: String): Mono> { + return Flux.range(0, Int.MAX_VALUE) + .flatMap{count-> + val tsPath = Utils.createFilePath(outputUUID + "_" + count.toString().padStart(3, '0'),"ts") + Mono.zip(tsPath.toMono(),File(tsPath).toMono()) } + .takeWhile{ it.t2.isFile} + .filter { it.t2.isFile } + .doOnNext {(tsPath,tsFile)-> + s3Repository.uploadVideoTs("$outputUUID/$tsPath", tsFile) + tsFile.delete() + } + .last() - } - - CompletableFuture.allOf(*futures.toTypedArray()).get() } fun uploadVideoPart(video: FilePart, videoData: UploadVideoPartDTO): Mono> { - logger.info { videoData.chunkNumber } return s3Repository.uploadVideoPart(video, videoData.chunkNumber,videoData.fileUUID) .map{ if(it){ @@ -234,24 +209,25 @@ class VideoService( .onErrorReturn(ResponseEntity.badRequest().build()) } - private fun extractThumbnail(inputFilePath: String, thumbNailPath: String) { - FFmpegBuilder() + private fun extractThumbnail(inputFilePath: String, thumbNailPath: String): Mono { + return FFmpegBuilder() .setInput(inputFilePath) .addOutput(thumbNailPath) .addExtraArgs("-ss", "00:00:1") .addExtraArgs("-vframes", "1") .setStrict(FFmpegBuilder.Strict.EXPERIMENTAL) .done() - .apply { - FFmpegExecutor(ffmpeg, ffprobe).createJob(this).run() + .toMono() + .doOnNext {builder-> + FFmpegExecutor(ffmpeg, ffprobe).createJob(builder).run() } } - private fun mp4ToM3U8(inputFilePath: Path, m3u8Path: String, tsFilePath: String) { - val builder = FFmpegBuilder() + private fun mp4ToM3U8(inputFilePath: Path, m3u8Path: String, tsFilePath: String): Mono { + return FFmpegBuilder() .setInput(inputFilePath.toString()) .addOutput(m3u8Path) .addExtraArgs("-c", "copy") @@ -260,11 +236,20 @@ class VideoService( .addExtraArgs("-start_number", "0") .addExtraArgs("-hls_time", "5") .addExtraArgs("-hls_list_size", "0") - .addExtraArgs("-hls_base_url", "https://video-stream-spring.s3.ap-northeast-2.amazonaws.com/$tsFilePath/") + .addExtraArgs("-hls_base_url", "$bucketUrl$tsFilePath/") .addExtraArgs("-f", "hls") .setStrict(FFmpegBuilder.Strict.EXPERIMENTAL).done() - FFmpegExecutor(ffmpeg, ffprobe).createJob(builder).run() - File(inputFilePath.toString()).delete() + .toMono() + .doOnNext { logger.info{"mp4ToM3U*"} } + .map {builder-> + FFmpegExecutor(ffmpeg, ffprobe).createJob(builder).run() + } + .doOnNext{ + File(inputFilePath.toString()).delete() + } + + + } fun findAll(sort : Sort): Flux { return videoRepository.findAll(sort) diff --git a/video/src/main/kotlin/com/van1164/video/detail/DetailController.kt b/video/src/main/kotlin/com/van1164/video/detail/DetailController.kt new file mode 100644 index 0000000..d2612e0 --- /dev/null +++ b/video/src/main/kotlin/com/van1164/video/detail/DetailController.kt @@ -0,0 +1,41 @@ +package com.van1164.video.detail + +import com.van1164.video.VideoService +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Controller +import org.springframework.ui.Model +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.RequestMapping + +@Controller +@RequestMapping("/detail") +class DetailController( + @Value("\${aws.s3.bucketUrl}") + val bucketUrl : String, + val videoService: VideoService +) { + + @GetMapping("/{detail_id}") + fun loadDetailPage(model:Model, @PathVariable(name = "detail_id") detailId : String): String { +// videoService.findById(detailId) +// .map {video-> +// VideoDetailResponseDTO( +// videoUrl = "https://video-stream-spring.s3.ap-northeast-2.amazonaws.com/${video.url}/${video.url}.m3u8", +// title = video.title, +// commentList = video. +// ) +// +// } + + model.addAttribute("m3u8Url" , "$bucketUrl$detailId/$detailId.m3u8") + return "videoDetail" + } + + @GetMapping("/live/{live_id}") + fun loadLivePage(model:Model, @PathVariable(name = "live_id") liveId : String): String { + model.addAttribute("streamKey" , liveId) + return "streamDetail" + } + +} \ No newline at end of file diff --git a/video/src/main/kotlin/com/van1164/video/detail/DetailService.kt b/video/src/main/kotlin/com/van1164/video/detail/DetailService.kt new file mode 100644 index 0000000..1749e3b --- /dev/null +++ b/video/src/main/kotlin/com/van1164/video/detail/DetailService.kt @@ -0,0 +1,8 @@ +package com.van1164.video.detail + +import org.springframework.stereotype.Service + + +@Service +class DetailService { +} \ No newline at end of file diff --git a/video/src/main/resources/templates/videoDetail.html b/video/src/main/resources/templates/videoDetail.html index 798caeb..dd4f4eb 100644 --- a/video/src/main/resources/templates/videoDetail.html +++ b/video/src/main/resources/templates/videoDetail.html @@ -31,8 +31,6 @@

홈으로

-

/* video Element */