diff --git a/.github/workflows/swift_test_matrix.yml b/.github/workflows/swift_test_matrix.yml index cec7fd70f5..9b4a36e935 100644 --- a/.github/workflows/swift_test_matrix.yml +++ b/.github/workflows/swift_test_matrix.yml @@ -19,8 +19,8 @@ concurrency: jobs: execute-matrix: - name: ${{ matrix.swift.platform }} (${{ matrix.swift.name }}) - runs-on: ${{ matrix.swift.runner }} + name: ${{ matrix.config.platform }} (${{ matrix.config.name }}) + runs-on: ${{ matrix.config.runner }} strategy: fail-fast: false matrix: ${{ fromJson(inputs.matrix_string) }} @@ -31,37 +31,37 @@ jobs: persist-credentials: false submodules: true - name: Pull Docker image - run: docker pull ${{ matrix.swift.image }} + run: docker pull ${{ matrix.config.image }} - name: Run matrix job - if: ${{ matrix.swift.platform != 'Windows' }} + if: ${{ matrix.config.platform != 'Windows' }} run: | - if [[ -n "${{ matrix.swift.setup_command }}" ]]; then - setup_command_expression="${{ matrix.swift.setup_command }} &&" + if [[ -n "${{ matrix.config.setup_command }}" ]]; then + setup_command_expression="${{ matrix.config.setup_command }} &&" else setup_command_expression="" fi workspace="/$(basename ${{ github.workspace }})" docker run -v ${{ github.workspace }}:"$workspace" \ -w "$workspace" \ - -e SWIFT_VERSION="${{ matrix.swift.swift_version }}" \ + -e SWIFT_VERSION="${{ matrix.config.swift_version }}" \ -e setup_command_expression="$setup_command_expression" \ -e workspace="$workspace" \ - ${{ matrix.swift.image }} \ - bash -c "swift --version && git config --global --add safe.directory \"$workspace\" && $setup_command_expression ${{ matrix.swift.command }} ${{ matrix.swift.command_arguments }}" + ${{ matrix.config.image }} \ + bash -c "swift --version && git config --global --add safe.directory \"$workspace\" && $setup_command_expression ${{ matrix.config.command }} ${{ matrix.config.command_arguments }}" - name: Run matrix job (Windows) - if: ${{ matrix.swift.platform == 'Windows' }} + if: ${{ matrix.config.platform == 'Windows' }} run: | - if (-not [string]::IsNullOrEmpty("${{ matrix.swift.setup_command }}")) { - $setup_command_expression = "${{ matrix.swift.setup_command }} &" + if (-not [string]::IsNullOrEmpty("${{ matrix.config.setup_command }}")) { + $setup_command_expression = "${{ matrix.config.setup_command }} &" } else { $setup_command_expression = "" } $workspace = "C:\" + (Split-Path ${{ github.workspace }} -Leaf) docker run -v ${{ github.workspace }}:$($workspace) ` -w $($workspace) ` - -e SWIFT_VERSION="${{ matrix.swift.swift_version }}" ` + -e SWIFT_VERSION="${{ matrix.config.swift_version }}" ` -e setup_command_expression=%setup_command_expression% ` - ${{ matrix.swift.image }} ` - cmd /s /c "swift --version & powershell Invoke-Expression ""$($setup_command_expression) ${{ matrix.swift.command }} ${{ matrix.swift.command_arguments }}""" + ${{ matrix.config.image }} ` + cmd /s /c "swift --version & powershell Invoke-Expression ""$($setup_command_expression) ${{ matrix.config.command }} ${{ matrix.config.command_arguments }}""" env: - SWIFT_VERSION: ${{ matrix.swift.swift_version }} + SWIFT_VERSION: ${{ matrix.config.swift_version }} diff --git a/NOTICE.txt b/NOTICE.txt index cbcc2e69bb..d79250ea55 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -95,3 +95,12 @@ This product contains a derivation of the mocking infrastructure from Swift Syst * https://www.apache.org/licenses/LICENSE-2.0 * HOMEPAGE: * https://github.com/apple/swift-system + +--- + +This product contains a derivation of "TokenBucket.swift" from Swift Package Manager. + + * LICENSE (Apache License 2.0): + * https://www.apache.org/licenses/LICENSE-2.0 + * HOMEPAGE: + * https://github.com/swiftlang/swift-package-manager diff --git a/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift index e5612e1f69..5e6f04a38d 100644 --- a/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift +++ b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift @@ -24,8 +24,11 @@ struct AsyncChannelIO { } func start() async throws -> AsyncChannelIO { - try await channel.pipeline.addHandler(RequestResponseHandler()) - .get() + try await channel.eventLoop.submit { + try channel.pipeline.syncOperations.addHandler( + RequestResponseHandler() + ) + }.get() return self } diff --git a/Sources/NIOCore/ByteBuffer-aux.swift b/Sources/NIOCore/ByteBuffer-aux.swift index 232dffd874..1ec8e9cf3b 100644 --- a/Sources/NIOCore/ByteBuffer-aux.swift +++ b/Sources/NIOCore/ByteBuffer-aux.swift @@ -733,13 +733,13 @@ extension ByteBuffer: Codable { public init(from decoder: Decoder) throws { let container = try decoder.singleValueContainer() let base64String = try container.decode(String.self) - self = try ByteBuffer(bytes: base64String.base64Decoded()) + self = try ByteBuffer(bytes: base64String._base64Decoded()) } /// Encodes this buffer as a base64 string in a single value container. public func encode(to encoder: Encoder) throws { var container = encoder.singleValueContainer() - let base64String = String(base64Encoding: self.readableBytesView) + let base64String = String(_base64Encoding: self.readableBytesView) try container.encode(base64String) } } diff --git a/Sources/NIOCore/EventLoopFuture.swift b/Sources/NIOCore/EventLoopFuture.swift index 793d3a7ed5..4579576ed6 100644 --- a/Sources/NIOCore/EventLoopFuture.swift +++ b/Sources/NIOCore/EventLoopFuture.swift @@ -163,7 +163,12 @@ public struct EventLoopPromise { internal static func makeUnleakablePromise(eventLoop: EventLoop, line: UInt = #line) -> EventLoopPromise { EventLoopPromise( eventLoop: eventLoop, - file: "BUG in SwiftNIO (please report), unleakable promise leaked.", + file: """ + EventLoopGroup shut down with unfulfilled promises remaining. \ + This suggests that the EventLoopGroup was shut down with unfinished work outstanding which is \ + illegal. Either switch to using the singleton EventLoopGroups or fix the issue by only shutting down \ + the EventLoopGroups when all the work associated with them has finished. + """, line: line ) } diff --git a/Sources/NIOFileSystem/CopyStrategy.swift b/Sources/NIOFileSystem/CopyStrategy.swift deleted file mode 100644 index ae8b4770e1..0000000000 --- a/Sources/NIOFileSystem/CopyStrategy.swift +++ /dev/null @@ -1,121 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -/// How to perform copies. Currently only relevant to directory level copies when using -/// ``FileSystemProtocol/copyItem(at:to:strategy:shouldProceedAfterError:shouldCopyItem:)`` or other -/// overloads that use the default behaviour. -public struct CopyStrategy: Hashable, Sendable { - // Avoid exposing to prevent breaking changes - internal enum Wrapped: Hashable, Sendable { - // platformDefault is reified into one of the concrete options below: - - case sequential - // Constraints on this value are enforced only on creation of `CopyStrategy`. The early - // error check is desirable over validating on downstream use. - case parallel(_ maxDescriptors: Int) - } - - internal let wrapped: Wrapped - private init(_ wrapped: Wrapped) { - self.wrapped = wrapped - } - - // These selections are relatively arbitrary but the rationale is as follows: - // - // - Never exceed the default OS limits even if 4 such operations were happening at once. - // - Sufficient to enable significant speed up from parallelism - // - Not wasting effort by pushing contention to the underlying storage device. Further we - // assume an SSD or similar underlying storage tech. Users on spinning rust need to account - // for that themselves anyway. - // - // That said, empirical testing for this has not been performed, suggestions welcome. - // - // Note: for now we model the directory scan as needing two handles because, during the creation - // of the destination directory we hold the handle for a while copying attributes - // a much more complex internal state machine could allow doing two of these if desired - // This may not result in a faster copy though so things are left simple - internal static func determinePlatformDefault() -> Wrapped { - #if os(macOS) || os(Linux) || os(Windows) - // 4 concurrent file copies/directory scans. Avoiding storage system contention is of utmost - // importance. - // - // Testing was performed on an SSD, while copying objects (a dense directory of small files - // and subdirectories of similar shape) to the same volume, totalling 12GB. Results showed - // improvements in elapsed time for (expected) increases in CPU time up to parallel(8). - // Beyond this, the increases in CPU led to only moderate gains. - // - // Anyone tuning this is encouraged to cover worst case scenarios. - return .parallel(8) - #elseif os(iOS) || os(tvOS) || os(watchOS) || os(Android) - // Reduced maximum descriptors in embedded world. This is chosen based on biasing towards - // safety, not empirical testing. - return .parallel(4) - #else - // Safety first. If we do not know what system we run on, we keep it simple. - return .sequential - #endif - } -} - -extension CopyStrategy { - // A copy fundamentally can't work without two descriptors unless you copy - // everything into memory which is infeasible/inefficient for large copies. - private static let minDescriptorsAllowed = 2 - - /// Operate in whatever manner is deemed a reasonable default for the platform. This will limit - /// the maximum file descriptors usage based on reasonable defaults. - /// - /// Current assumptions (which are subject to change): - /// - Only one copy operation would be performed at once - /// - The copy operation is not intended to be the primary activity on the device - public static let platformDefault: Self = Self(Self.determinePlatformDefault()) - - /// The copy is done asynchronously, but only one operation will occur at a time. This is the - /// only way to guarantee only one callback to the `shouldCopyItem` will happen at a time. - public static let sequential: Self = Self(.sequential) - - /// Allow multiple IO operations to run concurrently, including file copies/directory creation and scanning - /// - /// - Parameter maxDescriptors: a conservative limit on the number of concurrently open - /// file descriptors involved in the copy. This number must be >= 2 though, if you are using a value that low - /// you should use ``sequential`` - /// - /// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors` - /// is less than 2. - /// - public static func parallel(maxDescriptors: Int) throws -> Self { - guard maxDescriptors >= Self.minDescriptorsAllowed else { - // 2 is not quite the same as sequential, you could have two concurrent directory listings for example - // less than 2 and you can't actually do a _copy_ though so it's non-sensical. - throw FileSystemError( - code: .invalidArgument, - message: "Can't do a copy operation without at least 2 file descriptors '\(maxDescriptors)' is illegal", - cause: nil, - location: .here() - ) - } - return .init(.parallel(maxDescriptors)) - } -} - -extension CopyStrategy: CustomStringConvertible { - public var description: String { - switch self.wrapped { - case .sequential: - return "sequential" - case let .parallel(maxDescriptors): - return "parallel with max \(maxDescriptors) descriptors" - } - } -} diff --git a/Sources/NIOFileSystem/FileSystem.swift b/Sources/NIOFileSystem/FileSystem.swift index 33bf8c605e..8349949254 100644 --- a/Sources/NIOFileSystem/FileSystem.swift +++ b/Sources/NIOFileSystem/FileSystem.swift @@ -366,6 +366,8 @@ public struct FileSystem: Sendable, FileSystemProtocol { } } + /// See ``FileSystemProtocol/removeItem(at:strategy:recursively:)`` + /// /// Deletes the file or directory (and its contents) at `path`. /// /// Only regular files, symbolic links and directories may be removed. If the file at `path` is @@ -387,11 +389,14 @@ public struct FileSystem: Sendable, FileSystemProtocol { /// /// - Parameters: /// - path: The path to delete. + /// - removalStrategy: Whether to delete files sequentially (one-by-one), or perform a + /// concurrent scan of the tree at `path` and delete files when they are found. /// - removeItemRecursively: Whether or not to remove items recursively. /// - Returns: The number of deleted items which may be zero if `path` did not exist. @discardableResult public func removeItem( at path: FilePath, + strategy removalStrategy: RemovalStrategy, recursively removeItemRecursively: Bool ) async throws -> Int { // Try to remove the item: we might just get lucky. @@ -421,39 +426,60 @@ public struct FileSystem: Sendable, FileSystemProtocol { ) } - var (subdirectories, filesRemoved) = try await self.withDirectoryHandle( - atPath: path - ) { directory in - var subdirectories = [FilePath]() - var filesRemoved = 0 + switch removalStrategy.wrapped { + case .sequential: + return try await self.removeItemSequentially(at: path) + case let .parallel(maxDescriptors): + return try await self.removeConcurrently(at: path, maxDescriptors) + } - for try await batch in directory.listContents().batched() { - for entry in batch { - switch entry.type { - case .directory: - subdirectories.append(entry.path) + case let .failure(errno): + throw FileSystemError.remove(errno: errno, path: path, location: .here()) + } + } - default: - filesRemoved += try await self.removeOneItem(at: entry.path) - } + @discardableResult + private func removeItemSequentially( + at path: FilePath + ) async throws -> Int { + var (subdirectories, filesRemoved) = try await self.withDirectoryHandle( + atPath: path + ) { directory in + var subdirectories = [FilePath]() + var filesRemoved = 0 + + for try await batch in directory.listContents().batched() { + for entry in batch { + switch entry.type { + case .directory: + subdirectories.append(entry.path) + + default: + filesRemoved += try await self.removeOneItem(at: entry.path) } } - - return (subdirectories, filesRemoved) } - for subdirectory in subdirectories { - filesRemoved += try await self.removeItem(at: subdirectory) - } + return (subdirectories, filesRemoved) + } - // The directory should be empty now. Remove ourself. - filesRemoved += try await self.removeOneItem(at: path) + for subdirectory in subdirectories { + filesRemoved += try await self.removeItemSequentially(at: subdirectory) + } - return filesRemoved + // The directory should be empty now. Remove ourself. + filesRemoved += try await self.removeOneItem(at: path) - case let .failure(errno): - throw FileSystemError.remove(errno: errno, path: path, location: .here()) - } + return filesRemoved + + } + + private func removeConcurrently( + at path: FilePath, + _ maxDescriptors: Int + ) async throws -> Int { + let bucket: TokenBucket = .init(tokens: maxDescriptors) + return try await self.discoverAndRemoveItemsInTree(at: path, bucket) } /// Moves the named file or directory to a new location. @@ -490,7 +516,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { case .differentLogicalDevices: // Fall back to copy and remove. try await self.copyItem(at: sourcePath, to: destinationPath) - try await self.removeItem(at: sourcePath) + try await self.removeItem(at: sourcePath, strategy: .platformDefault) } } @@ -518,9 +544,9 @@ public struct FileSystem: Sendable, FileSystemProtocol { withItemAt existingPath: FilePath ) async throws { do { - try await self.removeItem(at: destinationPath) + try await self.removeItem(at: destinationPath, strategy: .platformDefault) try await self.moveItem(at: existingPath, to: destinationPath) - try await self.removeItem(at: existingPath) + try await self.removeItem(at: existingPath, strategy: .platformDefault) } catch let error as FileSystemError { throw FileSystemError( message: "Can't replace '\(destinationPath)' with '\(existingPath)'.", diff --git a/Sources/NIOFileSystem/FileSystemProtocol.swift b/Sources/NIOFileSystem/FileSystemProtocol.swift index b5bf701435..934220762c 100644 --- a/Sources/NIOFileSystem/FileSystemProtocol.swift +++ b/Sources/NIOFileSystem/FileSystemProtocol.swift @@ -249,19 +249,23 @@ public protocol FileSystemProtocol: Sendable { /// at the given path then this function returns zero. /// /// If the item at the `path` is a directory and `removeItemRecursively` is `true` then the - /// contents of all of its subdirectories will be removed recursively before the directory - /// at `path`. Symbolic links are removed (but their targets are not deleted). + /// contents of all of its subdirectories will be removed recursively before the directory at + /// `path`. Symbolic links are removed (but their targets are not deleted). /// /// - Parameters: /// - path: The path to delete. - /// - removeItemRecursively: If the item being removed is a directory, remove it by - /// recursively removing its children. Setting this to `true` is synonymous with - /// calling `rm -r`, setting this false is synonymous to calling `rmdir`. Ignored if + /// - removalStrategy: Whether to delete files sequentially (one-by-one), or perform a + /// concurrent scan of the tree at `path` and delete files when they are found. Ignored if /// the item being removed isn't a directory. + /// - removeItemRecursively: If the item being removed is a directory, remove it by + /// recursively removing its children. Setting this to `true` is synonymous with calling + /// `rm -r`, setting this false is synonymous to calling `rmdir`. Ignored if the item + /// being removed isn't a directory. /// - Returns: The number of deleted items which may be zero if `path` did not exist. @discardableResult func removeItem( at path: FilePath, + strategy removalStrategy: RemovalStrategy, recursively removeItemRecursively: Bool ) async throws -> Int @@ -588,9 +592,12 @@ extension FileSystemProtocol { /// The item to be removed must be a regular file, symbolic link or directory. If no file exists /// at the given path then this function returns zero. /// - /// If the item at the `path` is a directory then the contents of all of its subdirectories - /// will be removed recursively before the directory at `path`. Symbolic links are removed (but - /// their targets are not deleted). + /// If the item at the `path` is a directory then the contents of all of its subdirectories will + /// be removed recursively before the directory at `path`. Symbolic links are removed (but their + /// targets are not deleted). + /// + /// The strategy for deletion will be determined automatically depending on the discovered + /// platform. /// /// - Parameters: /// - path: The path to delete. @@ -599,7 +606,56 @@ extension FileSystemProtocol { public func removeItem( at path: FilePath ) async throws -> Int { - try await self.removeItem(at: path, recursively: true) + try await self.removeItem(at: path, strategy: .platformDefault, recursively: true) + } + + /// Deletes the file or directory (and its contents) at `path`. + /// + /// The item to be removed must be a regular file, symbolic link or directory. If no file exists + /// at the given path then this function returns zero. + /// + /// If the item at the `path` is a directory then the contents of all of its subdirectories will + /// be removed recursively before the directory at `path`. Symbolic links are removed (but their + /// targets are not deleted). + /// + /// The strategy for deletion will be determined automatically depending on the discovered + /// platform. + /// + /// - Parameters: + /// - path: The path to delete. + /// - removeItemRecursively: If the item being removed is a directory, remove it by + /// recursively removing its children. Setting this to `true` is synonymous with calling + /// `rm -r`, setting this false is synonymous to calling `rmdir`. Ignored if the item + /// being removed isn't a directory. + /// - Returns: The number of deleted items which may be zero if `path` did not exist. + @discardableResult + public func removeItem( + at path: FilePath, + recursively removeItemRecursively: Bool + ) async throws -> Int { + try await self.removeItem(at: path, strategy: .platformDefault, recursively: removeItemRecursively) + } + + /// Deletes the file or directory (and its contents) at `path`. + /// + /// The item to be removed must be a regular file, symbolic link or directory. If no file exists + /// at the given path then this function returns zero. + /// + /// If the item at the `path` is a directory then the contents of all of its subdirectories will + /// be removed recursively before the directory at `path`. Symbolic links are removed (but their + /// targets are not deleted). + /// + /// - Parameters: + /// - path: The path to delete. + /// - removalStrategy: Whether to delete files sequentially (one-by-one), or perform a + /// concurrent scan of the tree at `path` and delete files when they are found. + /// - Returns: The number of deleted items which may be zero if `path` did not exist. + @discardableResult + public func removeItem( + at path: FilePath, + strategy removalStrategy: RemovalStrategy + ) async throws -> Int { + try await self.removeItem(at: path, strategy: removalStrategy, recursively: true) } /// Create a directory at the given path. @@ -659,7 +715,7 @@ extension FileSystemProtocol { try await execute(handle, directory) } } tearDown: { _ in - try await self.removeItem(at: directory, recursively: true) + try await self.removeItem(at: directory, strategy: .platformDefault, recursively: true) } } } diff --git a/Sources/NIOFileSystem/IOStrategy.swift b/Sources/NIOFileSystem/IOStrategy.swift new file mode 100644 index 0000000000..fcb7fb432a --- /dev/null +++ b/Sources/NIOFileSystem/IOStrategy.swift @@ -0,0 +1,184 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +/// How many file descriptors to open when performing I/O operations. +enum IOStrategy: Hashable, Sendable { + // platformDefault is reified into one of the concrete options below: + case sequential + case parallel(_ maxDescriptors: Int) + + // These selections are relatively arbitrary but the rationale is as follows: + // + // - Never exceed the default OS limits even if 4 such operations were happening at once. + // - Sufficient to enable significant speed up from parallelism + // - Not wasting effort by pushing contention to the underlying storage device. Further we + // assume an SSD or similar underlying storage tech. Users on spinning rust need to account + // for that themselves anyway. + // + // That said, empirical testing for this has not been performed, suggestions welcome. + // + // Note: The directory scan is modelled after a copy strategy needing two handles: during the + // creation of the destination directory we hold the handle while copying attributes. A much + // more complex internal state machine could allow doing two of these if desired. This may not + // result in a faster copy though so things are left simple. + internal static func determinePlatformDefault() -> Self { + #if os(macOS) || os(Linux) || os(Windows) + // Eight file descriptors allow for four concurrent file copies/directory scans. Avoiding + // storage system contention is of utmost importance. + // + // Testing was performed on an SSD, while copying objects (a dense directory of small files + // and subdirectories of similar shape) to the same volume, totalling 12GB. Results showed + // improvements in elapsed time for (expected) increases in CPU time up to parallel(8). + // Beyond this, the increases in CPU led to only moderate gains. + // + // Anyone tuning this is encouraged to cover worst case scenarios. + return .parallel(8) + #elseif os(iOS) || os(tvOS) || os(watchOS) || os(Android) + // Reduced maximum descriptors in embedded world. This is chosen based on biasing towards + // safety, not empirical testing. + return .parallel(4) + #else + // Safety first. If we do not know what system we run on, we keep it simple. + return .sequential + #endif + } +} + +/// How to perform copies. Currently only relevant to directory level copies when using +/// ``FileSystemProtocol/copyItem(at:to:strategy:shouldProceedAfterError:shouldCopyItem:)`` or other +/// overloads that use the default behaviour. +public struct CopyStrategy: Hashable, Sendable { + internal let wrapped: IOStrategy + private init(_ strategy: IOStrategy) { + switch strategy { + case .sequential: + self.wrapped = .sequential + case let .parallel(maxDescriptors): + self.wrapped = .parallel(maxDescriptors) + } + } + + // A copy fundamentally can't work without two descriptors unless you copy everything into + // memory which is infeasible/inefficient for large copies. + private static let minDescriptorsAllowed = 2 + + /// Operate in whatever manner is deemed a reasonable default for the platform. This will limit + /// the maximum file descriptors usage based on reasonable defaults. + /// + /// Current assumptions (which are subject to change): + /// - Only one copy operation would be performed at once + /// - The copy operation is not intended to be the primary activity on the device + public static let platformDefault: Self = Self(IOStrategy.determinePlatformDefault()) + + /// The copy is done asynchronously, but only one operation will occur at a time. This is the + /// only way to guarantee only one callback to the `shouldCopyItem` will happen at a time. + public static let sequential: Self = Self(.sequential) + + /// Allow multiple I/O operations to run concurrently, including file copies/directory creation + /// and scanning. + /// + /// - Parameter maxDescriptors: a conservative limit on the number of concurrently open file + /// descriptors involved in the copy. This number must be >= 2 though, if you are using a + /// value that low you should use ``sequential`` + /// + /// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors` is less + /// than 2. + /// + public static func parallel(maxDescriptors: Int) throws -> Self { + guard maxDescriptors >= Self.minDescriptorsAllowed else { + // 2 is not quite the same as sequential, you could have two concurrent directory + // listings for example less than 2 and you can't actually do a _copy_ though so it's + // non-sensical. + throw FileSystemError( + code: .invalidArgument, + message: "Can't do a copy operation without at least 2 file descriptors '\(maxDescriptors)' is illegal", + cause: nil, + location: .here() + ) + } + return .init(.parallel(maxDescriptors)) + } +} + +extension CopyStrategy: CustomStringConvertible { + public var description: String { + switch self.wrapped { + case .sequential: + return "sequential" + case let .parallel(maxDescriptors): + return "parallel with max \(maxDescriptors) descriptors" + } + } +} + +/// How to perform file deletions. Currently only relevant to directory level deletions when using +/// ``FileSystemProtocol/removeItem(at:strategy:recursively:)`` or other overloads that use the +/// default behaviour. +public struct RemovalStrategy: Hashable, Sendable { + internal let wrapped: IOStrategy + private init(_ strategy: IOStrategy) { + switch strategy { + case .sequential: + self.wrapped = .sequential + case let .parallel(maxDescriptors): + self.wrapped = .parallel(maxDescriptors) + } + } + + // A deletion requires no file descriptors. We only consume a file descriptor while scanning the + // contents of a directory, so the minimum is 1. + private static let minRequiredDescriptors = 1 + + /// Operate in whatever manner is deemed a reasonable default for the platform. This will limit + /// the maximum file descriptors usage based on reasonable defaults. + /// + /// Current assumptions (which are subject to change): + /// - Only one delete operation would be performed at once + /// - The delete operation is not intended to be the primary activity on the device + public static let platformDefault: Self = Self(IOStrategy.determinePlatformDefault()) + + /// Traversal of directories and removal of files will be done sequentially without any + /// parallelization. + public static let sequential: Self = Self(.sequential) + + /// Allow for one or more directory scans to run at the same time. Removal of files will happen + /// on asynchronous tasks in parallel. + /// + /// Setting `maxDescriptors` to 1, will limit the speed of directory discovery. Deletion of + /// files within that directory will run in parallel, but discovery of subdirectories will be + /// limited to one at a time. + public static func parallel(maxDescriptors: Int) throws -> Self { + guard maxDescriptors >= Self.minRequiredDescriptors else { + throw FileSystemError( + code: .invalidArgument, + message: + "Can't do a remove operation without at least one file descriptor '\(maxDescriptors)' is illegal", + cause: nil, + location: .here() + ) + } + return .init(.parallel(maxDescriptors)) + } +} + +extension RemovalStrategy: CustomStringConvertible { + public var description: String { + switch self.wrapped { + case .sequential: + return "sequential" + case let .parallel(maxDescriptors): + return "parallel with max \(maxDescriptors) descriptors" + } + } +} diff --git a/Sources/NIOFileSystem/Internal/Concurrency Primitives/TokenBucket.swift b/Sources/NIOFileSystem/Internal/Concurrency Primitives/TokenBucket.swift new file mode 100644 index 0000000000..3a998f5b2a --- /dev/null +++ b/Sources/NIOFileSystem/Internal/Concurrency Primitives/TokenBucket.swift @@ -0,0 +1,88 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import DequeModule +import NIOConcurrencyHelpers + +/// Type modeled after a "token bucket" pattern, which is similar to a semaphore, but is built with +/// Swift Concurrency primitives. +/// +/// This is an adaptation of the TokenBucket found in Swift Package Manager. +/// Instead of using an ``actor``, we define a class and limit access through +/// ``NIOLock``. +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +final class TokenBucket: @unchecked Sendable { + private var tokens: Int + private var waiters: Deque> + private let lock: NIOLock + + init(tokens: Int) { + precondition(tokens >= 1, "Need at least one token!") + self.tokens = tokens + self.waiters = Deque() + self.lock = NIOLock() + } + + /// Executes an `async` closure immediately when a token is available. + /// Only the same number of closures will be executed concurrently as the number + /// of `tokens` passed to ``TokenBucket/init(tokens:)``, all subsequent + /// invocations of `withToken` will suspend until a "free" token is available. + /// - Parameter body: The closure to invoke when a token is available. + /// - Returns: Resulting value returned by `body`. + func withToken( + _ body: @Sendable () async throws -> ReturnType + ) async rethrows -> ReturnType { + await self.getToken() + defer { self.returnToken() } + return try await body() + } + + private func getToken() async { + self.lock.lock() + if self.tokens > 0 { + self.tokens -= 1 + self.lock.unlock() + return + } + + await withCheckedContinuation { + self.waiters.append($0) + self.lock.unlock() + } + } + + private func returnToken() { + if let waiter = self.lock.withLock({ () -> CheckedContinuation? in + if let nextWaiter = self.waiters.popFirst() { + return nextWaiter + } + + self.tokens += 1 + return nil + }) { + waiter.resume() + } + } +} diff --git a/Sources/NIOFileSystem/Internal/ParallelRemoval.swift b/Sources/NIOFileSystem/Internal/ParallelRemoval.swift new file mode 100644 index 0000000000..cd8b6702d5 --- /dev/null +++ b/Sources/NIOFileSystem/Internal/ParallelRemoval.swift @@ -0,0 +1,72 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension FileSystem { + /// Recursively walk all objects found in `path`. Call ourselves recursively + /// on each directory that we find, as soon as the file descriptor for + /// `path` has been closed; also delete all files that we come across. + func discoverAndRemoveItemsInTree( + at path: FilePath, + _ bucket: TokenBucket + ) async throws -> Int { + // Discover current directory and find all files/directories. Free up + // the handle as fast as possible. + let (directoriesToRecurseInto, itemsToDelete) = try await bucket.withToken { + try await self.withDirectoryHandle(atPath: path) { directory in + var subdirectories: [FilePath] = [] + var itemsInDirectory: [FilePath] = [] + + for try await batch in directory.listContents().batched() { + for entry in batch { + switch entry.type { + case .directory: + subdirectories.append(entry.path) + default: + itemsInDirectory.append(entry.path) + } + } + } + + return (subdirectories, itemsInDirectory) + } + } + + return try await withThrowingTaskGroup(of: Int.self) { group in + // Delete all items we found in the current directory. + for item in itemsToDelete { + group.addTask { + try await self.removeOneItem(at: item) + } + } + + // Recurse into all newly found subdirectories. + for directory in directoriesToRecurseInto { + group.addTask { + try await self.discoverAndRemoveItemsInTree(at: directory, bucket) + } + } + + // Await task groups to finish and sum all items deleted so far. + var numberOfDeletedItems = try await group.reduce(0, +) + + // Remove top level directory. + numberOfDeletedItems += try await self.removeOneItem(at: path) + + return numberOfDeletedItems + } + } +} diff --git a/Sources/NIOFileSystem/Internal/System Calls/Syscall.swift b/Sources/NIOFileSystem/Internal/System Calls/Syscall.swift index de53259404..fe0318707a 100644 --- a/Sources/NIOFileSystem/Internal/System Calls/Syscall.swift +++ b/Sources/NIOFileSystem/Internal/System Calls/Syscall.swift @@ -400,14 +400,7 @@ public enum Libc { return valueOrErrno { pathBytes.withUnsafeMutableBufferPointer { pointer in // The array must be terminated with a nil. - #if os(Android) - libc_fts_open( - [pointer.baseAddress!, unsafeBitCast(0, to: UnsafeMutablePointer.self)], - options.rawValue - ) - #else libc_fts_open([pointer.baseAddress, nil], options.rawValue) - #endif } } } diff --git a/Sources/NIOPosix/Bootstrap.swift b/Sources/NIOPosix/Bootstrap.swift index fa117e8576..e3b5710ffe 100644 --- a/Sources/NIOPosix/Bootstrap.swift +++ b/Sources/NIOPosix/Bootstrap.swift @@ -2420,6 +2420,8 @@ extension NIOPipeBootstrap { let channel: PipeChannel let pipeChannelInput: SelectablePipeHandle? let pipeChannelOutput: SelectablePipeHandle? + let hasNoInputPipe: Bool + let hasNoOutputPipe: Bool do { if let input = input { try self.validateFileDescriptorIsNotAFile(input) @@ -2430,6 +2432,8 @@ extension NIOPipeBootstrap { pipeChannelInput = input.flatMap { SelectablePipeHandle(takingOwnershipOfDescriptor: $0) } pipeChannelOutput = output.flatMap { SelectablePipeHandle(takingOwnershipOfDescriptor: $0) } + hasNoInputPipe = pipeChannelInput == nil + hasNoOutputPipe = pipeChannelOutput == nil do { channel = try self.hooks.makePipeChannel( eventLoop: eventLoop as! SelectableEventLoop, @@ -2458,10 +2462,10 @@ extension NIOPipeBootstrap { channel.registerAlreadyConfigured0(promise: promise) return promise.futureResult.map { result } }.flatMap { result -> EventLoopFuture in - if pipeChannelInput == nil { + if hasNoInputPipe { return channel.close(mode: .input).map { result } } - if pipeChannelOutput == nil { + if hasNoOutputPipe { return channel.close(mode: .output).map { result } } return channel.selectableEventLoop.makeSucceededFuture(result) diff --git a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift index b92a4bb01b..0361e1a197 100644 --- a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift +++ b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift @@ -51,9 +51,6 @@ typealias ThreadInitializer = (NIOThread) -> Void /// all run their own `EventLoop`. Those threads will not be shut down until `shutdownGracefully` or /// `syncShutdownGracefully` is called. /// -/// - Note: It's good style to call `MultiThreadedEventLoopGroup.shutdownGracefully` or -/// `MultiThreadedEventLoopGroup.syncShutdownGracefully` when you no longer need this `EventLoopGroup`. In -/// many cases that is just before your program exits. /// - warning: Unit tests often spawn one `MultiThreadedEventLoopGroup` per unit test to force isolation between the /// tests. In those cases it's important to shut the `MultiThreadedEventLoopGroup` down at the end of the /// test. A good place to start a `MultiThreadedEventLoopGroup` is the `setUp` method of your `XCTestCase` diff --git a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift index a25e69df90..cc5fa16e33 100644 --- a/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift +++ b/Sources/NIOWebSocket/NIOWebSocketClientUpgrader.swift @@ -149,7 +149,7 @@ extension NIOWebSocketClientUpgrader { UInt64.random(in: UInt64.min...UInt64.max, using: &generator), UInt64.random(in: UInt64.min...UInt64.max, using: &generator) ) - return String(base64Encoding: buffer.readableBytesView) + return String(_base64Encoding: buffer.readableBytesView) } /// Generates a random WebSocket Request Key by generating 16 bytes randomly using the `SystemRandomNumberGenerator` and encoding them as a base64 string as defined in RFC6455 https://tools.ietf.org/html/rfc6455#section-4.1. /// - Returns: base64 encoded request key @@ -179,7 +179,7 @@ private func _shouldAllowUpgrade(upgradeResponse: HTTPResponseHead, requestKey: var hasher = SHA1() hasher.update(string: requestKey) hasher.update(string: magicWebSocketGUID) - let expectedAcceptValue = String(base64Encoding: hasher.finish()) + let expectedAcceptValue = String(_base64Encoding: hasher.finish()) return expectedAcceptValue == acceptValueHeader[0] } diff --git a/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift b/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift index 8aa16eec7c..4f888a3e34 100644 --- a/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift +++ b/Sources/NIOWebSocket/NIOWebSocketServerUpgrader.swift @@ -15,6 +15,7 @@ import CNIOSHA1 import NIOCore import NIOHTTP1 +import _NIOBase64 let magicWebSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" @@ -303,7 +304,7 @@ private func _buildUpgradeResponse( var hasher = SHA1() hasher.update(string: key) hasher.update(string: magicWebSocketGUID) - acceptValue = String(base64Encoding: hasher.finish()) + acceptValue = String(_base64Encoding: hasher.finish()) } extraHeaders.replaceOrAdd(name: "Upgrade", value: "websocket") diff --git a/Sources/_NIOBase64/Base64.swift b/Sources/_NIOBase64/Base64.swift index dc001f85b4..15bf79e3be 100644 --- a/Sources/_NIOBase64/Base64.swift +++ b/Sources/_NIOBase64/Base64.swift @@ -16,15 +16,27 @@ // https://github.com/fabianfett/swift-base64-kit extension String { - /// Base64 encode a collection of UInt8 to a string, without the use of Foundation. + @available(*, deprecated, message: "This API was unintentionally made public.") @inlinable public init(base64Encoding bytes: Buffer) where Buffer.Element == UInt8 { - self = Base64.encode(bytes: bytes) + self.init(_base64Encoding: bytes) } + @available(*, deprecated, message: "This API was unintentionally made public.") @inlinable public func base64Decoded() throws -> [UInt8] { + try self._base64Decoded() + } + + /// Base64 encode a collection of UInt8 to a string, without the use of Foundation. + @inlinable + public init(_base64Encoding bytes: Buffer) where Buffer.Element == UInt8 { + self = Base64.encode(bytes: bytes) + } + + @inlinable + public func _base64Decoded() throws -> [UInt8] { try Base64.decode(string: self) } } diff --git a/Tests/NIOBase64Tests/Base64Test.swift b/Tests/NIOBase64Tests/Base64Test.swift index dd6262a9d8..5c320706bf 100644 --- a/Tests/NIOBase64Tests/Base64Test.swift +++ b/Tests/NIOBase64Tests/Base64Test.swift @@ -13,59 +13,58 @@ //===----------------------------------------------------------------------===// import XCTest - -@testable import _NIOBase64 +import _NIOBase64 class Base64Test: XCTestCase { func testEncodeEmptyData() throws { let data = [UInt8]() - let encodedData = String(base64Encoding: data) + let encodedData = String(_base64Encoding: data) XCTAssertEqual(encodedData.count, 0) } func testBase64EncodingOfEmptyString() throws { let string = "" - let encoded = String(base64Encoding: string.utf8) + let encoded = String(_base64Encoding: string.utf8) XCTAssertEqual(encoded, "") } func testBase64DecodingOfEmptyString() throws { let encoded = "" XCTAssertNoThrow { - let decoded = try encoded.base64Decoded() + let decoded = try encoded._base64Decoded() XCTAssertEqual(decoded, [UInt8]()) } } func testBase64EncodingArrayOfNulls() throws { let data = Array(repeating: UInt8(0), count: 10) - let encodedData = String(base64Encoding: data) + let encodedData = String(_base64Encoding: data) XCTAssertEqual(encodedData, "AAAAAAAAAAAAAA==") } func testBase64DecodeArrayOfNulls() throws { let encoded = "AAAAAAAAAAAAAA==" - let decoded = try! encoded.base64Decoded() + let decoded = try! encoded._base64Decoded() let expected = Array(repeating: UInt8(0), count: 10) XCTAssertEqual(decoded, expected) } func testBase64EncodeingHelloWorld() throws { let string = "Hello, world!" - let encoded = String(base64Encoding: string.utf8) + let encoded = String(_base64Encoding: string.utf8) let expected = "SGVsbG8sIHdvcmxkIQ==" XCTAssertEqual(encoded, expected) } func testBase64DecodeHelloWorld() throws { let encoded = "SGVsbG8sIHdvcmxkIQ==" - let decoded = try! encoded.base64Decoded() + let decoded = try! encoded._base64Decoded() XCTAssertEqual(decoded, "Hello, world!".utf8.map { UInt8($0) }) } func testBase64EncodingAllTheBytesSequentially() throws { let data = Array(UInt8(0)...UInt8(255)) - let encodedData = String(base64Encoding: data) + let encodedData = String(_base64Encoding: data) XCTAssertEqual( encodedData, "AAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8gISIjJCUmJygpKissLS4vMDEyMzQ1Njc4OTo7PD0+P0BBQkNERUZHSElKS0xNTk9QUVJTVFVWV1hZWltcXV5fYGFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6e3x9fn+AgYKDhIWGh4iJiouMjY6PkJGSk5SVlpeYmZqbnJ2en6ChoqOkpaanqKmqq6ytrq+wsbKztLW2t7i5uru8vb6/wMHCw8TFxsfIycrLzM3Oz9DR0tPU1dbX2Nna29zd3t/g4eLj5OXm5+jp6uvs7e7v8PHy8/T19vf4+fr7/P3+/w==" @@ -74,14 +73,14 @@ class Base64Test: XCTestCase { func testBase64DecodingWithInvalidLength() { let encoded = "dGVzbA!==" - XCTAssertThrowsError(try encoded.base64Decoded()) { error in + XCTAssertThrowsError(try encoded._base64Decoded()) { error in XCTAssertEqual(error as? Base64Error, .invalidLength) } } func testBase64DecodeWithInvalidCharacter() throws { let encoded = "SGVsbG8sI_dvcmxkIQ==" - XCTAssertThrowsError(try encoded.base64Decoded()) { error in + XCTAssertThrowsError(try encoded._base64Decoded()) { error in XCTAssertEqual(error as? Base64Error, .invalidCharacter) } } diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 1619f95638..5fa112f6d1 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -21,7 +21,7 @@ import XCTest @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) class AsyncTestingChannelTests: XCTestCase { func testSingleHandlerInit() async throws { - class Handler: ChannelInboundHandler { + final class Handler: ChannelInboundHandler, Sendable { typealias InboundIn = Never } @@ -43,7 +43,7 @@ class AsyncTestingChannelTests: XCTestCase { } func testMultipleHandlerInit() async throws { - class Handler: ChannelInboundHandler, RemovableChannelHandler { + final class Handler: ChannelInboundHandler, RemovableChannelHandler, Sendable { typealias InboundIn = Never let identifier: String @@ -334,7 +334,7 @@ class AsyncTestingChannelTests: XCTestCase { try await XCTAsyncAssertTrue(await channel.finish().isClean) // channelInactive should fire only once. - XCTAssertEqual(inactiveHandler.inactiveNotifications, 1) + XCTAssertEqual(inactiveHandler.inactiveNotifications.load(ordering: .sequentiallyConsistent), 1) } func testEmbeddedLifecycle() async throws { @@ -355,7 +355,7 @@ class AsyncTestingChannelTests: XCTestCase { XCTAssertFalse(channel.isActive) } - private final class ExceptionThrowingInboundHandler: ChannelInboundHandler { + private final class ExceptionThrowingInboundHandler: ChannelInboundHandler, Sendable { typealias InboundIn = String public func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -363,7 +363,7 @@ class AsyncTestingChannelTests: XCTestCase { } } - private final class ExceptionThrowingOutboundHandler: ChannelOutboundHandler { + private final class ExceptionThrowingOutboundHandler: ChannelOutboundHandler, Sendable { typealias OutboundIn = String typealias OutboundOut = Never @@ -372,12 +372,12 @@ class AsyncTestingChannelTests: XCTestCase { } } - private final class CloseInChannelInactiveHandler: ChannelInboundHandler { + private final class CloseInChannelInactiveHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer - public var inactiveNotifications = 0 + public let inactiveNotifications = ManagedAtomic(0) public func channelInactive(context: ChannelHandlerContext) { - inactiveNotifications += 1 + inactiveNotifications.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) context.close(promise: nil) } } diff --git a/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift b/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift index 5de5846603..e262147fa2 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift @@ -222,7 +222,7 @@ final class FileSystemTests: XCTestCase { // Avoid dirtying the current working directory. if path.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: path) + try await fileSystem.removeItem(at: path, strategy: .platformDefault) } } @@ -351,7 +351,7 @@ final class FileSystemTests: XCTestCase { // Avoid dirtying the current working directory. if directoryPath.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: directoryPath) + try await fileSystem.removeItem(at: directoryPath, strategy: .platformDefault) } } @@ -399,7 +399,7 @@ final class FileSystemTests: XCTestCase { if directoryPath.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: directoryPath, recursively: true) + try await fileSystem.removeItem(at: directoryPath, strategy: .platformDefault, recursively: true) } } @@ -586,8 +586,8 @@ final class FileSystemTests: XCTestCase { let sourcePath = try await self.fs.temporaryFilePath() let destPath = try await self.fs.temporaryFilePath() self.addTeardownBlock { - _ = try? await self.fs.removeItem(at: sourcePath) - _ = try? await self.fs.removeItem(at: destPath) + _ = try? await self.fs.removeItem(at: sourcePath, strategy: .platformDefault) + _ = try? await self.fs.removeItem(at: destPath, strategy: .platformDefault) } let sourceInfo = try await self.fs.withFileHandle( @@ -991,7 +991,7 @@ final class FileSystemTests: XCTestCase { let infoAfterCreation = try await self.fs.info(forFileAt: path) XCTAssertNotNil(infoAfterCreation) - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .platformDefault) XCTAssertEqual(removed, 1) let infoAfterRemoval = try await self.fs.info(forFileAt: path) @@ -1002,11 +1002,11 @@ final class FileSystemTests: XCTestCase { let path = try await self.fs.temporaryFilePath() let info = try await self.fs.info(forFileAt: path) XCTAssertNil(info) - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .platformDefault) XCTAssertEqual(removed, 0) } - func testRemoveDirectory() async throws { + func testRemoveDirectorySequentially() async throws { let path = try await self.fs.temporaryFilePath() let created = try await self.generateDirectoryStructure( root: path, @@ -1019,12 +1019,37 @@ final class FileSystemTests: XCTestCase { // Removing a non-empty directory recursively should throw 'notEmpty' await XCTAssertThrowsFileSystemErrorAsync { - try await self.fs.removeItem(at: path, recursively: false) + try await self.fs.removeItem(at: path, strategy: .sequential, recursively: false) } onError: { error in XCTAssertEqual(error.code, .notEmpty) } - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .sequential) + XCTAssertEqual(created, removed) + + let infoAfterRemoval = try await self.fs.info(forFileAt: path) + XCTAssertNil(infoAfterRemoval) + } + + func testRemoveDirectoryConcurrently() async throws { + let path = try await self.fs.temporaryFilePath() + let created = try await self.generateDirectoryStructure( + root: path, + maxDepth: 3, + maxFilesPerDirectory: 10 + ) + + let infoAfterCreation = try await self.fs.info(forFileAt: path) + XCTAssertNotNil(infoAfterCreation) + + // Removing a non-empty directory recursively should throw 'notEmpty' + await XCTAssertThrowsFileSystemErrorAsync { + try await self.fs.removeItem(at: path, strategy: .parallel(maxDescriptors: 2), recursively: false) + } onError: { error in + XCTAssertEqual(error.code, .notEmpty) + } + + let removed = try await self.fs.removeItem(at: path, strategy: .parallel(maxDescriptors: 2)) XCTAssertEqual(created, removed) let infoAfterRemoval = try await self.fs.info(forFileAt: path) @@ -1602,7 +1627,7 @@ extension FileSystemTests { // Clean up after ourselves. self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: temporaryDirectoryPath) + try await fileSystem.removeItem(at: temporaryDirectoryPath, strategy: .platformDefault) } guard let info = try await self.fs.info(forFileAt: temporaryDirectoryPath) else { @@ -1640,7 +1665,7 @@ extension FileSystemTests { let temporaryDirectoryPath = try await self.fs.createTemporaryDirectory(template: template) self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: templateRoot, recursively: true) + try await fileSystem.removeItem(at: templateRoot, strategy: .platformDefault, recursively: true) } guard diff --git a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift index 43cfbb4c6b..97d9df956a 100644 --- a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift @@ -318,7 +318,7 @@ private func assertPipelineContainsUpgradeHandler(channel: Channel) { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) class HTTPClientUpgradeTestCase: XCTestCase { func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, + clientHTTPHandler: RemovableChannelHandler & Sendable, clientUpgraders: [any TypedAndUntypedHTTPClientProtocolUpgrader], _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void ) throws -> EmbeddedChannel { @@ -1063,7 +1063,7 @@ class HTTPClientUpgradeTestCase: XCTestCase { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) final class TypedHTTPClientUpgradeTestCase: HTTPClientUpgradeTestCase { override func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, + clientHTTPHandler: RemovableChannelHandler & Sendable, clientUpgraders: [any TypedAndUntypedHTTPClientProtocolUpgrader], _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void ) throws -> EmbeddedChannel { diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 3a69f806e3..9049c5122d 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -1388,7 +1388,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { XCTAssertNil(upgradeRequest.wrappedValue) upgradeHandlerCbFired.wrappedValue = true - _ = context.channel.pipeline.addHandler( + try! context.channel.pipeline.syncOperations.addHandler( CheckWeReadInlineAndExtraData( firstByteDonePromise: firstByteDonePromise, secondByteDonePromise: secondByteDonePromise, @@ -2145,7 +2145,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { XCTAssertNotNil(upgradeRequest.wrappedValue) upgradeHandlerCbFired.wrappedValue = true - _ = context.channel.pipeline.addHandler( + try! context.channel.pipeline.syncOperations.addHandler( CheckWeReadInlineAndExtraData( firstByteDonePromise: firstByteDonePromise, secondByteDonePromise: secondByteDonePromise, diff --git a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift index 847a5ac782..68604ef0f4 100644 --- a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift +++ b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift @@ -284,7 +284,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } @@ -366,7 +366,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureNestedProtocolNegotiationHandlers(channel: channel) + try Self.configureNestedProtocolNegotiationHandlers(channel: channel) } } @@ -508,7 +508,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } @@ -958,7 +958,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { output: pipe2WriteFD ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } } catch { @@ -1251,7 +1251,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( AddressedEnvelopingHandler(remoteAddress: SocketAddress(ipAddress: "127.0.0.1", port: 0)) ) - return try self.configureProtocolNegotiationHandlers( + return try Self.configureProtocolNegotiationHandlers( channel: channel, proposedALPN: nil, inboundID: 1, @@ -1275,7 +1275,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( AddressedEnvelopingHandler(remoteAddress: SocketAddress(ipAddress: "127.0.0.1", port: 0)) ) - return try self.configureProtocolNegotiationHandlers( + return try Self.configureProtocolNegotiationHandlers( channel: channel, proposedALPN: proposedALPN, inboundID: 2, @@ -1329,7 +1329,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { to: .init(ipAddress: "127.0.0.1", port: port) ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @@ -1345,7 +1345,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { to: .init(ipAddress: "127.0.0.1", port: port) ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureNestedProtocolNegotiationHandlers( + try Self.configureNestedProtocolNegotiationHandlers( channel: channel, proposedOuterALPN: proposedOuterALPN, proposedInnerALPN: proposedInnerALPN @@ -1382,7 +1382,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler()) - return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + return try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @@ -1418,13 +1418,13 @@ final class AsyncChannelBootstrapTests: XCTestCase { ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler()) - return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + return try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @discardableResult - private func configureProtocolNegotiationHandlers( + private static func configureProtocolNegotiationHandlers( channel: Channel, proposedALPN: TLSUserEventHandler.ALPN? = nil, inboundID: UInt8? = nil, @@ -1437,7 +1437,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } @discardableResult - private func configureNestedProtocolNegotiationHandlers( + private static func configureNestedProtocolNegotiationHandlers( channel: Channel, proposedOuterALPN: TLSUserEventHandler.ALPN? = nil, proposedInnerALPN: TLSUserEventHandler.ALPN? = nil @@ -1456,7 +1456,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( TLSUserEventHandler(proposedALPN: proposedInnerALPN) ) - let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) + let negotiationFuture = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel) return negotiationFuture } @@ -1465,7 +1465,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( TLSUserEventHandler(proposedALPN: proposedInnerALPN) ) - let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) + let negotiationHandler = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel) return negotiationHandler } @@ -1481,7 +1481,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } @discardableResult - private func addTypedApplicationProtocolNegotiationHandler( + private static func addTypedApplicationProtocolNegotiationHandler( to channel: Channel ) throws -> EventLoopFuture { let negotiationHandler = NIOTypedApplicationProtocolNegotiationHandler { diff --git a/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift b/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift index b115ac2f8f..9940b74e12 100644 --- a/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift +++ b/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Atomics import NIOCore import NIOEmbedded import NIOPosix @@ -26,9 +27,6 @@ protocol ScheduledCallbackTestRequirements { // ELG-backed ELs need to be shutdown via the ELG. func shutdownEventLoop() async throws - - // This is here for NIOAsyncTestingEventLoop only. - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R } final class MTELGScheduledCallbackTests: _BaseScheduledCallbackTests { @@ -43,10 +41,6 @@ final class MTELGScheduledCallbackTests: _BaseScheduledCallbackTests { func shutdownEventLoop() async throws { try await self.group.shutdownGracefully() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try body() - } } override func setUp() async throws { @@ -66,10 +60,6 @@ final class NIOAsyncTestingEventLoopScheduledCallbackTests: _BaseScheduledCallba func shutdownEventLoop() async throws { await self._loop.shutdownGracefully() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try await self._loop.executeInContext(body) - } } override func setUp() async throws { @@ -98,10 +88,6 @@ extension _BaseScheduledCallbackTests { func shutdownEventLoop() async throws { try await self.requirements.shutdownEventLoop() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try await self.requirements.maybeInContext(body) - } } // The tests, abstracted over any of the event loops. @@ -111,10 +97,10 @@ extension _BaseScheduledCallbackTests { let handler = MockScheduledCallbackHandler() _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 0) } + handler.assert(callbackCount: 0, cancelCount: 0) try await self.advanceTime(by: .microseconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 0) } + handler.assert(callbackCount: 0, cancelCount: 0) } func testSheduledCallbackExecutedAtDeadline() async throws { @@ -123,7 +109,7 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } func testMultipleSheduledCallbacksUsingSameHandler() async throws { @@ -135,7 +121,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 2, cancelCount: 0) } + handler.assert(callbackCount: 2, cancelCount: 0) _ = try self.loop.scheduleCallback(in: .milliseconds(2), handler: handler) _ = try self.loop.scheduleCallback(in: .milliseconds(3), handler: handler) @@ -143,7 +129,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(3)) try await handler.waitForCallback(timeout: .seconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 4, cancelCount: 0) } + handler.assert(callbackCount: 4, cancelCount: 0) } func testMultipleSheduledCallbacksUsingDifferentHandlers() async throws { @@ -156,8 +142,8 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handlerA.waitForCallback(timeout: .seconds(1)) try await handlerB.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handlerA.assert(callbackCount: 1, cancelCount: 0) } - try await self.maybeInContext { handlerB.assert(callbackCount: 1, cancelCount: 0) } + handlerA.assert(callbackCount: 1, cancelCount: 0) + handlerB.assert(callbackCount: 1, cancelCount: 0) } func testCancelExecutesCancellationCallback() async throws { @@ -165,7 +151,7 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testCancelAfterDeadlineDoesNotExecutesCancellationCallback() async throws { @@ -175,7 +161,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } func testCancelAfterCancelDoesNotCallCancellationCallbackAgain() async throws { @@ -184,7 +170,7 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) scheduledCallback.cancel() scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testCancelAfterShutdownDoesNotCallCancellationCallbackAgain() async throws { @@ -192,10 +178,10 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownCancelsOutstandingScheduledCallbacks() async throws { @@ -203,7 +189,7 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownDoesNotCancelCancelledCallbacksAgain() async throws { @@ -211,10 +197,10 @@ extension _BaseScheduledCallbackTests { let handle = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) handle.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownDoesNotCancelPastCallbacks() async throws { @@ -223,16 +209,16 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } } -private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler { - var callbackCount = 0 - var cancelCount = 0 +private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler, Sendable { + let callbackCount = ManagedAtomic(0) + let cancelCount = ManagedAtomic(0) let callbackStream: AsyncStream private let callbackStreamContinuation: AsyncStream.Continuation @@ -246,17 +232,29 @@ private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler { } func handleScheduledCallback(eventLoop: some EventLoop) { - self.callbackCount += 1 + self.callbackCount.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) self.callbackStreamContinuation.yield() } func didCancelScheduledCallback(eventLoop: some EventLoop) { - self.cancelCount += 1 + self.cancelCount.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) } func assert(callbackCount: Int, cancelCount: Int, file: StaticString = #file, line: UInt = #line) { - XCTAssertEqual(self.callbackCount, callbackCount, "Unexpected callback count", file: file, line: line) - XCTAssertEqual(self.cancelCount, cancelCount, "Unexpected cancel count", file: file, line: line) + XCTAssertEqual( + self.callbackCount.load(ordering: .sequentiallyConsistent), + callbackCount, + "Unexpected callback count", + file: file, + line: line + ) + XCTAssertEqual( + self.cancelCount.load(ordering: .sequentiallyConsistent), + cancelCount, + "Unexpected cancel count", + file: file, + line: line + ) } func waitForCallback(timeout: TimeAmount, file: StaticString = #file, line: UInt = #line) async throws { diff --git a/Tests/NIOPosixTests/RawSocketBootstrapTests.swift b/Tests/NIOPosixTests/RawSocketBootstrapTests.swift index a65ad1ae91..a7c42c3c82 100644 --- a/Tests/NIOPosixTests/RawSocketBootstrapTests.swift +++ b/Tests/NIOPosixTests/RawSocketBootstrapTests.swift @@ -47,8 +47,13 @@ final class RawSocketBootstrapTests: XCTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let channel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try channel.close().wait()) } @@ -93,15 +98,25 @@ final class RawSocketBootstrapTests: XCTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let readChannel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try readChannel.close().wait()) } let writeChannel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try writeChannel.close().wait()) } @@ -147,8 +162,13 @@ final class RawSocketBootstrapTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let channel = try NIORawSocketBootstrap(group: elg) .channelOption(.ipOption(.ip_hdrincl), value: 1) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try channel.close().wait()) } diff --git a/Tests/NIOPosixTests/SocketChannelTest.swift b/Tests/NIOPosixTests/SocketChannelTest.swift index cb8161a7e5..a42eefb0fc 100644 --- a/Tests/NIOPosixTests/SocketChannelTest.swift +++ b/Tests/NIOPosixTests/SocketChannelTest.swift @@ -1138,11 +1138,12 @@ class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler { // What we're trying to do here is forcing a close without calling `close`. We know that the other side of // the connection is fully closed but because we support half-closure, we need to write to 'learn' that the // other side has actually fully closed the socket. + let promise = self.waitUntilWriteFailedPromise func writeUntilError() { context.writeAndFlush(Self.wrapOutboundOut(buffer)).map { writeUntilError() }.whenFailure { (_: Error) in - self.waitUntilWriteFailedPromise.succeed(()) + promise.succeed(()) } } writeUntilError() diff --git a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift index 8b671fce63..9d729c5532 100644 --- a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift +++ b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift @@ -83,7 +83,13 @@ class UniversalBootstrapSupportTest: XCTestCase { let client = try NIOClientTCPBootstrap(ClientBootstrap(group: group), tls: DummyTLSProvider()) .channelInitializer { channel in - channel.pipeline.addHandlers(counter1, DropChannelReadsHandler(), counter2) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandlers( + counter1, + DropChannelReadsHandler(), + counter2 + ) + } } .channelOption(.autoRead, value: false) .connectTimeout(.hours(1)) diff --git a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift index 23202c2c4b..71a86bdc1e 100644 --- a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift @@ -170,7 +170,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -244,7 +246,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -275,7 +279,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -307,7 +313,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -332,13 +340,12 @@ class WebSocketClientEndToEndTests: XCTestCase { } fileprivate func runSuccessfulUpgrade() throws -> (EmbeddedChannel, WebSocketRecorderHandler) { - - let handler = WebSocketRecorderHandler() - let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=", upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(handler) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -361,6 +368,10 @@ class WebSocketClientEndToEndTests: XCTestCase { clientChannel.embeddedEventLoop.run() + // Ok, now grab the handler. We can do this with sync operations, because this is an + // EmbeddedChannel. + let handler = try clientChannel.pipeline.syncOperations.handler(type: WebSocketRecorderHandler.self) + return (clientChannel, handler) } @@ -501,7 +512,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -570,7 +583,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -603,7 +618,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -637,7 +654,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -665,12 +684,12 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { } override fileprivate func runSuccessfulUpgrade() throws -> (EmbeddedChannel, WebSocketRecorderHandler) { - let handler = WebSocketRecorderHandler() - let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=", upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(handler) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -695,6 +714,10 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { try upgradeResult.wait() + // Ok, now grab the handler. We can do this with sync operations, because this is an + // EmbeddedChannel. + let handler = try clientChannel.pipeline.syncOperations.handler(type: WebSocketRecorderHandler.self) + return (clientChannel, handler) } } diff --git a/scripts/generate_matrix.sh b/scripts/generate_matrix.sh index dbab4f69a3..c7915b4783 100755 --- a/scripts/generate_matrix.sh +++ b/scripts/generate_matrix.sh @@ -52,7 +52,7 @@ windows_nightly_main_runner="windows-2019" windows_nightly_main_container_image="swiftlang/swift:nightly-main-windowsservercore-1809" # Create matrix from inputs -matrix='{"swift": []}' +matrix='{"config": []}' ## Linux if [[ "$linux_5_9_enabled" == "true" || "$linux_5_10_enabled" == "true" || "$linux_6_0_enabled" == "true" || \ @@ -70,7 +70,7 @@ if [[ "$linux_5_9_enabled" == "true" ]]; then --arg command_arguments "$linux_5_9_command_arguments" \ --arg container_image "$linux_5_9_container_image" \ --arg runner "$linux_runner" \ - '.swift[.swift| length] |= . + { "name": "5.9", "image": $container_image, "swift_version": "5.9", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') + '.config[.config| length] |= . + { "name": "5.9", "image": $container_image, "swift_version": "5.9", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') fi if [[ "$linux_5_10_enabled" == "true" ]]; then @@ -80,7 +80,7 @@ if [[ "$linux_5_10_enabled" == "true" ]]; then --arg command_arguments "$linux_5_10_command_arguments" \ --arg container_image "$linux_5_10_container_image" \ --arg runner "$linux_runner" \ - '.swift[.swift| length] |= . + { "name": "5.10", "image": $container_image, "swift_version": "5.10", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') + '.config[.config| length] |= . + { "name": "5.10", "image": $container_image, "swift_version": "5.10", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') fi if [[ "$linux_6_0_enabled" == "true" ]]; then @@ -90,7 +90,7 @@ if [[ "$linux_6_0_enabled" == "true" ]]; then --arg command_arguments "$linux_6_0_command_arguments" \ --arg container_image "$linux_6_0_container_image" \ --arg runner "$linux_runner" \ - '.swift[.swift| length] |= . + { "name": "6.0", "image": $container_image, "swift_version": "6.0", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') + '.config[.config| length] |= . + { "name": "6.0", "image": $container_image, "swift_version": "6.0", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') fi if [[ "$linux_nightly_6_0_enabled" == "true" ]]; then @@ -100,7 +100,7 @@ if [[ "$linux_nightly_6_0_enabled" == "true" ]]; then --arg command_arguments "$linux_nightly_6_0_command_arguments" \ --arg container_image "$linux_nightly_6_0_container_image" \ --arg runner "$linux_runner" \ - '.swift[.swift| length] |= . + { "name": "nightly-6.0", "image": $container_image, "swift_version": "nightly-6.0", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') + '.config[.config| length] |= . + { "name": "nightly-6.0", "image": $container_image, "swift_version": "nightly-6.0", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') fi if [[ "$linux_nightly_main_enabled" == "true" ]]; then @@ -110,7 +110,7 @@ if [[ "$linux_nightly_main_enabled" == "true" ]]; then --arg command_arguments "$linux_nightly_main_command_arguments" \ --arg container_image "$linux_nightly_main_container_image" \ --arg runner "$linux_runner" \ - '.swift[.swift| length] |= . + { "name": "nightly-main", "image": $container_image, "swift_version": "nightly-main", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') + '.config[.config| length] |= . + { "name": "nightly-main", "image": $container_image, "swift_version": "nightly-main", "platform": "Linux", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner}') fi ## Windows @@ -127,7 +127,7 @@ if [[ "$windows_6_0_enabled" == "true" ]]; then --arg command_arguments "$windows_6_0_command_arguments" \ --arg container_image "$windows_6_0_container_image" \ --arg runner "$windows_6_0_runner" \ - '.swift[.swift| length] |= . + { "name": "6.0", "image": $container_image, "swift_version": "6.0", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') + '.config[.config| length] |= . + { "name": "6.0", "image": $container_image, "swift_version": "6.0", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') fi if [[ "$windows_nightly_6_0_enabled" == "true" ]]; then @@ -137,7 +137,7 @@ if [[ "$windows_nightly_6_0_enabled" == "true" ]]; then --arg command_arguments "$windows_nightly_6_0_command_arguments" \ --arg container_image "$windows_nightly_6_0_container_image" \ --arg runner "$windows_nightly_6_0_runner" \ - '.swift[.swift| length] |= . + { "name": "nightly-6.0", "image": $container_image, "swift_version": "nightly-6.0", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') + '.config[.config| length] |= . + { "name": "nightly-6.0", "image": $container_image, "swift_version": "nightly-6.0", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') fi if [[ "$windows_nightly_main_enabled" == "true" ]]; then @@ -147,7 +147,7 @@ if [[ "$windows_nightly_main_enabled" == "true" ]]; then --arg command_arguments "$windows_nightly_main_command_arguments" \ --arg container_image "$windows_nightly_main_container_image" \ --arg runner "$windows_nightly_main_runner" \ - '.swift[.swift| length] |= . + { "name": "nightly-main", "image": $container_image, "swift_version": "nightly-main", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') + '.config[.config| length] |= . + { "name": "nightly-main", "image": $container_image, "swift_version": "nightly-main", "platform": "Windows", "command": $command, "command_arguments": $command_arguments, "setup_command": $setup_command, "runner": $runner }') fi echo "$matrix" | jq -c \ No newline at end of file