Skip to content

Commit

Permalink
add swiftnio implement
Browse files Browse the repository at this point in the history
  • Loading branch information
huiping192 committed May 5, 2024
1 parent e7ae515 commit 76f1b4e
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 79 deletions.
32 changes: 32 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"pins" : [
{
"identity" : "swift-atomics",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
"version" : "1.2.0"
}
},
{
"identity" : "swift-collections",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-collections.git",
"state" : {
"revision" : "a902f1823a7ff3c9ab2fba0f992396b948eda307",
"version" : "1.0.5"
}
},
{
"identity" : "swift-nio",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio",
"state" : {
"revision" : "702cd7c56d5d44eeba73fdf83918339b26dc855c",
"version" : "2.62.0"
}
}
],
"version" : 2
}
11 changes: 5 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ import PackageDescription

let package = Package(
name: "HPRTMP",
platforms: [.iOS(.v14),.macOS(.v11)],
platforms: [.iOS(.v14), .macOS(.v11)],
products: [
.library(
name: "HPRTMP",
targets: ["HPRTMP"]
),
],
dependencies: [
// Dependencies declare other packages that this package depends on.
// .package(url: /* package url */, from: "1.0.0"),
.package(url: "https://github.com/apple/swift-nio", from: "2.0.0"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
// Targets can depend on other targets in this package, and on products in packages this package depends on.
.target(
name: "HPRTMP",
dependencies: []),
dependencies: [
.product(name: "NIO", package: "swift-nio")
]),
.testTarget(
name: "HPRTMPTests",
dependencies: ["HPRTMP"]),
Expand Down
43 changes: 0 additions & 43 deletions Sources/HPRTMP/Network.swift

This file was deleted.

120 changes: 120 additions & 0 deletions Sources/HPRTMP/Network/NetworkClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import Foundation
import NIO
import os

protocol NetworkConnectable {
func connect(host: String, port: Int) async throws
func sendData(_ data: Data) async throws
func receiveData() async throws -> Data
func close() async throws
}

final class RTMPClientHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
private let responseCallback: (Data) -> Void

init(responseCallback: @escaping (Data) -> Void) {
self.responseCallback = responseCallback
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buffer = self.unwrapInboundIn(data)

var data = Data()
buffer.readWithUnsafeReadableBytes { ptr in
data.append(ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), count: ptr.count)
return ptr.count
}

guard !data.isEmpty else { return }
responseCallback(data)
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
print("error: ", error)
context.close(promise: nil)
}
}

class NetworkClient: NetworkConnectable {
private let group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
private var channel: Channel?
private var host: String?
private var port: Int?

private var cachedReceivedData: Data = .init()
private var dataPromise: EventLoopPromise<Data>?

private let logger = Logger(subsystem: "HPRTMP", category: "NetworkClient")


init() {
}

deinit {
let group = group
Task {
try? await group.shutdownGracefully()
}
}

func connect(host: String, port: Int) async throws {
self.host = host
self.port = port

let bootstrap = ClientBootstrap(group: group)
.channelInitializer { channel in
channel.pipeline.addHandlers([
RTMPClientHandler(responseCallback: self.responseReceived)
])
}

do {
self.channel = try await bootstrap.connect(host: host, port: Int(port)).get()
logger.info("[HPRTMP] Connected to \(host):\(port)")
} catch {
logger.error("[HPRTMP] Failed to connect: \(error)")
throw error
}
}

func sendData(_ data: Data) async throws {
guard let channel = self.channel else {
throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"])
}

let buffer = channel.allocator.buffer(bytes: data)
try await channel.writeAndFlush(buffer)
}

func receiveData() async throws -> Data {
guard let channel = self.channel else {
throw NSError(domain: "RTMPClientError", code: -1, userInfo: [NSLocalizedDescriptionKey: "Connection not established"])
}
if !cachedReceivedData.isEmpty {
let data = cachedReceivedData
cachedReceivedData = Data()
return data
}

self.dataPromise = channel.eventLoop.makePromise(of: Data.self)
return try await dataPromise!.futureResult.get()
}

private func responseReceived(data: Data) {
cachedReceivedData.append(data)
if let dataPromise {
dataPromise.succeed(cachedReceivedData)
cachedReceivedData = Data()
self.dataPromise = nil
}
}

func close() async throws {
dataPromise?.fail(NSError(domain: "RTMPClientError", code: -2, userInfo: [NSLocalizedDescriptionKey: "Connection invalidated"]))
let channel = self.channel
dataPromise = nil
self.channel = nil
try await channel?.close()
}
}
41 changes: 11 additions & 30 deletions Sources/HPRTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//

import Foundation
import Network
import os

public enum RTMPStatus {
Expand Down Expand Up @@ -67,7 +66,7 @@ protocol RTMPSocketDelegate: Actor {

public actor RTMPSocket {

private var connection: NWConnection?
private let connection: NetworkConnectable = NetworkClient()

private var status: RTMPStatus = .none

Expand Down Expand Up @@ -127,36 +126,20 @@ extension RTMPSocket {
public func resume() async {
guard status != .connected else { return }
guard let urlInfo else { return }

let port = NWEndpoint.Port(rawValue: UInt16(urlInfo.port))
let host = NWEndpoint.Host(urlInfo.host)
let connection = NWConnection(host: host, port: port ?? 1935, using: .tcp)
self.connection = connection
connection.stateUpdateHandler = { [weak self]newState in
guard let self else { return }
do {
try await connection.connect(host: urlInfo.host, port: urlInfo.port)
status = .open
Task {
switch newState {
case .ready:
self.logger.info("connection state: ready")
guard await self.status == .open else { return }
await self.startShakeHands()
case .failed(let error):
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
default:
self.logger.info("connection state: other")
break
}
await self.startShakeHands()
}
} catch {
self.logger.error("[HPRTMP] connection error: \(error.localizedDescription)")
await self.delegate?.socketError(self, err: .uknown(desc: error.localizedDescription))
await self.invalidate()
}
NWConnection.maxReadSize = Int((await windowControl.windowSize))
status = .open
connection.start(queue: DispatchQueue.global(qos: .default))
}

private func startShakeHands() async {
guard let connection = connection else { return }
self.handshake = RTMPHandshake(dataSender: connection.sendData(_:), dataReceiver: receiveData)
await self.handshake?.setDelegate(delegate: self)
do {
Expand All @@ -173,8 +156,7 @@ extension RTMPSocket {
}
await handshake?.reset()
await decoder.reset()
connection?.cancel()
connection = nil
try? await connection.close()
urlInfo = nil
status = .closed
await delegate?.socketDisconnected(self)
Expand Down Expand Up @@ -262,15 +244,14 @@ extension RTMPSocket {

extension RTMPSocket {
private func sendData(_ data: Data) async throws {
try await connection?.sendData(data)
try await connection.sendData(data)
await windowControl.addOutBytesCount(UInt32(data.count))
}
func send(message: RTMPMessage, firstType: Bool) async {
await messagePriorityQueue.enqueue(message, firstType: firstType)
}

private func receiveData() async throws -> Data {
guard let connection = self.connection else { return Data() }
return try await connection.receiveData()
}
}
Expand Down

0 comments on commit 76f1b4e

Please sign in to comment.