Skip to content

Commit

Permalink
fix: FoundationStreamBridge fixes from main branch (#708)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbelkins authored May 2, 2024
1 parent 64e66f3 commit 8a5b010
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS)

import func Foundation.CFWriteStreamSetDispatchQueue
import class Foundation.DispatchQueue
import func Foundation.autoreleasepool
import class Foundation.NSObject
import class Foundation.Stream
import class Foundation.InputStream
import class Foundation.OutputStream
import class Foundation.Thread
import class Foundation.RunLoop
import class Foundation.Timer
import struct Foundation.TimeInterval
import protocol Foundation.StreamDelegate

/// Reads data from a smithy-swift native `ReadableStream` and streams the data to a Foundation `InputStream`.
Expand All @@ -39,35 +41,41 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
/// A Foundation `OutputStream` that will read from the `ReadableStream`
private let outputStream: OutputStream

/// Actor used to isolate the stream status from multiple concurrent accesses.
actor ReadableStreamStatus {
/// A Logger for logging events.
private let logger: LogAgent

/// Actor used to ensure writes are performed in series.
actor WriteCoordinator {
var task: Task<Void, Error>?

/// `true` if the readable stream has been found to be empty, `false` otherwise. Will flip to `true` if the readable stream is read,
/// and `nil` is returned.
var isEmpty = false
var readableStreamIsEmpty = false

/// Sets stream status to indicate the stream is empty.
func setIsEmpty() async {
isEmpty = true
func setReadableStreamIsEmpty() async {
readableStreamIsEmpty = true
}
}

/// Actor used to isolate the stream status from multiple concurrent accesses.
private var readableStreamStatus = ReadableStreamStatus()
/// Creates a new concurrent Task that executes the passed block, ensuring that the previous Task
/// finishes before this task starts.
///
/// Acts as a sort of "serial queue" of Swift concurrency tasks.
/// - Parameter block: The code to be performed in this task.
func perform(_ block: @escaping @Sendable (WriteCoordinator) async throws -> Void) {
self.task = Task { [task] in
_ = await task?.result
try await block(self)
}
}
}

/// A shared serial DispatchQueue to run the `perform`-on-thread operations.
/// Performing thread operations on an async queue allows Swift concurrency tasks to not block.
private static let queue = DispatchQueue(label: "AWSFoundationStreamBridge")
/// Actor used to enforce the order of multiple concurrent stream writes.
private let writeCoordinator = WriteCoordinator()

/// Foundation Streams require a run loop on which to post callbacks for their delegates.
/// All stream operations should be performed on the same thread as the delegate callbacks.
/// A single shared `Thread` is started and is used to host the RunLoop for all Foundation Stream callbacks.
private static let thread: Thread = {
let thread = Thread { autoreleasepool { RunLoop.current.run() } }
thread.name = "AWSFoundationStreamBridge"
thread.start()
return thread
}()
/// A shared serial DispatchQueue to run the stream operations.
/// Performing operations on an async queue allows Swift concurrency tasks to not block.
private let queue = DispatchQueue(label: "AWSFoundationStreamBridge")

// MARK: - init & deinit

Expand All @@ -78,8 +86,8 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
/// - Parameters:
/// - readableStream: The `ReadableStream` that serves as the input to the bridge.
/// - bufferSize: The number of bytes in the in-memory buffer. The buffer is allocated for this size no matter if in use or not.
/// Defaults to 4096 bytes.
init(readableStream: ReadableStream, bufferSize: Int = 4096) {
/// Defaults to 65536 bytes.
init(readableStream: ReadableStream, bufferSize: Int = 65_536, logger: LogAgent) {
var inputStream: InputStream?
var outputStream: OutputStream?

Expand All @@ -98,77 +106,73 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
self.readableStream = readableStream
self.inputStream = inputStream
self.outputStream = outputStream
self.logger = logger

// The stream is configured to deliver its callbacks on the dispatch queue.
// This precludes the need for a Thread with RunLoop.
CFWriteStreamSetDispatchQueue(outputStream, queue)
}

// MARK: - Opening & closing

/// Schedule the output stream on the special thread reserved for stream callbacks.
/// Schedule the output stream on the queue for stream callbacks.
/// Do not wait to complete opening before returning.
func open() async {
await withCheckedContinuation { continuation in
Self.queue.async {
self.perform(#selector(self.openOnThread), on: Self.thread, with: nil, waitUntilDone: false)
queue.async {
self.outputStream.delegate = self
self.outputStream.open()
continuation.resume()
}
continuation.resume()
}
}

/// Configure the output stream to make StreamDelegate callback to this bridge using the special thread / run loop, and open the output stream.
/// The input stream is not included here. It will be configured by `URLSession` when the HTTP request is initiated.
@objc private func openOnThread() {
outputStream.delegate = self
outputStream.schedule(in: RunLoop.current, forMode: .default)
outputStream.open()
}

/// Unschedule the output stream on the special stream callback thread.
/// Do not wait to complete closing before returning.
func close() async {
await withCheckedContinuation { continuation in
Self.queue.async {
self.perform(#selector(self.closeOnThread), on: Self.thread, with: nil, waitUntilDone: false)
queue.async {
self.outputStream.close()
self.outputStream.delegate = nil
continuation.resume()
}
continuation.resume()
}
}

/// Close the output stream and remove it from the thread / run loop.
@objc private func closeOnThread() {
outputStream.close()
outputStream.remove(from: RunLoop.current, forMode: .default)
outputStream.delegate = nil
}

// MARK: - Writing to bridge

/// Tries to read from the readable stream if possible, then transfer the data to the output stream.
private func writeToOutput() async throws {
var data = Data()
if await !readableStreamStatus.isEmpty {
if let newData = try await readableStream.readAsync(upToCount: bufferSize) {
data = newData
} else {
await readableStreamStatus.setIsEmpty()
await close()
await writeCoordinator.perform { [self] writeCoordinator in
var data = Data()
if await !writeCoordinator.readableStreamIsEmpty {
if let newData = try await readableStream.readAsync(upToCount: bufferSize) {
data = newData
} else {
await writeCoordinator.setReadableStreamIsEmpty()
await close()
}
}
try await writeToOutputStream(data: data)
}
try await writeToOutputStream(data: data)
}

private class WriteToOutputStreamResult: NSObject {
var data = Data()
var error: Error?
}

/// Write the passed data to the output stream, using the reserved thread.
private func writeToOutputStream(data: Data) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
Self.queue.async {
let result = WriteToOutputStreamResult()
result.data = data
let selector = #selector(self.writeToOutputStreamOnThread)
self.perform(selector, on: Self.thread, with: result, waitUntilDone: true)
if let error = result.error {
queue.async { [self] in
guard !buffer.isEmpty || !data.isEmpty else { continuation.resume(); return }
buffer.append(data)
var writeCount = 0
buffer.withUnsafeBytes { bufferPtr in
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
writeCount = outputStream.write(bytePtr, maxLength: buffer.count)
}
if writeCount > 0 {
logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
buffer.removeFirst(writeCount)
}
if let error = outputStream.streamError {
continuation.resume(throwing: error)
} else {
continuation.resume()
Expand All @@ -177,34 +181,28 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
}
}

/// Append the new data to the buffer, then write to the output stream. Return any error to the caller using the param object.
@objc private func writeToOutputStreamOnThread(_ result: WriteToOutputStreamResult) {
guard !buffer.isEmpty || !result.data.isEmpty else { return }
buffer.append(result.data)
var writeCount = 0
buffer.withUnsafeBytes { bufferPtr in
let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress!
writeCount = outputStream.write(bytePtr, maxLength: buffer.count)
}
if writeCount > 0 {
buffer.removeFirst(writeCount)
}
result.error = outputStream.streamError
}

// MARK: - StreamDelegate protocol

/// The stream places this callback when appropriate. Call will be delivered on the special thread / run loop for stream callbacks.
/// The stream places this callback when appropriate. Call will be delivered on the GCD queue for stream callbacks.
/// `.hasSpaceAvailable` prompts this type to query the readable stream for more data.
@objc func stream(_ aStream: Foundation.Stream, handle eventCode: Foundation.Stream.Event) {
switch eventCode {
case .openCompleted:
break
case .hasBytesAvailable:
break
case .hasSpaceAvailable:
// Since space is available, try and read from the ReadableStream and
// transfer the data to the Foundation stream pair.
// Use a `Task` to perform the operation within Swift concurrency.
Task { try await writeToOutput() }
default:
case .errorOccurred:
logger.info("FoundationStreamBridge: .errorOccurred event")
logger.info("FoundationStreamBridge: Stream error: \(String(describing: aStream.streamError))")
case .endEncountered:
break
default:
logger.info("FoundationStreamBridge: Other stream event occurred: \(eventCode)")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public final class URLSessionHTTPClient: HTTPClient {

// If needed, create a stream bridge that streams data from a SDK stream to a Foundation InputStream
// that URLSession can stream its request body from.
let streamBridge = requestStream.map { FoundationStreamBridge(readableStream: $0, bufferSize: 4096) }
let streamBridge = requestStream.map { FoundationStreamBridge(readableStream: $0, bufferSize: 4096, logger: logger) }

// Create the request (with a streaming body when needed.)
let urlRequest = self.makeURLRequest(from: request, httpBodyStream: streamBridge?.inputStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FoundationStreamBridgeTests: XCTestCase {

// Create a stream bridge with our original data & open it
let bufferedStream = BufferedStream(data: originalData, isClosed: true)
let subject = FoundationStreamBridge(readableStream: bufferedStream, bufferSize: bufferSize)
let subject = FoundationStreamBridge(readableStream: bufferedStream, bufferSize: bufferSize, logger: TestLogger())
await subject.open()

// This will hold the data that is bridged from the ReadableStream to the Foundation InputStream
Expand Down

0 comments on commit 8a5b010

Please sign in to comment.