Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ephemeral topics #87

Merged
merged 12 commits into from
Aug 3, 2023
13 changes: 13 additions & 0 deletions Sources/XMTP/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ public enum Conversation {
}
}

public var clientAddress: String {
return client.address
}

/// The topic identifier for this conversation
public var topic: String {
switch self {
Expand All @@ -135,6 +139,15 @@ public enum Conversation {
}
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error>? {
switch self {
case .v1:
return nil
case let .v2(conversation):
return conversation.streamEphemeral()
}
}

/// Returns a stream you can iterate through to receive new messages in this conversation.
///
/// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages
Expand Down
29 changes: 27 additions & 2 deletions Sources/XMTP/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,23 @@ public struct ConversationV1 {
timestamp: date
)

let isEphemeral: Bool
if let options, options.ephemeral {
isEphemeral = true
} else {
isEphemeral = false
}

let messageEnvelope = Envelope(
topic: .directMessageV1(client.address, peerAddress),
topic: isEphemeral ? ephemeralTopic : topic.description,
timestamp: date,
message: try Message(v1: message).serializedData()
)

return PreparedMessage(messageEnvelope: messageEnvelope, conversation: .v1(self)) {
var envelopes = [messageEnvelope]

if client.contacts.needsIntroduction(peerAddress) {
if client.contacts.needsIntroduction(peerAddress) && !isEphemeral {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
Expand Down Expand Up @@ -133,6 +140,24 @@ public struct ConversationV1 {
}
}

var ephemeralTopic: String {
topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-")
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] {
let pagination = Pagination(limit: limit, startTime: before, endTime: after)

Expand Down
26 changes: 23 additions & 3 deletions Sources/XMTP/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public struct ConversationV2 {
ConversationV2Container(topic: topic, keyMaterial: keyMaterial, conversationID: context.conversationID, metadata: context.metadata, peerAddress: peerAddress, header: header)
}

func prepareMessage<T>(content: T, options: SendOptions?) async throws -> PreparedMessage {
func prepareMessage<T>(content: T, options: SendOptions?, ephemeral: Bool = false) async throws -> PreparedMessage {
let codec = Client.codecRegistry.find(for: options?.contentType)

func encode<Codec: ContentCodec>(codec: Codec, content: Any) throws -> EncodedContent {
Expand All @@ -91,6 +91,8 @@ public struct ConversationV2 {
encoded = try encoded.compress(compression)
}

let topic = ephemeral ? ephemeralTopic : topic

let message = try await MessageV2.encode(
client: client,
content: encoded,
Expand Down Expand Up @@ -119,6 +121,24 @@ public struct ConversationV2 {
}
}

var ephemeralTopic: String {
topic.replacingOccurrences(of: "/xmtp/0/m", with: "/xmtp/0/mE")
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
Expand Down Expand Up @@ -148,8 +168,8 @@ public struct ConversationV2 {
try MessageV2.decode(message, keyMaterial: keyMaterial)
}

@discardableResult func send<T>(content: T, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options)
@discardableResult func send<T>(content: T, options: SendOptions? = nil, ephemeral _: Bool = false) async throws -> String {
let preparedMessage = try await prepareMessage(content: content, options: options, ephemeral: options?.ephemeral == true)
try await preparedMessage.send()
return preparedMessage.messageID
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/XMTP/SendOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ public struct SendOptions {
public var compression: EncodedContentCompression?
public var contentType: ContentTypeID?
public var contentFallback: String?
public var ephemeral: Bool = false

public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil) {
public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil, ephemeral: Bool = false) {
self.compression = compression
self.contentType = contentType
self.contentFallback = contentFallback
self.ephemeral = ephemeral
}
}
67 changes: 67 additions & 0 deletions Tests/XMTPTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,73 @@ final class IntegrationTests: XCTestCase {
await waitForExpectations(timeout: 3)
}

func testStreamEphemeralInV1Conversation() async throws {
throw XCTSkip("integration only (requires local node)")

let alice = try PrivateKey.generate()
let bob = try PrivateKey.generate()

let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false))
let aliceClient = try await Client.create(account: alice, options: clientOptions)
try await aliceClient.publishUserContact(legacy: true)
let bobClient = try await Client.create(account: bob, options: clientOptions)
try await bobClient.publishUserContact(legacy: true)

let expectation = expectation(description: "bob gets a streamed message")

let convo = ConversationV1(client: bobClient, peerAddress: alice.address, sentAt: Date())

Task(priority: .userInitiated) {
for try await _ in convo.streamEphemeral() {
expectation.fulfill()
}
}

try await convo.send(content: "hi", options: .init(ephemeral: true))

let messages = try await convo.messages()
XCTAssertEqual(0, messages.count)

await waitForExpectations(timeout: 3)
}

func testStreamEphemeralInV2Conversation() async throws {
throw XCTSkip("integration only (requires local node)")

let alice = try PrivateKey.generate()
let bob = try PrivateKey.generate()

let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false))
let aliceClient = try await Client.create(account: alice, options: clientOptions)
let bobClient = try await Client.create(account: bob, options: clientOptions)

let aliceConversation = try await aliceClient.conversations.newConversation(with: bob.walletAddress, context: .init(conversationID: "https://example.com/3"))

let expectation = expectation(description: "bob gets a streamed message")

guard case let .v2(bobConversation) = try await
bobClient.conversations.newConversation(with: alice.walletAddress, context: .init(conversationID: "https://example.com/3"))
else {
XCTFail("Did not create v2 convo")
return
}

XCTAssertEqual(bobConversation.topic, aliceConversation.topic)

Task(priority: .userInitiated) {
for try await _ in bobConversation.streamEphemeral() {
expectation.fulfill()
}
}

try await aliceConversation.send(content: "hi", options: .init(ephemeral: true))

let messages = try await aliceConversation.messages()
XCTAssertEqual(0, messages.count)

await waitForExpectations(timeout: 3)
}

func testCanPaginateV1Messages() async throws {
throw XCTSkip("integration only (requires local node)")

Expand Down
7 changes: 1 addition & 6 deletions dev/local/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ services:
- --ws
- --store
- --message-db-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable
- --message-db-reader-connection-string=postgres://postgres:xmtp@db:5432/postgres?sslmode=disable
- --lightpush
- --filter
- --ws-port=9001
Expand All @@ -28,9 +29,3 @@ services:
image: postgres:13
environment:
POSTGRES_PASSWORD: xmtp
js:
restart: always
depends_on:
wakunode:
condition: service_healthy
build: ./dev/test
Comment on lines -33 to -38
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We weren't using this anymore so I just deleted it.