Skip to content

Commit

Permalink
[ISSUE #1078]🚀Support send transaction message for client🍻
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Oct 26, 2024
1 parent 4df11a6 commit ae75277
Show file tree
Hide file tree
Showing 13 changed files with 684 additions and 167 deletions.
6 changes: 5 additions & 1 deletion rocketmq-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,8 @@ path = "examples/ordermessage/ordermessage_producer.rs"

[[example]]
name = "ordermessage-consumer"
path = "examples/ordermessage/ordermessage_consumer.rs"
path = "examples/ordermessage/ordermessage_consumer.rs"

[[example]]
name = "transaction-producer"
path = "examples/transaction/transaction_producer.rs"
99 changes: 99 additions & 0 deletions rocketmq-client/examples/transaction/transaction_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
use rocketmq_client::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client::producer::mq_producer::MQProducer;
use rocketmq_client::producer::transaction_listener::TransactionListener;
use rocketmq_client::Result;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_rust::rocketmq;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[rocketmq::main]
pub async fn main() -> Result<()> {
//init logger
rocketmq_common::log::init_logger();

// create a producer builder with default configuration
let builder = DefaultMQProducer::builder();

let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();

producer.start().await?;

for _ in 0..10 {
let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());

let send_result = producer.send_with_timeout(message, 2000).await?;
println!("send result: {}", send_result);
}
producer.shutdown().await;

Ok(())
}

struct TransactionListenerImpl {
local_trans: Arc<Mutex<HashMap<String, i32>>>,
transaction_index: AtomicI32,
}

impl Default for TransactionListenerImpl {
fn default() -> Self {
Self {
local_trans: Arc::new(Default::default()),
transaction_index: Default::default(),
}
}
}

impl TransactionListener for TransactionListenerImpl {
fn execute_local_transaction(&self, msg: &Message, arg: &dyn Any) -> LocalTransactionState {
let value = self
.transaction_index
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let status = value % 3;
let mut guard = self.local_trans.lock();
guard.insert(msg.get_transaction_id().to_string(), status);
LocalTransactionState::Unknown
}

fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
let mut guard = self.local_trans.lock();
let status = guard.get(msg.transaction_id()).unwrap_or(&-1);
match status {
1 => LocalTransactionState::CommitMessage,
2 => LocalTransactionState::RollbackMessage,
_ => LocalTransactionState::Unknown,
}
}
}
15 changes: 6 additions & 9 deletions rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::mix_all;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
use rocketmq_remoting::protocol::heartbeat::consumer_data::ConsumerData;
use rocketmq_remoting::protocol::heartbeat::heartbeat_data::HeartbeatData;
Expand Down Expand Up @@ -60,7 +59,6 @@ use crate::implementation::mq_admin_impl::MQAdminImpl;
use crate::implementation::mq_client_api_impl::MQClientAPIImpl;
use crate::producer::default_mq_producer::DefaultMQProducer;
use crate::producer::default_mq_producer::ProducerConfig;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInnerImpl;
use crate::producer::producer_impl::topic_publish_info::TopicPublishInfo;
use crate::Result;
Expand All @@ -75,7 +73,7 @@ pub struct MQClientInstance {
* The container of the producer in the current client. The key is the name of
* producerGroup.
*/
producer_table: Arc<RwLock<HashMap<String, Box<dyn MQProducerInner>>>>,
producer_table: Arc<RwLock<HashMap<String, MQProducerInnerImpl>>>,
/**
* The container of the consumer in the current client. The key is the name of
* consumer_group.
Expand Down Expand Up @@ -328,7 +326,7 @@ impl MQClientInstance {

pub async fn shutdown(&mut self) {}

pub async fn register_producer(&mut self, group: &str, producer: impl MQProducerInner) -> bool {
pub async fn register_producer(&mut self, group: &str, producer: MQProducerInnerImpl) -> bool {
if group.is_empty() {
return false;
}
Expand All @@ -337,7 +335,7 @@ impl MQClientInstance {
warn!("the producer group[{}] exist already.", group);
return false;
}
producer_table.insert(group.to_string(), Box::new(producer));
producer_table.insert(group.to_string(), producer);
true
}

Expand Down Expand Up @@ -1057,10 +1055,9 @@ impl MQClientInstance {
consumer_table.get(group).cloned()
}

pub async fn select_producer(&self, group: &str) -> Option<Box<dyn MQProducerInner>> {
/*let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()*/
unimplemented!("select_producer")
pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
}

pub async fn unregister_consumer(&mut self, group: impl Into<String>) {
Expand Down
4 changes: 2 additions & 2 deletions rocketmq-client/src/hook/send_message_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::sync::Arc;
use rocketmq_common::common::message::message_enum::MessageType;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::WeakCellWrapper;
use rocketmq_common::ArcRefCellWrapper;

use crate::implementation::communication_mode::CommunicationMode;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
Expand All @@ -39,7 +39,7 @@ pub struct SendMessageContext<'a> {
pub exception: Option<Arc<Box<dyn Error + Send + Sync>>>,
pub mq_trace_context: Option<Arc<Box<dyn std::any::Any + Send + Sync>>>,
pub props: HashMap<String, String>,
pub producer: Option<WeakCellWrapper<DefaultMQProducerImpl>>,
pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
pub msg_type: Option<MessageType>,
pub namespace: Option<String>,
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tracing::info;
use tracing::warn;

use crate::factory::mq_client_instance::MQClientInstance;
use crate::producer::producer_impl::mq_producer_inner::MQProducerInner;
use crate::producer::request_future_holder::REQUEST_FUTURE_HOLDER;

#[derive(Clone)]
Expand Down Expand Up @@ -217,7 +216,7 @@ impl ClientRemotingProcessor {
ctx: ConnectionHandlerContext,
mut request: RemotingCommand,
) -> Result<Option<RemotingCommand>> {
let mut request_header = request
let request_header = request
.decode_command_custom_header::<CheckTransactionStateRequestHeader>()
.unwrap();
let message_ext = MessageDecoder::decode(
Expand Down Expand Up @@ -255,8 +254,8 @@ impl ClientRemotingProcessor {
let addr = channel.remote_address().to_string();
producer.check_transaction_state(
addr.as_str(),
&message_ext,
&request_header,
message_ext,
request_header,
);
} else {
warn!("checkTransactionState, pick producer group failed");
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
#![allow(dead_code)]
#![allow(unused_variables)]
#![recursion_limit = "256"]

extern crate core;

use crate::error::MQClientError;
Expand Down
1 change: 1 addition & 0 deletions rocketmq-client/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ pub mod send_callback;
pub mod send_result;
pub mod send_status;
pub mod transaction_listener;
pub mod transaction_mq_produce_builder;
pub mod transaction_mq_producer;
pub mod transaction_send_result;
10 changes: 3 additions & 7 deletions rocketmq-client/src/producer/default_mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::producer::produce_accumulator::ProduceAccumulator;
use crate::producer::producer_impl::default_mq_producer_impl::DefaultMQProducerImpl;
use crate::producer::send_callback::SendMessageCallback;
use crate::producer::send_result::SendResult;
use crate::producer::transaction_listener::TransactionListener;
use crate::producer::transaction_send_result::TransactionSendResult;
use crate::trace::async_trace_dispatcher::AsyncTraceDispatcher;
use crate::trace::hook::end_transaction_trace_hook_impl::EndTransactionTraceHookImpl;
Expand Down Expand Up @@ -350,12 +349,11 @@ impl DefaultMQProducer {

pub fn set_default_mqproducer_impl(&mut self, default_mqproducer_impl: DefaultMQProducerImpl) {
let wrapper = ArcRefCellWrapper::new(default_mqproducer_impl);
let weak = ArcRefCellWrapper::downgrade(&wrapper);
self.default_mqproducer_impl = Some(wrapper);
self.default_mqproducer_impl = Some(wrapper.clone());
self.default_mqproducer_impl
.as_mut()
.unwrap()
.set_default_mqproducer_impl_inner(weak);
.set_default_mqproducer_impl_inner(wrapper);
}

pub fn set_retry_response_codes(&mut self, retry_response_codes: HashSet<i32>) {
Expand Down Expand Up @@ -1004,15 +1002,13 @@ impl MQProducer for DefaultMQProducer {
.await
}

async fn send_message_in_transaction<T, TL>(
async fn send_message_in_transaction<T>(
&mut self,
msg: Message,
arg: T,
transaction_listener: TL,
) -> Result<TransactionSendResult>
where
T: std::any::Any + Sync + Send,
TL: TransactionListener + Send + Sync,
{
unimplemented!("DefaultMQProducer not support send_message_in_transaction")
}
Expand Down
7 changes: 2 additions & 5 deletions rocketmq-client/src/producer/mq_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use rocketmq_common::common::message::MessageTrait;

use crate::producer::send_callback::SendMessageCallback;
use crate::producer::send_result::SendResult;
use crate::producer::transaction_listener::TransactionListener;
use crate::producer::transaction_send_result::TransactionSendResult;
use crate::Result;

Expand Down Expand Up @@ -429,15 +428,13 @@ pub trait MQProducerLocal {
///
/// * `Result<TransactionSendResult>` - A result containing the transaction send result or an
/// error.
async fn send_message_in_transaction<T, TL>(
async fn send_message_in_transaction<T>(
&mut self,
msg: Message,
arg: T,
transaction_listener: TL,
) -> Result<TransactionSendResult>
where
T: std::any::Any + Sync + Send,
TL: TransactionListener + Send + Sync;
T: std::any::Any + Sync + Send;

/// Sends a batch of messages.
///
Expand Down
Loading

0 comments on commit ae75277

Please sign in to comment.