Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ephemeral topics #87

Merged
merged 12 commits into from
Aug 3, 2023
4 changes: 2 additions & 2 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/xmtp/xmtp-rust-swift",
"state" : {
"revision" : "41a1161cf06a86bab0aa886e450584a1191429b1",
"version" : "0.3.0-beta0"
"revision" : "4a76e5401fa780c40610e2f0d248f695261d08dd",
"version" : "0.3.1-beta0"
}
}
],
Expand Down
19 changes: 16 additions & 3 deletions Sources/XMTP/Conversation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ public enum Conversation: Sendable {
}
}

@discardableResult public func send(encodedContent: EncodedContent) async throws -> String {
@discardableResult public func send(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> String {
switch self {
case let .v1(conversationV1):
return try await conversationV1.send(encodedContent: encodedContent)
return try await conversationV1.send(encodedContent: encodedContent, options: options)
case let .v2(conversationV2):
return try await conversationV2.send(encodedContent: encodedContent)
return try await conversationV2.send(encodedContent: encodedContent, options: options)
}
}

Expand All @@ -160,6 +160,10 @@ public enum Conversation: Sendable {
}
}

public var clientAddress: String {
return client.address
}

/// The topic identifier for this conversation
public var topic: String {
switch self {
Expand All @@ -170,6 +174,15 @@ public enum Conversation: Sendable {
}
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error>? {
switch self {
case let .v1(conversation):
return conversation.streamEphemeral()
case let .v2(conversation):
return conversation.streamEphemeral()
}
}

/// Returns a stream you can iterate through to receive new messages in this conversation.
///
/// > Note: All messages in the conversation are returned by this stream. If you want to filter out messages
Expand Down
37 changes: 31 additions & 6 deletions Sources/XMTP/ConversationV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public struct ConversationV1 {
Topic.directMessageV1(client.address, peerAddress)
}

func prepareMessage(encodedContent: EncodedContent) async throws -> PreparedMessage {
func prepareMessage(encodedContent: EncodedContent, options: SendOptions?) async throws -> PreparedMessage {
guard let contact = try await client.contacts.find(peerAddress) else {
throw ContactBundleError.notFound
}
Expand All @@ -58,16 +58,23 @@ public struct ConversationV1 {
timestamp: date
)

let isEphemeral: Bool
if let options, options.ephemeral {
isEphemeral = true
} else {
isEphemeral = false
}

let messageEnvelope = Envelope(
topic: .directMessageV1(client.address, peerAddress),
topic: isEphemeral ? ephemeralTopic : topic.description,
timestamp: date,
message: try Message(v1: message).serializedData()
)

return PreparedMessage(messageEnvelope: messageEnvelope, conversation: .v1(self)) {
var envelopes = [messageEnvelope]

if client.contacts.needsIntroduction(peerAddress) {
if client.contacts.needsIntroduction(peerAddress) && !isEphemeral {
envelopes.append(contentsOf: [
Envelope(
topic: .userIntro(peerAddress),
Expand Down Expand Up @@ -107,7 +114,7 @@ public struct ConversationV1 {
encoded = try encoded.compress(compression)
}

return try await prepareMessage(encodedContent: encoded)
return try await prepareMessage(encodedContent: encoded, options: options)
}

@discardableResult func send(content: String, options: SendOptions? = nil) async throws -> String {
Expand All @@ -120,8 +127,8 @@ public struct ConversationV1 {
return preparedMessage.messageID
}

@discardableResult func send(encodedContent: EncodedContent) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent)
@discardableResult func send(encodedContent: EncodedContent, options: SendOptions?) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}
Expand All @@ -143,6 +150,24 @@ public struct ConversationV1 {
}
}

var ephemeralTopic: String {
topic.description.replacingOccurrences(of: "/xmtp/0/dm-", with: "/xmtp/0/dmE-")
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] {
let pagination = Pagination(limit: limit, before: before, after: after)

Expand Down
28 changes: 24 additions & 4 deletions Sources/XMTP/ConversationV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public struct ConversationV2 {
ConversationV2Container(topic: topic, keyMaterial: keyMaterial, conversationID: context.conversationID, metadata: context.metadata, peerAddress: peerAddress, header: header)
}

func prepareMessage(encodedContent: EncodedContent) async throws -> PreparedMessage {
func prepareMessage(encodedContent: EncodedContent, options: SendOptions?) async throws -> PreparedMessage {
let message = try await MessageV2.encode(
client: client,
content: encodedContent,
topic: topic,
keyMaterial: keyMaterial
)

let topic = options?.ephemeral == true ? ephemeralTopic : topic

let envelope = Envelope(topic: topic, timestamp: Date(), message: try Message(v2: message).serializedData())
return PreparedMessage(messageEnvelope: envelope, conversation: .v2(self)) {
try await client.publish(envelopes: [envelope])
Expand All @@ -107,7 +109,7 @@ public struct ConversationV2 {
encoded = try encoded.compress(compression)
}

return try await prepareMessage(encodedContent: encoded)
return try await prepareMessage(encodedContent: encoded, options: options)
}

func messages(limit: Int? = nil, before: Date? = nil, after: Date? = nil) async throws -> [DecodedMessage] {
Expand All @@ -124,6 +126,24 @@ public struct ConversationV2 {
}
}

var ephemeralTopic: String {
topic.replacingOccurrences(of: "/xmtp/0/m", with: "/xmtp/0/mE")
}

public func streamEphemeral() -> AsyncThrowingStream<Envelope, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for try await envelope in client.subscribe(topics: [ephemeralTopic]) {
continuation.yield(envelope)
}
} catch {
continuation.finish(throwing: error)
}
}
}
}

public func streamMessages() -> AsyncThrowingStream<DecodedMessage, Error> {
AsyncThrowingStream { continuation in
Task {
Expand Down Expand Up @@ -165,8 +185,8 @@ public struct ConversationV2 {
return preparedMessage.messageID
}

@discardableResult func send(encodedContent: EncodedContent) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent)
@discardableResult func send(encodedContent: EncodedContent, options: SendOptions? = nil) async throws -> String {
let preparedMessage = try await prepareMessage(encodedContent: encodedContent, options: options)
try await preparedMessage.send()
return preparedMessage.messageID
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/XMTP/SendOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ public struct SendOptions {
public var compression: EncodedContentCompression?
public var contentType: ContentTypeID?
public var contentFallback: String?
public var ephemeral: Bool = false

public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil) {
public init(compression: EncodedContentCompression? = nil, contentType: ContentTypeID? = nil, contentFallback: String? = nil, ephemeral: Bool = false) {
self.compression = compression
self.contentType = contentType
self.contentFallback = contentFallback
self.ephemeral = ephemeral
}
}
2 changes: 1 addition & 1 deletion Tests/XMTPTests/ConversationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ class ConversationTests: XCTestCase {

let encodedContent = try TextCodec().encode(content: "hi")

try await bobConversation.send(encodedContent: encodedContent)
try await bobConversation.send(encodedContent: encodedContent, options: nil)

let messages = try await aliceConversation.messages()

Expand Down
67 changes: 67 additions & 0 deletions Tests/XMTPTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,73 @@ final class IntegrationTests: XCTestCase {
XCTAssertEqual("hi bob", messages[0])
}

func testStreamEphemeralInV1Conversation() async throws {
throw XCTSkip("integration only (requires local node)")

let alice = try PrivateKey.generate()
let bob = try PrivateKey.generate()

let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false))
let aliceClient = try await Client.create(account: alice, options: clientOptions)
try await aliceClient.publishUserContact(legacy: true)
let bobClient = try await Client.create(account: bob, options: clientOptions)
try await bobClient.publishUserContact(legacy: true)

let expectation = expectation(description: "bob gets a streamed message")

let convo = ConversationV1(client: bobClient, peerAddress: alice.address, sentAt: Date())

Task(priority: .userInitiated) {
for try await _ in convo.streamEphemeral() {
expectation.fulfill()
}
}

try await convo.send(content: "hi", options: .init(ephemeral: true))

let messages = try await convo.messages()
XCTAssertEqual(0, messages.count)

await waitForExpectations(timeout: 3)
}

func testStreamEphemeralInV2Conversation() async throws {
throw XCTSkip("integration only (requires local node)")

let alice = try PrivateKey.generate()
let bob = try PrivateKey.generate()

let clientOptions = ClientOptions(api: .init(env: .local, isSecure: false))
let aliceClient = try await Client.create(account: alice, options: clientOptions)
let bobClient = try await Client.create(account: bob, options: clientOptions)

let aliceConversation = try await aliceClient.conversations.newConversation(with: bob.walletAddress, context: .init(conversationID: "https://example.com/3"))

let expectation = expectation(description: "bob gets a streamed message")

guard case let .v2(bobConversation) = try await
bobClient.conversations.newConversation(with: alice.walletAddress, context: .init(conversationID: "https://example.com/3"))
else {
XCTFail("Did not create v2 convo")
return
}

XCTAssertEqual(bobConversation.topic, aliceConversation.topic)

Task(priority: .userInitiated) {
for try await _ in bobConversation.streamEphemeral() {
expectation.fulfill()
}
}

try await aliceConversation.send(content: "hi", options: .init(ephemeral: true))

let messages = try await aliceConversation.messages()
XCTAssertEqual(0, messages.count)

await waitForExpectations(timeout: 3)
}

func testCanPaginateV1Messages() async throws {
try TestConfig.skipIfNotRunningLocalNodeTests()

Expand Down
6 changes: 0 additions & 6 deletions dev/local/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,3 @@ services:
image: postgres:13
environment:
POSTGRES_PASSWORD: xmtp
js:
restart: always
depends_on:
wakunode:
condition: service_healthy
build: ./dev/test
Comment on lines -33 to -38
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We weren't using this anymore so I just deleted it.