From d60ea4bd57b115754f9ad57fc01e1f64407359f5 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Sat, 1 Apr 2023 14:17:19 -0700 Subject: [PATCH 1/9] Add support for ephemeral messages --- Sources/XMTP/Conversation.swift | 18 ++++++++++++++ Sources/XMTP/ConversationV2.swift | 26 +++++++++++++++++--- Tests/XMTPTests/IntegrationTests.swift | 34 ++++++++++++++++++++++++++ dev/local/docker-compose.yml | 13 +++++----- 4 files changed, 82 insertions(+), 9 deletions(-) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index a0943088..2e826ebf 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -135,6 +135,24 @@ public enum Conversation { } } + public func streamEphemeral() -> AsyncThrowingStream? { + switch self { + case .v1(_): + return nil + case let .v2(conversation): + return conversation.streamEphemeral() + } + } + + @discardableResult public func sendEphemeral(content: T, options: SendOptions? = nil) async throws -> String { + switch self { + case .v1(_): + throw ConversationError.v1NotSupported("ephemeral messages not supported for v1 conversations") + case let .v2(conversationV2): + return try await conversationV2.send(content: content, options: options, ephemeral: true) + } + } + /// Returns a stream you can iterate through to receive new messages in this conversation. /// /// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index a6524cb8..c6cd6134 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -73,7 +73,7 @@ public struct ConversationV2 { ConversationV2Container(topic: topic, keyMaterial: keyMaterial, conversationID: context.conversationID, metadata: context.metadata, peerAddress: peerAddress, header: header) } - func prepareMessage(content: T, options: SendOptions?) async throws -> PreparedMessage { + func prepareMessage(content: T, options: SendOptions?, ephemeral: Bool = false) async throws -> PreparedMessage { let codec = Client.codecRegistry.find(for: options?.contentType) func encode(codec: Codec, content: Any) throws -> EncodedContent { @@ -91,6 +91,8 @@ public struct ConversationV2 { encoded = try encoded.compress(compression) } + let topic = ephemeral ? ephemeralTopic : topic + let message = try await MessageV2.encode( client: client, content: encoded, @@ -119,6 +121,24 @@ public struct ConversationV2 { } } + var ephemeralTopic: String { + topic.replacingOccurrences(of: "/xmtp/0/m", with: "/xmtp/0/mE") + } + + public func streamEphemeral() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + do { + for try await envelope in client.subscribe(topics: [ephemeralTopic]) { + continuation.yield(envelope) + } + } catch { + continuation.finish(throwing: error) + } + } + } + } + public func streamMessages() -> AsyncThrowingStream { AsyncThrowingStream { continuation in Task { @@ -148,8 +168,8 @@ public struct ConversationV2 { try MessageV2.decode(message, keyMaterial: keyMaterial) } - @discardableResult func send(content: T, options: SendOptions? = nil) async throws -> String { - let preparedMessage = try await prepareMessage(content: content, options: options) + @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral: Bool = false) async throws -> String { + let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: true) try await preparedMessage.send() return preparedMessage.messageID } diff --git a/Tests/XMTPTests/IntegrationTests.swift b/Tests/XMTPTests/IntegrationTests.swift index 2f46efa6..8f84eb7d 100644 --- a/Tests/XMTPTests/IntegrationTests.swift +++ b/Tests/XMTPTests/IntegrationTests.swift @@ -282,6 +282,40 @@ final class IntegrationTests: XCTestCase { await waitForExpectations(timeout: 3) } + func testStreamEphemeralInV2Conversation() async throws { + let alice = try PrivateKey.generate() + let bob = try PrivateKey.generate() + + let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false)) + let aliceClient = try await Client.create(account: alice, options: clientOptions) + let bobClient = try await Client.create(account: bob, options: clientOptions) + + let aliceConversation = try await aliceClient.conversations.newConversation(with: bob.walletAddress, context: .init(conversationID: "https://example.com/3")) + + let expectation = expectation(description: "bob gets a streamed message") + + guard case let .v2(bobConversation) = try await + bobClient.conversations.newConversation(with: alice.walletAddress, context: .init(conversationID: "https://example.com/3")) else { + XCTFail("Did not create v2 convo") + return + } + + XCTAssertEqual(bobConversation.topic, aliceConversation.topic) + + Task(priority: .userInitiated) { + for try await _ in bobConversation.streamEphemeral() { + expectation.fulfill() + } + } + + try await aliceConversation.sendEphemeral(content: "hi") + + let messages = try await aliceConversation.messages() + XCTAssertEqual(0, messages.count) + + await waitForExpectations(timeout: 3) + } + func testCanPaginateV1Messages() async throws { throw XCTSkip("integration only (requires local node)") diff --git a/dev/local/docker-compose.yml b/dev/local/docker-compose.yml index ebd2112a..6241b5f4 100644 --- a/dev/local/docker-compose.yml +++ b/dev/local/docker-compose.yml @@ -8,6 +8,7 @@ services: - --ws - --store - --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable + - --message-db-reader-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable - --lightpush - --filter - --ws-port=9001 @@ -28,9 +29,9 @@ services: image: postgres:13 environment: POSTGRES_PASSWORD: xmtp - js: - restart: always - depends_on: - wakunode: - condition: service_healthy - build: ./dev/test + # js: + # restart: always + # depends_on: + # wakunode: + # condition: service_healthy + # build: ./dev/test From 549599835611993b769cf73ab00cace31fa51127 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Sat, 1 Apr 2023 14:17:33 -0700 Subject: [PATCH 2/9] Format --- Sources/XMTP/Conversation.swift | 4 ++-- Sources/XMTP/ConversationV2.swift | 2 +- Tests/XMTPTests/IntegrationTests.swift | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 2e826ebf..7405e2f4 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -137,7 +137,7 @@ public enum Conversation { public func streamEphemeral() -> AsyncThrowingStream? { switch self { - case .v1(_): + case .v1: return nil case let .v2(conversation): return conversation.streamEphemeral() @@ -146,7 +146,7 @@ public enum Conversation { @discardableResult public func sendEphemeral(content: T, options: SendOptions? = nil) async throws -> String { switch self { - case .v1(_): + case .v1: throw ConversationError.v1NotSupported("ephemeral messages not supported for v1 conversations") case let .v2(conversationV2): return try await conversationV2.send(content: content, options: options, ephemeral: true) diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index c6cd6134..9792f985 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -168,7 +168,7 @@ public struct ConversationV2 { try MessageV2.decode(message, keyMaterial: keyMaterial) } - @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral: Bool = false) async throws -> String { + @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral _: Bool = false) async throws -> String { let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: true) try await preparedMessage.send() return preparedMessage.messageID diff --git a/Tests/XMTPTests/IntegrationTests.swift b/Tests/XMTPTests/IntegrationTests.swift index 8f84eb7d..bc8140b1 100644 --- a/Tests/XMTPTests/IntegrationTests.swift +++ b/Tests/XMTPTests/IntegrationTests.swift @@ -295,7 +295,8 @@ final class IntegrationTests: XCTestCase { let expectation = expectation(description: "bob gets a streamed message") guard case let .v2(bobConversation) = try await - bobClient.conversations.newConversation(with: alice.walletAddress, context: .init(conversationID: "https://example.com/3")) else { + bobClient.conversations.newConversation(with: alice.walletAddress, context: .init(conversationID: "https://example.com/3")) + else { XCTFail("Did not create v2 convo") return } From c784e4f2d80bb035d9f2e75faa4651baae904d1c Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Sat, 1 Apr 2023 16:03:04 -0700 Subject: [PATCH 3/9] Add clientAddress to conversation --- Sources/XMTP/Conversation.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 7405e2f4..8d628a7e 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -125,6 +125,10 @@ public enum Conversation { } } + public var clientAddress: String { + return client.address + } + /// The topic identifier for this conversation public var topic: String { switch self { From 55c6c8051501071ebcf30631bf636155bd5c3601 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Sun, 2 Apr 2023 12:47:51 -0700 Subject: [PATCH 4/9] Skip integration test normally --- Tests/XMTPTests/IntegrationTests.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/XMTPTests/IntegrationTests.swift b/Tests/XMTPTests/IntegrationTests.swift index bc8140b1..9c3cd6ca 100644 --- a/Tests/XMTPTests/IntegrationTests.swift +++ b/Tests/XMTPTests/IntegrationTests.swift @@ -283,6 +283,8 @@ final class IntegrationTests: XCTestCase { } func testStreamEphemeralInV2Conversation() async throws { + throw XCTSkip("integration only (requires local node)") + let alice = try PrivateKey.generate() let bob = try PrivateKey.generate() From 302c81b8b265fe9c39f61fa4553fd1c877a53d64 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Sun, 2 Apr 2023 12:48:07 -0700 Subject: [PATCH 5/9] Don't always send ephemeral --- Sources/XMTP/ConversationV2.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index 9792f985..68751522 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -168,8 +168,8 @@ public struct ConversationV2 { try MessageV2.decode(message, keyMaterial: keyMaterial) } - @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral _: Bool = false) async throws -> String { - let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: true) + @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral: Bool = false) async throws -> String { + let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: ephemeral) try await preparedMessage.send() return preparedMessage.messageID } From 97842e75537d6a2b1064b6e0969e5fa1b99c0f37 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Tue, 4 Apr 2023 17:57:59 -0700 Subject: [PATCH 6/9] Add support for conversation v1 ephemeral topics Also bring API in line with JS sdk --- Sources/XMTP/Conversation.swift | 9 -------- Sources/XMTP/ConversationV1.swift | 29 +++++++++++++++++++++-- Sources/XMTP/ConversationV2.swift | 4 ++-- Sources/XMTP/SendOptions.swift | 4 +++- Tests/XMTPTests/IntegrationTests.swift | 32 +++++++++++++++++++++++++- 5 files changed, 63 insertions(+), 15 deletions(-) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 8d628a7e..938e5ae9 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -148,15 +148,6 @@ public enum Conversation { } } - @discardableResult public func sendEphemeral(content: T, options: SendOptions? = nil) async throws -> String { - switch self { - case .v1: - throw ConversationError.v1NotSupported("ephemeral messages not supported for v1 conversations") - case let .v2(conversationV2): - return try await conversationV2.send(content: content, options: options, ephemeral: true) - } - } - /// Returns a stream you can iterate through to receive new messages in this conversation. /// /// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages diff --git a/Sources/XMTP/ConversationV1.swift b/Sources/XMTP/ConversationV1.swift index 02885caa..6537492d 100644 --- a/Sources/XMTP/ConversationV1.swift +++ b/Sources/XMTP/ConversationV1.swift @@ -76,8 +76,15 @@ public struct ConversationV1 { timestamp: date ) + let isEphemeral: Bool + if let options, options.ephemeral { + isEphemeral = true + } else { + isEphemeral = false + } + let messageEnvelope = Envelope( - topic: .directMessageV1(client.address, peerAddress), + topic: isEphemeral ? ephemeralTopic : topic.description, timestamp: date, message: try Message(v1: message).serializedData() ) @@ -85,7 +92,7 @@ public struct ConversationV1 { return PreparedMessage(messageEnvelope: messageEnvelope, conversation: .v1(self)) { var envelopes = [messageEnvelope] - if client.contacts.needsIntroduction(peerAddress) { + if client.contacts.needsIntroduction(peerAddress) && !isEphemeral { envelopes.append(contentsOf: [ Envelope( topic: .userIntro(peerAddress), @@ -133,6 +140,24 @@ public struct ConversationV1 { } } + var ephemeralTopic: String { + topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-") + } + + public func streamEphemeral() -> AsyncThrowingStream { + AsyncThrowingStream { continuation in + Task { + do { + for try await envelope in client.subscribe(topics: [ephemeralTopic]) { + continuation.yield(envelope) + } + } catch { + continuation.finish(throwing: error) + } + } + } + } + func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] { let pagination = Pagination(limit: limit, startTime: before, endTime: after) diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index 68751522..d77d6e2f 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -168,8 +168,8 @@ public struct ConversationV2 { try MessageV2.decode(message, keyMaterial: keyMaterial) } - @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral: Bool = false) async throws -> String { - let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: ephemeral) + @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral _: Bool = false) async throws -> String { + let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: options?.ephemeral == true) try await preparedMessage.send() return preparedMessage.messageID } diff --git a/Sources/XMTP/SendOptions.swift b/Sources/XMTP/SendOptions.swift index b1c7cc1f..49f8171c 100644 --- a/Sources/XMTP/SendOptions.swift +++ b/Sources/XMTP/SendOptions.swift @@ -11,10 +11,12 @@ public struct SendOptions { public var compression: EncodedContentCompression? public var contentType: ContentTypeID? public var contentFallback: String? + public var ephemeral: Bool = false - public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil) { + public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil, ephemeral: Bool = false) { self.compression = compression self.contentType = contentType self.contentFallback = contentFallback + self.ephemeral = ephemeral } } diff --git a/Tests/XMTPTests/IntegrationTests.swift b/Tests/XMTPTests/IntegrationTests.swift index 9c3cd6ca..7aca6f01 100644 --- a/Tests/XMTPTests/IntegrationTests.swift +++ b/Tests/XMTPTests/IntegrationTests.swift @@ -282,6 +282,36 @@ final class IntegrationTests: XCTestCase { await waitForExpectations(timeout: 3) } + func testStreamEphemeralInV1Conversation() async throws { + throw XCTSkip("integration only (requires local node)") + + let alice = try PrivateKey.generate() + let bob = try PrivateKey.generate() + + let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false)) + let aliceClient = try await Client.create(account: alice, options: clientOptions) + try await aliceClient.publishUserContact(legacy: true) + let bobClient = try await Client.create(account: bob, options: clientOptions) + try await bobClient.publishUserContact(legacy: true) + + let expectation = expectation(description: "bob gets a streamed message") + + let convo = ConversationV1(client: bobClient, peerAddress: alice.address, sentAt: Date()) + + Task(priority: .userInitiated) { + for try await _ in convo.streamEphemeral() { + expectation.fulfill() + } + } + + try await convo.send(content: "hi", options: .init(ephemeral: true)) + + let messages = try await convo.messages() + XCTAssertEqual(0, messages.count) + + await waitForExpectations(timeout: 3) + } + func testStreamEphemeralInV2Conversation() async throws { throw XCTSkip("integration only (requires local node)") @@ -311,7 +341,7 @@ final class IntegrationTests: XCTestCase { } } - try await aliceConversation.sendEphemeral(content: "hi") + try await aliceConversation.send(content: "hi", options: .init(ephemeral: true)) let messages = try await aliceConversation.messages() XCTAssertEqual(0, messages.count) From af2bb66fdfe5d207b38d49da9a7a49b6b9eb4153 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Tue, 4 Apr 2023 18:13:18 -0700 Subject: [PATCH 7/9] Just remove this --- dev/local/docker-compose.yml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dev/local/docker-compose.yml b/dev/local/docker-compose.yml index 6241b5f4..c2926ee2 100644 --- a/dev/local/docker-compose.yml +++ b/dev/local/docker-compose.yml @@ -29,9 +29,3 @@ services: image: postgres:13 environment: POSTGRES_PASSWORD: xmtp - # js: - # restart: always - # depends_on: - # wakunode: - # condition: service_healthy - # build: ./dev/test From c46244979596894c6dbfae4a6f71ad503291a8b2 Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Tue, 4 Apr 2023 18:18:19 -0700 Subject: [PATCH 8/9] Remove unused param --- Sources/XMTP/ConversationV2.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/XMTP/ConversationV2.swift b/Sources/XMTP/ConversationV2.swift index d77d6e2f..6f2fc205 100644 --- a/Sources/XMTP/ConversationV2.swift +++ b/Sources/XMTP/ConversationV2.swift @@ -168,7 +168,7 @@ public struct ConversationV2 { try MessageV2.decode(message, keyMaterial: keyMaterial) } - @discardableResult func send(content: T, options: SendOptions? = nil, ephemeral _: Bool = false) async throws -> String { + @discardableResult func send(content: T, options: SendOptions? = nil) async throws -> String { let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: options?.ephemeral == true) try await preparedMessage.send() return preparedMessage.messageID From 1736e1292a6a7130fd7554d58119e89c4d3595ec Mon Sep 17 00:00:00 2001 From: Pat Nakajima Date: Tue, 4 Apr 2023 18:20:39 -0700 Subject: [PATCH 9/9] Add missing ephemeral stream --- Sources/XMTP/Conversation.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/XMTP/Conversation.swift b/Sources/XMTP/Conversation.swift index 938e5ae9..860a548b 100644 --- a/Sources/XMTP/Conversation.swift +++ b/Sources/XMTP/Conversation.swift @@ -141,8 +141,8 @@ public enum Conversation { public func streamEphemeral() -> AsyncThrowingStream? { switch self { - case .v1: - return nil + case let .v1(conversation): + return conversation.streamEphemeral() case let .v2(conversation): return conversation.streamEphemeral() }