Skip to content

Commit

Permalink
Fix: Stream All Streams All (#249)
Browse files Browse the repository at this point in the history
* try and fix the stream all method

* add tests for the streaming

* bump all the pods
  • Loading branch information
nplasterer authored Feb 16, 2024
1 parent 757da65 commit 10bc7ca
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ let package = Package(
.package(url: "https://github.com/1024jp/GzipSwift", from: "5.2.0"),
.package(url: "https://github.com/bufbuild/connect-swift", exact: "0.3.0"),
.package(url: "https://github.com/apple/swift-docc-plugin.git", from: "1.0.0"),
.package(url: "https://github.com/xmtp/libxmtp-swift", exact: "0.4.1-beta3"),
.package(url: "https://github.com/xmtp/libxmtp-swift", exact: "0.4.2-beta1"),
],
targets: [
// Targets are the basic building blocks of a package. A target can define a module or a test suite.
Expand Down
41 changes: 24 additions & 17 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,30 @@ public actor Conversations {
}
}
}

public func streamAll() -> AsyncThrowingStream<Conversation, Error> {
AsyncThrowingStream<Conversation, Error> { continuation in
Task {
do {
for try await conversation in streamGroupConversations() {
continuation.yield(conversation)
}
for try await conversation in stream() {
continuation.yield(conversation)
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

public func streamAll() -> AsyncThrowingStream<Conversation, Error> {
AsyncThrowingStream<Conversation, Error> { continuation in
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<Conversation, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
continuation.yield(element)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

Task {
await forwardStreamToMerged(stream: stream())
}
Task {
await forwardStreamToMerged(stream: streamGroupConversations())
}
}
}

private func makeConversation(from sealedInvitation: SealedInvitation) throws -> ConversationV2 {
let unsealed = try sealedInvitation.v1.getInvitation(viewer: client.keys)
let conversation = try ConversationV2.create(client: client, invitation: unsealed, header: sealedInvitation.v1.header)
Expand Down
49 changes: 49 additions & 0 deletions Tests/XMTPTests/GroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,53 @@ class GroupTests: XCTestCase {
XCTAssertEqual("sup gang", String(data: Data(aliceMessage.encodedContent.content), encoding: .utf8))
XCTAssertEqual("sup gang", String(data: Data(bobMessage.encodedContent.content), encoding: .utf8))
}

func testCanStreamGroups() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a group")

Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamGroups() {
expectation1.fulfill()
}
}

try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])

await waitForExpectations(timeout: 3)
}

func testCanStreamGroupsAndConversationsWorksGroups() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")

Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAll() {
expectation1.fulfill()
}
}

_ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
// _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)

await waitForExpectations(timeout: 3)
}

func testCanStreamGroupsAndConversationsWorksConvos() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")

Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAll() {
expectation1.fulfill()
}
}

_ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)

await waitForExpectations(timeout: 3)
}
}
4 changes: 2 additions & 2 deletions XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.8.5"
spec.version = "0.8.6"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down Expand Up @@ -44,5 +44,5 @@ Pod::Spec.new do |spec|
spec.dependency "web3.swift"
spec.dependency "GzipSwift"
spec.dependency "Connect-Swift", "= 0.3.0"
spec.dependency 'LibXMTP', '= 0.4.1-beta3'
spec.dependency 'LibXMTP', '= 0.4.2-beta1'
end

0 comments on commit 10bc7ca

Please sign in to comment.