Skip to content

Commit

Permalink
Add Stream All group messages (#258)
Browse files Browse the repository at this point in the history
* add all the stream all messaging methods

* bump the pod spec

* add tests for all the new streaming methods
  • Loading branch information
nplasterer authored Feb 21, 2024
1 parent 587e13c commit fcd7b81
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 7 deletions.
95 changes: 93 additions & 2 deletions Sources/XMTPiOS/Conversations.swift
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public actor Conversations {
return messages
}

public func streamAllMessages() async throws -> AsyncThrowingStream<DecodedMessage, Error> {
func streamAllV2Messages() async throws -> AsyncThrowingStream<DecodedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
while true {
Expand Down Expand Up @@ -278,8 +278,99 @@ public actor Conversations {
}
}
}

public func streamAllGroupMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
do {
self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages(
messageCallback: MessageCallback(client: self.client) { message in
do {
continuation.yield(try message.fromFFI(client: self.client))
} catch {
print("Error onMessage \(error)")
}
}
)
} catch {
print("STREAM ERR: \(error)")
}
}
}
}

public func streamAllMessages(includeGroups: Bool = false) async throws -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream<DecodedMessage, Error> { continuation in
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecodedMessage, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
continuation.yield(element)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

Task {
await forwardStreamToMerged(stream: try streamAllV2Messages())
}
if (includeGroups) {
Task {
await forwardStreamToMerged(stream: streamAllGroupMessages())
}
}
}
}

public func streamAllGroupDecryptedMessages() -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
do {
self.streamHolder.stream = try await self.client.v3Client?.conversations().streamAllMessages(
messageCallback: MessageCallback(client: self.client) { message in
do {
continuation.yield(try message.fromFFIDecrypted(client: self.client))
} catch {
print("Error onMessage \(error)")
}
}
)
} catch {
print("STREAM ERR: \(error)")
}
}
}
}

public func streamAllDecryptedMessages(includeGroups: Bool = false) -> AsyncThrowingStream<DecryptedMessage, Error> {
AsyncThrowingStream<DecryptedMessage, Error> { continuation in
@Sendable func forwardStreamToMerged(stream: AsyncThrowingStream<DecryptedMessage, Error>) async {
do {
var iterator = stream.makeAsyncIterator()
while let element = try await iterator.next() {
continuation.yield(element)
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}

Task {
await forwardStreamToMerged(stream: try streamAllV2DecryptedMessages())
}
if (includeGroups) {
Task {
await forwardStreamToMerged(stream: streamAllGroupDecryptedMessages())
}
}
}
}


public func streamAllDecryptedMessages() async throws -> AsyncThrowingStream<DecryptedMessage, Error> {
func streamAllV2DecryptedMessages() async throws -> AsyncThrowingStream<DecryptedMessage, Error> {
return AsyncThrowingStream { continuation in
Task {
while true {
Expand Down
68 changes: 64 additions & 4 deletions Tests/XMTPTests/GroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ class GroupTests: XCTestCase {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")
expectation1.expectedFulfillmentCount = 2

Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAll() {
Expand All @@ -402,23 +403,82 @@ class GroupTests: XCTestCase {
}

_ = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
// _ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)
_ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)

await waitForExpectations(timeout: 3)
}

func testCanStreamGroupsAndConversationsWorksConvos() async throws {
func testCanStreamAllMessages() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")
expectation1.expectedFulfillmentCount = 2
let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)
let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
try await fixtures.aliceClient.conversations.sync()
Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAllMessages(includeGroups: true) {
expectation1.fulfill()
}
}

try await group.send(content: "hi")
try await convo.send(content: "hi")

await waitForExpectations(timeout: 3)
}

func testCanStreamAllDecryptedMessages() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")
expectation1.expectedFulfillmentCount = 2
let convo = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)
let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
try await fixtures.aliceClient.conversations.sync()
Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAll() {
for try await _ in try await fixtures.aliceClient.conversations.streamAllDecryptedMessages(includeGroups: true) {
expectation1.fulfill()
}
}

_ = try await fixtures.bobClient.conversations.newConversation(with: fixtures.alice.address)
try await group.send(content: "hi")
try await convo.send(content: "hi")

await waitForExpectations(timeout: 3)
}

func testCanStreamAllGroupMessages() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")

let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
try await fixtures.aliceClient.conversations.sync()
Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAllGroupMessages() {
expectation1.fulfill()
}
}

try await group.send(content: "hi")

await waitForExpectations(timeout: 3)
}

func testCanStreamAllGroupDecryptedMessages() async throws {
let fixtures = try await localFixtures()

let expectation1 = expectation(description: "got a conversation")
let group = try await fixtures.bobClient.conversations.newGroup(with: [fixtures.alice.address])
try await fixtures.aliceClient.conversations.sync()
Task(priority: .userInitiated) {
for try await _ in try await fixtures.aliceClient.conversations.streamAllGroupDecryptedMessages() {
expectation1.fulfill()
}
}

try await group.send(content: "hi")

await waitForExpectations(timeout: 3)
}
Expand Down
2 changes: 1 addition & 1 deletion XMTP.podspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pod::Spec.new do |spec|
#

spec.name = "XMTP"
spec.version = "0.8.9"
spec.version = "0.8.10"
spec.summary = "XMTP SDK Cocoapod"

# This description is used to generate tags and improve search results.
Expand Down

0 comments on commit fcd7b81

Please sign in to comment.