[ISSUE #1078]🚀Support send transaction message for client🍻 #1088

Open
wants to merge 6 commits into
base: main

Conversation

mxsm
Owner

@mxsm mxsm commented Oct 26, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1078

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new transaction-producer example for RocketMQ transaction management.
    • Added TransactionMQProducer and TransactionMQProducerBuilder for enhanced transactional message production.
    • New method for decoding message IDs added, improving message handling capabilities.
  • Improvements

    • Enhanced EndTransactionRequestHeader for better transaction request handling.
    • Mutable access to the body field in RemotingCommand allows for direct modifications.
  • Bug Fixes

    • Corrected formatting in Cargo.toml for better readability.

These updates provide users with improved functionality and usability in managing transactions within the RocketMQ client.

Contributor

coderabbitai bot commented Oct 26, 2024

Walkthrough

The pull request introduces several enhancements to the RocketMQ client, including the addition of a new transaction producer example and modifications to existing structures and methods to support transactional message handling. Key changes involve the implementation of a TransactionMQProducer, updates to the MQClientInstance, and the introduction of new request handling capabilities. The changes also include structural updates for better type safety and mutability, along with new utility functions for message ID decoding.

Changes

File Path | Change Summary
--- | ---
rocketmq-client/Cargo.toml | Added example transaction-producer and ensured newline at the end for ordermessage-consumer.
rocketmq-client/examples/transaction/transaction_producer.rs | Implemented a transaction producer with message configuration and transaction listener.
rocketmq-client/src/factory/mq_client_instance.rs | Updated MQClientInstance to use MQProducerInnerImpl and ArcRefCellWrapper<ClientConfig>.
rocketmq-client/src/hook/end_transaction_context.rs | Expanded EndTransactionContext with new fields for transaction management.
rocketmq-client/src/hook/send_message_context.rs | Changed producer field type to Option<ArcRefCellWrapper<DefaultMQProducerImpl>>.
rocketmq-client/src/implementation/client_remoting_processor.rs | Added handling for new request codes and implemented check_transaction_state method.
rocketmq-client/src/implementation/mq_client_api_impl.rs | Introduced end_transaction_oneway method for sending transaction end requests.
rocketmq-client/src/lib.rs | Added #![recursion_limit = "256"] for macro recursion depth.
rocketmq-client/src/producer.rs | Added new modules: transaction_mq_produce_builder and transaction_mq_producer.
rocketmq-client/src/producer/default_mq_producer.rs | Updated send_message_in_transaction method signature and handling.
rocketmq-client/src/producer/local_transaction_state.rs | Implemented Display trait for LocalTransactionState.
rocketmq-client/src/producer/mq_producer.rs | Updated send_message_in_transaction method signature to accept a generic type parameter.
rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs | Added transaction-related fields and methods for handling message transactions.
rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs | Introduced MQProducerInnerImpl struct with updated method signatures.
rocketmq-client/src/producer/transaction_mq_produce_builder.rs | Created builder for TransactionMQProducer with multiple configuration options.
rocketmq-client/src/producer/transaction_mq_producer.rs | Implemented TransactionMQProducer struct with transactional message handling capabilities.
rocketmq-common/src/common/message/message_decoder.rs | Added decode_message_id function for message ID decoding.
rocketmq-common/src/common/message/message_id.rs | Made MessageId struct public and added cloning and comparison traits.
rocketmq-common/src/utils/util_all.rs | Changed string_to_bytes function visibility to public.
rocketmq-remoting/src/protocol/header.rs | Added new module for end_transaction_request_header.
rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs | Introduced EndTransactionRequestHeader struct with serialization and validation logic.
rocketmq-remoting/src/protocol/remoting_command.rs | Added get_body_mut method for mutable access to body field.

Assessment against linked issues

Objective Addressed Explanation
Support sending transaction messages for client (#1078)

Possibly related PRs

Suggested labels

approved, auto merge

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

Poem

In the garden where messages bloom,
A transaction producer finds room.
With each message sent, a tale unfolds,
In RocketMQ's realm, where magic beholds.
So hop with joy, let the data flow,
For every transaction, we reap what we sow! 🐇✨


@mxsm
Owner Author

mxsm commented Oct 26, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 34

🧹 Outside diff range and nitpick comments (24)
rocketmq-common/src/common/message/message_id.rs (1)

20-24: Consider adding documentation for this public API.

Since this is now a public struct used for message identification, it would be helpful to add documentation explaining:

  • The purpose of MessageId
  • The significance of the address and offset fields
  • Any constraints or assumptions about the values

Example documentation:

 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
+/// Uniquely identifies a message in RocketMQ using broker address and offset.
+///
+/// # Fields
+/// * `address` - The socket address of the broker storing the message
+/// * `offset` - The position of the message in the broker's storage
 pub struct MessageId {
     pub address: SocketAddr,
     pub offset: i64,
 }
rocketmq-client/src/hook/end_transaction_context.rs (1)

21-29: Add documentation for the EndTransactionContext struct.

The struct appears to be well-structured for transaction handling, but it lacks documentation explaining its purpose, usage, and the significance of each field. Consider adding rustdoc comments.

Apply this diff to add documentation:

+/// Represents the context for ending a transaction in RocketMQ.
+/// This struct contains all necessary information to complete a transaction operation.
 pub struct EndTransactionContext<'a> {
+    /// The producer group responsible for the transaction
     pub producer_group: String,
+    /// The address of the broker handling the transaction
     pub broker_addr: String,
+    /// Reference to the message involved in the transaction
     pub message: &'a Message,
+    /// Unique identifier for the message
     pub msg_id: String,
+    /// Unique identifier for the transaction
     pub transaction_id: String,
+    /// Current state of the local transaction
     pub transaction_state: LocalTransactionState,
+    /// Indicates if this context is created from a transaction check
     pub from_transaction_check: bool,
 }
rocketmq-client/src/producer.rs (1)

31-32: LGTM! Consider grouping transaction-related modules together.

The new module declarations align well with the PR's objective of supporting transaction messages and follow the existing naming conventions.

Consider grouping all transaction-related modules together for better code organization. You could move these declarations next to other transaction-related modules like transaction_listener and transaction_send_result.

pub mod send_callback;
pub mod send_result;
pub mod send_status;
pub mod transaction_listener;
+pub mod transaction_mq_produce_builder;
+pub mod transaction_mq_producer;
pub mod transaction_send_result;
-pub mod transaction_mq_produce_builder;
-pub mod transaction_mq_producer;
rocketmq-client/src/hook/send_message_context.rs (1)

41-43: Consider adding documentation for the producer field.

Adding documentation comments would help explain:

  • The purpose of using ArcRefCellWrapper
  • Thread safety guarantees
  • Usage guidelines for interior mutability
+    /// Producer instance wrapped in ArcRefCellWrapper for thread-safe mutable access.
+    /// The wrapper provides:
+    /// - Thread-safe sharing through Arc
+    /// - Interior mutability through RefCell
     pub producer: Option<ArcRefCellWrapper<DefaultMQProducerImpl>>,
rocketmq-common/src/utils/util_all.rs (2)

194-194: Add documentation for the public function.

Since this function is now public, it should have documentation explaining its purpose, parameters, return value, and example usage.

Add documentation like this:

+/// Converts a hexadecimal string to a byte vector.
+///
+/// # Arguments
+///
+/// * `hex_string` - A string containing hexadecimal characters (0-9, A-F). Case-insensitive.
+///
+/// # Returns
+///
+/// * `Some(Vec<u8>)` - The converted bytes if the input is valid
+/// * `None` - If the input is empty or contains invalid characters
+///
+/// # Examples
+///
+/// ```
+/// let bytes = string_to_bytes("414243").unwrap();
+/// assert_eq!(bytes, vec![0x41, 0x42, 0x43]);
+/// ```
pub fn string_to_bytes(hex_string: impl Into<String>) -> Option<Vec<u8>> {

Function needs test coverage for hex string conversion functionality

The implementation looks correct, but there are no specific tests for the string_to_bytes function despite it being used in a critical message decoding path. The function is used in message_decoder.rs to decode message IDs, which makes it an important component that should be thoroughly tested.

Key findings:

  • Function is used in decode_message_id for parsing message identifiers
  • No dedicated test cases found for hex string conversion
  • No alternative hex conversion utilities exist in the codebase
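
The missing coverage could be approached along the following lines. This is only a sketch: `hex_to_bytes` is a hypothetical stand-in written for illustration, not the crate's `string_to_bytes`. It assumes the behaviour described in the suggested documentation above (empty or non-hex input yields `None`) and additionally rejects odd-length input, which the real function may or may not do.

```rust
/// Hypothetical stand-in for `string_to_bytes`: decodes a hex string into bytes.
/// Returns `None` for empty, odd-length, or non-hex input (assumed behaviour).
pub fn hex_to_bytes(hex: &str) -> Option<Vec<u8>> {
    if hex.is_empty() || hex.len() % 2 != 0 || !hex.is_ascii() {
        return None;
    }
    // Two hex characters produce one byte, so the capacity is known up front.
    let mut out = Vec::with_capacity(hex.len() / 2);
    for pair in hex.as_bytes().chunks(2) {
        // `to_digit(16)` accepts 0-9, a-f and A-F, and returns `None` otherwise.
        let hi = (pair[0] as char).to_digit(16)?;
        let lo = (pair[1] as char).to_digit(16)?;
        out.push(((hi << 4) | lo) as u8);
    }
    Some(out)
}
```

Cases worth asserting against the real function would include a valid upper-case string, a valid lower-case string, the empty string, a string with a non-hex character, and an odd-length string.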
🔗 Analysis chain

Line range hint 194-215: LGTM! The implementation is correct and well-tested.

The function is well-implemented with proper:

  • Input validation
  • Error handling using Option
  • Performance optimization with capacity pre-allocation
  • Safe operations throughout

Let's verify its usage in the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of string_to_bytes function
# Expected: Used in message_decoder.rs for decoding message IDs

# Search for direct usage of the function
rg "string_to_bytes\(" --type rust

# Search for potential test cases
rg "string_to_bytes" -A 5 --type rust -g "*test*.rs"

Length of output: 465


Script:

#!/bin/bash
# Let's verify the implementation and test coverage more thoroughly

# Check the test file in the same directory as util_all.rs
fd "util_all" --type f --extension rs | grep "test"

# Look for test modules within util_all.rs
rg "^mod test" -A 10 rocketmq-common/src/utils/util_all.rs

# Check message_decoder.rs usage context
rg -B 5 -A 5 "string_to_bytes" rocketmq-common/src/common/message/message_decoder.rs

# Check if there are any other hex string conversion utilities
rg "hex.*string|string.*hex" --type rust

Length of output: 1546

rocketmq-common/src/common/message/message_decoder.rs (2)

391-395: Fix indentation in store size calculation comments.

The comments are accurate but have inconsistent indentation compared to the surrounding code.

Apply this diff to fix the indentation:

-           + 4 // 2 MAGICCOD
-           + 4 // 3 BODYCRC
-           + 4 // 4 FLAG
-           + 4 + body_len // 4 BODY
-           + 2 + properties_length;
+        + 4 // 2 MAGICCOD
+        + 4 // 3 BODYCRC
+        + 4 // 4 FLAG
+        + 4 + body_len // 4 BODY
+        + 2 + properties_length;

582-588: Add more test cases for message ID decoding.

The current test only covers the IPv4 case. Consider adding:

  1. IPv6 message ID test case
  2. Invalid length test case
  3. Invalid format test case

Here's an example of additional test cases:

#[test]
fn decode_message_id_ipv6() {
    let msg_id = "ABCD1234ABCD1234ABCD1234ABCD12340007D8260BF075769D36C348";
    let message_id = decode_message_id(msg_id);
    assert_eq!(message_id.address, "[abcd:1234:abcd:1234:abcd:1234:abcd:1234]:55334".parse().unwrap());
    assert_eq!(message_id.offset, 860316681131967304);
}

#[test]
#[should_panic(expected = "Invalid message ID length")]
fn decode_message_id_invalid_length() {
    decode_message_id("invalid");
}
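
To make the layout these tests exercise more concrete, here is a minimal standalone sketch of such a decoder. It assumes the ID is the hex encoding of the IP bytes (4 for IPv4, 16 for IPv6), a 4-byte port, and an 8-byte offset, which matches the lengths of the IDs used above. `SketchMessageId` and `decode_message_id_sketch` are hypothetical names, and returning `Option` instead of panicking is a choice made for this illustration, not the behaviour of the function in the PR.

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

/// Hypothetical mirror of `MessageId`, kept local so the sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SketchMessageId {
    pub address: SocketAddr,
    pub offset: i64,
}

/// Decodes `<ip bytes><4-byte port><8-byte offset>` written as hex.
/// 32 hex characters means IPv4, 56 means IPv6; anything else yields `None`.
pub fn decode_message_id_sketch(msg_id: &str) -> Option<SketchMessageId> {
    // Reject signs, whitespace and non-ASCII input before slicing by byte index.
    if !msg_id.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let ip_hex_len = match msg_id.len() {
        32 => 8,
        56 => 32,
        _ => return None,
    };
    let (ip_hex, rest) = msg_id.split_at(ip_hex_len);
    let (port_hex, offset_hex) = rest.split_at(8);

    let ip = if ip_hex_len == 8 {
        IpAddr::V4(Ipv4Addr::from(u32::from_str_radix(ip_hex, 16).ok()?))
    } else {
        IpAddr::V6(Ipv6Addr::from(u128::from_str_radix(ip_hex, 16).ok()?))
    };
    // The port occupies four bytes; only the low 16 bits are kept.
    let port = u32::from_str_radix(port_hex, 16).ok()? as u16;
    // Parse as u64 first so offsets with the top bit set do not overflow.
    let offset = u64::from_str_radix(offset_hex, 16).ok()? as i64;

    Some(SketchMessageId {
        address: SocketAddr::new(ip, port),
        offset,
    })
}
```

With this shape, an invalid-length or invalid-format ID can be tested with a plain `is_none()` assertion rather than `#[should_panic]`.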
rocketmq-remoting/src/protocol/remoting_command.rs (1)

519-521: LGTM! Consider adding documentation.

The implementation of get_body_mut follows Rust idioms well and provides the necessary mutable access to support transaction message handling. Consider adding documentation to explain its purpose and usage in the transaction message context.

Add documentation like this:

+    /// Returns a mutable reference to the command body.
+    /// 
+    /// This method is particularly useful when modifying message bodies during transaction processing.
     pub fn get_body_mut(&mut self) -> Option<&mut Bytes> {
         self.body.as_mut()
     }
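
As an illustration of how a caller might use such an accessor, the sketch below modifies a body in place. `CommandSketch` and `append_trailer` are hypothetical stand-ins; the real `RemotingCommand` stores `Bytes`, which is replaced here by `Vec<u8>` so the example needs only the standard library.

```rust
/// Hypothetical stand-in for `RemotingCommand` with a `Vec<u8>` body.
#[derive(Debug, Default)]
pub struct CommandSketch {
    body: Option<Vec<u8>>,
}

impl CommandSketch {
    pub fn with_body(body: Vec<u8>) -> Self {
        Self { body: Some(body) }
    }

    /// Same shape as the reviewed method: `None` when no body is set.
    pub fn get_body_mut(&mut self) -> Option<&mut Vec<u8>> {
        self.body.as_mut()
    }

    /// Read-only view of the body, used here to observe the result.
    pub fn body(&self) -> Option<&[u8]> {
        self.body.as_deref()
    }
}

/// Appends `trailer` in place; returns `false` when the command has no body.
pub fn append_trailer(cmd: &mut CommandSketch, trailer: &[u8]) -> bool {
    match cmd.get_body_mut() {
        Some(body) => {
            body.extend_from_slice(trailer);
            true
        }
        None => false,
    }
}
```

Because the accessor returns `Option<&mut _>`, callers are forced to handle the no-body case explicitly instead of unwrapping.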
rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

1135-1150: Consider adding parameter validation.

The implementation looks good and follows the established patterns. However, consider adding validation for:

  • addr: Ensure it's not empty
  • timeout_millis: Ensure it's within reasonable bounds
 pub async fn end_transaction_oneway(
     &mut self,
     addr: &str,
     request_header: EndTransactionRequestHeader,
     remark: String,
     timeout_millis: u64,
 ) -> Result<()> {
+    if addr.is_empty() {
+        return Err(MQClientError::MQClientErr(-1, "addr cannot be empty".to_string()));
+    }
+    if timeout_millis == 0 || timeout_millis > 3600000 { // 1 hour max
+        return Err(MQClientError::MQClientErr(-1, "invalid timeout_millis".to_string()));
+    }
+
     let request =
         RemotingCommand::create_request_command(RequestCode::EndTransaction, request_header)
             .set_remark(Some(remark));
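
The same checks could also be pulled into a small helper so they can be unit-tested without a network connection. The sketch below is one possible shape under stated assumptions: `ValidationError` is a hypothetical type standing in for `MQClientError`, and the one-hour upper bound is taken from the suggestion above rather than from any documented RocketMQ limit.

```rust
/// Upper bound from the suggestion above (one hour); the limit itself is an assumption.
const MAX_TIMEOUT_MILLIS: u64 = 3_600_000;

/// Hypothetical error type standing in for `MQClientError`.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationError {
    EmptyAddr,
    TimeoutOutOfRange(u64),
}

/// Validates the arguments before any request is built or sent.
pub fn validate_end_transaction_args(
    addr: &str,
    timeout_millis: u64,
) -> Result<(), ValidationError> {
    // Treat whitespace-only addresses the same as empty ones.
    if addr.trim().is_empty() {
        return Err(ValidationError::EmptyAddr);
    }
    if timeout_millis == 0 || timeout_millis > MAX_TIMEOUT_MILLIS {
        return Err(ValidationError::TimeoutOutOfRange(timeout_millis));
    }
    Ok(())
}
```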
rocketmq-client/examples/transaction/transaction_producer.rs (4)

60-60: Handle the result of producer.shutdown().await

The shutdown method returns a Result. It's good practice to handle potential errors by using the ? operator to propagate them.

Apply this diff to handle possible errors during shutdown:

-producer.shutdown().await;
+producer.shutdown().await?;

33-33: Remove unused MESSAGE_COUNT constant

The MESSAGE_COUNT constant is defined but not used in the code. If it's unnecessary, consider removing it to keep the code clean.

Apply this diff to remove the unused constant:

-pub const MESSAGE_COUNT: usize = 1;

34-34: Replace placeholder producer group name with a meaningful identifier

The PRODUCER_GROUP constant is set to "please_rename_unique_group_name". For clarity and to avoid confusion, replace it with a meaningful producer group name.

Apply this diff to update the producer group name:

-pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
+pub const PRODUCER_GROUP: &str = "transaction_producer_group";

94-96: Define constants for transaction statuses

Using named constants instead of magic numbers improves code readability and maintainability. Define constants for the transaction status codes.

Apply this diff to define and use status constants:

+const STATUS_COMMIT: i32 = 1;
+const STATUS_ROLLBACK: i32 = 2;

 match status {
-    1 => LocalTransactionState::CommitMessage,
-    2 => LocalTransactionState::RollbackMessage,
+    STATUS_COMMIT => LocalTransactionState::CommitMessage,
+    STATUS_ROLLBACK => LocalTransactionState::RollbackMessage,
     _ => LocalTransactionState::Unknown,
 }
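
An alternative that keeps the mapping in one place is a `From<i32>` conversion. The sketch below uses a local `TxStateSketch` enum as a hypothetical stand-in for `LocalTransactionState`; the status values 1 and 2 come from the example above.

```rust
/// Hypothetical stand-in for `LocalTransactionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStateSketch {
    CommitMessage,
    RollbackMessage,
    Unknown,
}

const STATUS_COMMIT: i32 = 1;
const STATUS_ROLLBACK: i32 = 2;

impl From<i32> for TxStateSketch {
    /// Any status other than the two named constants maps to `Unknown`.
    fn from(status: i32) -> Self {
        match status {
            STATUS_COMMIT => Self::CommitMessage,
            STATUS_ROLLBACK => Self::RollbackMessage,
            _ => Self::Unknown,
        }
    }
}
```

A caller can then write `TxStateSketch::from(status)` or `status.into()` instead of repeating the `match`.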
rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)

74-83: Ensure consistent parameter naming in method signatures.

In the check_transaction_state method of MQProducerInnerImpl, the parameter is named addr, whereas in the trait it's broker_addr. This inconsistency can cause confusion.

Consider renaming addr to broker_addr for consistency:

pub fn check_transaction_state(
    &self,
-   addr: &str,
+   broker_addr: &str,
    msg: MessageExt,
    check_request_header: CheckTransactionStateRequestHeader,
) {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
-       default_mqproducer_impl_inner.check_transaction_state(addr, msg, check_request_header);
+       default_mqproducer_impl_inner.check_transaction_state(broker_addr, msg, check_request_header);
    }
}

91-96: Simplify the is_unit_mode method with default value.

Since is_unit_mode returns false when default_mqproducer_impl_inner is None, consider simplifying the method.

pub fn is_unit_mode(&self) -> bool {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
        return default_mqproducer_impl_inner.is_unit_mode();
    }
+   // Default to false if not set
    false
}

Ensure that returning false is the intended default behavior.
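
If the goal is a shorter body, one option is to fold the `if let` into a single `Option` combinator. The sketch below uses hypothetical stand-in types (`InnerProducerSketch`, `ProducerWrapperSketch`) rather than the PR's structs, and relies on `Option::is_some_and`, which requires Rust 1.70 or later.

```rust
/// Hypothetical stand-in for the inner producer implementation.
pub struct InnerProducerSketch {
    pub unit_mode: bool,
}

impl InnerProducerSketch {
    pub fn is_unit_mode(&self) -> bool {
        self.unit_mode
    }
}

/// Hypothetical stand-in for `MQProducerInnerImpl`.
pub struct ProducerWrapperSketch {
    pub inner: Option<InnerProducerSketch>,
}

impl ProducerWrapperSketch {
    /// Returns `false` when no inner producer is set, otherwise delegates.
    pub fn is_unit_mode(&self) -> bool {
        self.inner.as_ref().is_some_and(|p| p.is_unit_mode())
    }
}
```

On older toolchains, `map_or(false, |p| p.is_unit_mode())` expresses the same default.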

rocketmq-client/src/implementation/client_remoting_processor.rs (1)

261-264: Differentiate warning messages for better debugging

The warning messages at lines 261 and 264 are the same, which may make debugging difficult. Consider making them more specific.

You can update the warnings as follows:

                     } else {
-                        warn!("checkTransactionState, pick producer group failed");
+                        warn!("checkTransactionState: No producer found for group '{}'", group);
                     }
                 } else {
-                    warn!("checkTransactionState, pick producer group failed");
+                    warn!("checkTransactionState: Producer group not specified in message properties");
                 }
rocketmq-client/src/producer/transaction_mq_produce_builder.rs (2)

35-61: Improve readability by grouping related fields or adding documentation

The TransactionMQProducerBuilder struct contains numerous optional fields, which can make the code harder to read and maintain. Consider grouping related fields together or adding documentation comments to explain the purpose of each field group. This will enhance clarity for future maintainers.


248-335: Ensure required configurations are validated in the build method

The build method does not currently validate whether all necessary configurations have been provided. To prevent runtime errors, consider adding validation logic to check for the presence of essential fields and provide default values or error messages when they are missing.
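
One common way to do this is to make `build` fallible. The following is a minimal sketch under stated assumptions: the type names, the choice of which fields count as required, and the 3000 ms default timeout are all hypothetical and are not taken from `TransactionMQProducerBuilder`.

```rust
/// Hypothetical built configuration; not the PR's `TransactionMQProducer`.
#[derive(Debug, PartialEq, Eq)]
pub struct ProducerConfigSketch {
    pub producer_group: String,
    pub namesrv_addr: String,
    pub send_msg_timeout_millis: u64,
}

/// Hypothetical builder with every field optional until `build` is called.
#[derive(Debug, Default)]
pub struct ProducerBuilderSketch {
    producer_group: Option<String>,
    namesrv_addr: Option<String>,
    send_msg_timeout_millis: Option<u64>,
}

impl ProducerBuilderSketch {
    pub fn producer_group(mut self, group: impl Into<String>) -> Self {
        self.producer_group = Some(group.into());
        self
    }

    pub fn namesrv_addr(mut self, addr: impl Into<String>) -> Self {
        self.namesrv_addr = Some(addr.into());
        self
    }

    pub fn send_msg_timeout_millis(mut self, millis: u64) -> Self {
        self.send_msg_timeout_millis = Some(millis);
        self
    }

    /// Fails on missing or empty required fields; fills a default for the timeout.
    pub fn build(self) -> Result<ProducerConfigSketch, String> {
        let producer_group = self
            .producer_group
            .filter(|g| !g.is_empty())
            .ok_or_else(|| "producer_group is required".to_string())?;
        let namesrv_addr = self
            .namesrv_addr
            .filter(|a| !a.is_empty())
            .ok_or_else(|| "namesrv_addr is required".to_string())?;
        Ok(ProducerConfigSketch {
            producer_group,
            namesrv_addr,
            // Assumed default for illustration only.
            send_msg_timeout_millis: self.send_msg_timeout_millis.unwrap_or(3_000),
        })
    }
}
```

Returning `Result` moves misconfiguration errors to construction time, at the cost of changing the signature for existing callers.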

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)

Line range hint 99-111: Review use of Arc and ArcRefCellWrapper for shared state

The struct DefaultMQProducerImpl uses Arc and ArcRefCellWrapper for shared fields. Ensure that this usage properly handles concurrency and avoids potential deadlocks or mutable borrow issues.

Consider whether using Arc<Mutex<T>> or other concurrency primitives would be more appropriate for the shared mutable state.
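
For comparison, the `Arc<Mutex<T>>` pattern mentioned above looks roughly like this. The sketch uses a hypothetical `ProducerStateSketch` with a single counter and plain OS threads; it says nothing about how `ArcRefCellWrapper` is implemented, and in async code an async-aware mutex may be a better fit than `std::sync::Mutex`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical shared state; stands in for whatever the producer mutates.
#[derive(Debug, Default)]
pub struct ProducerStateSketch {
    pub sent: u64,
}

/// Spawns `workers` threads that each record `sends_per_worker` sends,
/// then returns the total observed through the shared mutex.
pub fn send_from_threads(workers: usize, sends_per_worker: u64) -> u64 {
    let state = Arc::new(Mutex::new(ProducerStateSketch::default()));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || {
                for _ in 0..sends_per_worker {
                    // The guard is a temporary, so the lock is released
                    // at the end of this statement.
                    state.lock().expect("mutex poisoned").sent += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }

    let total = state.lock().expect("mutex poisoned").sent;
    total
}
```

The mutex serialises every mutation, so the count is exact regardless of scheduling; the trade-off is lock contention on hot paths.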


1847-1849: Clear delay properties only if necessary

The code clears the PROPERTY_DELAY_TIME_LEVEL if it's not zero. Ensure that this is the intended behavior and that it doesn't affect other message properties unexpectedly.

Double-check that clearing this property is required and doesn't have side effects on message delivery timing.


1939-1945: Clarify logic for commit_or_rollback in request header

The mapping of local_transaction_state to commit_or_rollback flags in EndTransactionRequestHeader might benefit from comments or refactoring for clarity.

Consider adding comments or refactoring to make the mapping between transaction states and sys flags clearer.
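
One way to make the mapping explicit is a small, separately testable function. The sketch below assumes the flag values used by the Java client's `MessageSysFlag` (`TRANSACTION_NOT_TYPE = 0`, `TRANSACTION_COMMIT_TYPE = 0x2 << 2`, `TRANSACTION_ROLLBACK_TYPE = 0x3 << 2`) and uses a local `TxState` enum as a hypothetical stand-in for `LocalTransactionState`; the Rust crate's own constants should be used in real code.

```rust
// Values mirror the Java client's MessageSysFlag; treated as an assumption here.
pub const TRANSACTION_NOT_TYPE: i32 = 0;
pub const TRANSACTION_COMMIT_TYPE: i32 = 0x2 << 2; // 8
pub const TRANSACTION_ROLLBACK_TYPE: i32 = 0x3 << 2; // 12

/// Hypothetical stand-in for `LocalTransactionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxState {
    CommitMessage,
    RollbackMessage,
    Unknown,
}

/// Maps the local transaction outcome to the `commit_or_rollback` header value.
pub fn commit_or_rollback_flag(state: TxState) -> i32 {
    match state {
        // The local transaction succeeded: ask the broker to deliver the message.
        TxState::CommitMessage => TRANSACTION_COMMIT_TYPE,
        // The local transaction failed: ask the broker to discard the half message.
        TxState::RollbackMessage => TRANSACTION_ROLLBACK_TYPE,
        // Outcome not known yet: send no decision so the broker can check back later.
        TxState::Unknown => TRANSACTION_NOT_TYPE,
    }
}
```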

rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (2)

27-40: Ensure field ordering aligns with serialization expectations

The #[serde(flatten)] attribute is applied to rpc_request_header after other fields. While this works, it's generally recommended to place flattened fields before other fields to avoid potential conflicts or unexpected behavior during serialization.

Consider rearranging the struct fields:

 pub struct EndTransactionRequestHeader {
+    #[serde(flatten)]
+    pub rpc_request_header: RpcRequestHeader,
     pub topic: String,
     pub producer_group: String,
     tran_state_table_offset: u64,
     commit_log_offset: u64,
     commit_or_rollback: i32,
     from_transaction_check: bool,
     msg_id: String,
     transaction_id: Option<String>,
-    #[serde(flatten)]
-    pub rpc_request_header: RpcRequestHeader,
 }

17-26: Organize imports alphabetically and by crate

For better readability and maintenance, consider organizing imports first by external crates, then by internal crates, and sorting them alphabetically.

Reorder the imports:

 use anyhow::anyhow;
 use anyhow::Error;
 use serde::Deserialize;
 use serde::Serialize;
+use crate::protocol::command_custom_header::CommandCustomHeader;
+use crate::protocol::command_custom_header::FromMap;
+use crate::rpc::rpc_request_header::RpcRequestHeader;
+use rocketmq_common::common::sys_flag::message_sys_flag::MessageSysFlag;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 18afaac and ae75277.

📒 Files selected for processing (22)
  • rocketmq-client/Cargo.toml (1 hunks)
  • rocketmq-client/examples/transaction/transaction_producer.rs (1 hunks)
  • rocketmq-client/src/factory/mq_client_instance.rs (5 hunks)
  • rocketmq-client/src/hook/end_transaction_context.rs (1 hunks)
  • rocketmq-client/src/hook/send_message_context.rs (2 hunks)
  • rocketmq-client/src/implementation/client_remoting_processor.rs (4 hunks)
  • rocketmq-client/src/implementation/mq_client_api_impl.rs (2 hunks)
  • rocketmq-client/src/lib.rs (1 hunks)
  • rocketmq-client/src/producer.rs (1 hunks)
  • rocketmq-client/src/producer/default_mq_producer.rs (2 hunks)
  • rocketmq-client/src/producer/local_transaction_state.rs (2 hunks)
  • rocketmq-client/src/producer/mq_producer.rs (1 hunks)
  • rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (16 hunks)
  • rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2 hunks)
  • rocketmq-client/src/producer/transaction_mq_produce_builder.rs (1 hunks)
  • rocketmq-client/src/producer/transaction_mq_producer.rs (1 hunks)
  • rocketmq-common/src/common/message/message_decoder.rs (5 hunks)
  • rocketmq-common/src/common/message/message_id.rs (1 hunks)
  • rocketmq-common/src/utils/util_all.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/end_transaction_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/remoting_command.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • rocketmq-client/src/lib.rs
🔇 Additional comments (23)
rocketmq-common/src/common/message/message_id.rs (1)

20-21: LGTM! Appropriate trait implementations for message identification.

The added traits (Clone, PartialEq, Eq, Copy) and public visibility are well-suited for message identification in a distributed system. The implementation is safe as both fields (SocketAddr and i64) implement these traits.

rocketmq-client/src/hook/end_transaction_context.rs (1)

21-29: Verify transaction state handling across the codebase.

The introduction of transaction state handling might require consistent implementation across other components.

Let's verify the transaction state handling implementation:

✅ Verification successful

Based on the shell script results, I can now generate the final response:

Transaction state handling is consistently implemented across the codebase

The transaction state implementation shows a well-structured and consistent pattern:

  • LocalTransactionState enum is properly defined with serialization/deserialization
  • Transaction state is correctly used in producer implementations for commit/rollback handling
  • End transaction hooks provide proper extension points
  • Example code demonstrates correct usage patterns

The only minor concern is the unimplemented end_transaction method in EndTransactionTraceHookImpl, but this appears to be a planned extension point rather than a critical issue.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for consistent transaction state handling across the codebase

# Check for LocalTransactionState usage
echo "Checking LocalTransactionState usage patterns:"
rg -A 5 "LocalTransactionState"

# Check for transaction-related implementations
echo "Checking transaction-related implementations:"
ast-grep --pattern 'impl$$$Transaction$$$'

# Check for other transaction context usage
echo "Checking EndTransactionContext usage:"
rg -A 5 "EndTransactionContext"

Length of output: 19939

rocketmq-client/src/hook/send_message_context.rs (1)

42-42: LGTM! Thread-safe producer field type.

The change to Option<ArcRefCellWrapper<DefaultMQProducerImpl>> is a good improvement as it provides:

  • Thread-safe sharing through Arc
  • Interior mutability through RefCell
  • Consistent with the codebase's concurrency patterns
rocketmq-client/Cargo.toml (2)

84-85: LGTM! Improved formatting with newline.

The added newline improves readability and follows Rust's formatting conventions.


86-88: Verify the transaction example directory structure.

The new transaction producer example follows the established pattern of organizing examples by feature. Let's verify the directory structure exists.

✅ Verification successful

Directory structure verified and follows the established pattern

The transaction example follows the established pattern in the codebase:

  • Located at the correct path: rocketmq-client/examples/transaction/transaction_producer.rs
  • Follows the same directory structure as other examples like:
    • rocketmq-client/examples/batch/
    • rocketmq-client/examples/broadcast/
    • rocketmq-client/examples/ordermessage/
    • rocketmq-client/examples/quickstart/

The example's location and structure are consistent with the project's conventions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the transaction example directory structure and file

# Check if the example directory and file exist
fd --type d "transaction$" "examples/"
fd --type f "transaction_producer.rs$" "examples/"

# Verify the example follows the same structure as other examples
fd --type f "_producer.rs$" "examples/" --exec echo "Found producer example: {}"

Length of output: 461


Script:

#!/bin/bash
# Let's first check if the examples directory exists and its location
fd -t d "examples"

# Then check for any transaction-related files in the repository
fd -t f "transaction.*\.rs$"

# List all Rust files to understand the example structure
fd -t f "\.rs$"

Length of output: 31904

rocketmq-remoting/src/protocol/header.rs (1)

23-23: LGTM! Verify the new module implementation.

The new module declaration is correctly placed and follows the established naming pattern.

Let's verify that the module file exists and contains the expected transaction-related header implementation:

✅ Verification successful

The new module is properly implemented and integrated

The verification confirms:

  • The module file exists at the correct location
  • The EndTransactionRequestHeader struct is properly implemented with all necessary fields and methods
  • The module is correctly integrated and used in the producer implementation for transaction message handling
  • The implementation follows the codebase's patterns by implementing required traits (CommandCustomHeader, FromMap)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the existence and implementation of the end_transaction_request_header module

# Test 1: Check if the module file exists
fd "end_transaction_request_header.rs" --type f

# Test 2: Verify the implementation includes transaction-related structures
rg -A 5 "EndTransactionRequestHeader" 

Length of output: 10953

rocketmq-client/src/producer/local_transaction_state.rs (3)

18-18: LGTM!

The import of Display trait is correctly added.


Line range hint 24-30: LGTM!

The enum definition follows Rust best practices with appropriate derive macros and a sensible default variant.


81-89: ⚠️ Potential issue

Consider standardizing the "UNKNOWN" spelling across the codebase.

The current implementation uses "UNKNOW" in the Display implementation, while the Deserialize implementation accepts both "UNKNOW" and "UNKNOWN". This inconsistency could lead to confusion:

  1. "UNKNOW" appears to be a typo of "UNKNOWN"
  2. The deserialize implementation suggests "UNKNOWN" is also valid
  3. The comment "java is UNKNOW" indicates this might be for compatibility

Consider either:

  1. Standardizing on "UNKNOWN" throughout the codebase:
-            LocalTransactionState::Unknown => write!(f, "UNKNOW"),
+            LocalTransactionState::Unknown => write!(f, "UNKNOWN"),
  2. Or documenting the reason for maintaining "UNKNOW" if it's required for Java compatibility.

Let's check if "UNKNOW" is used consistently across the codebase:

✅ Verification successful

Let me gather more context about the Java compatibility aspect since there's a comment suggesting this might be intentional.


Based on the gathered evidence, I can now provide the final response:

Document the Java compatibility requirement for "UNKNOW" spelling

The "UNKNOW" spelling appears to be intentionally maintained for Java compatibility, as indicated by the comment "//java is UNKNOW" in the enum definition. The current implementation correctly handles both spellings in deserialization while maintaining the Java-compatible "UNKNOW" in serialization.

Recommended actions:

  • Add a more detailed documentation comment explaining the Java compatibility requirement:
/// Represents the state of a local transaction.
/// Note: The `Unknown` variant serializes to "UNKNOW" (without 'N') 
/// to maintain compatibility with the Java implementation of RocketMQ.
pub enum LocalTransactionState {
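
As an illustration of the compatibility behavior described above, here is a minimal, self-contained sketch. It is not the PR's code: the wire strings "COMMIT_MESSAGE" and "ROLLBACK_MESSAGE", the use of FromStr in place of the serde Deserialize impl, and the choice of Unknown as the default variant are all assumptions made for the example.

```rust
use std::fmt;
use std::str::FromStr;

/// Represents the state of a local transaction.
/// `Unknown` is written as "UNKNOW" (without the trailing 'N') for
/// compatibility with the Java client; both spellings are accepted on input.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LocalTransactionState {
    CommitMessage,
    RollbackMessage,
    #[default] // assumed default variant, for illustration only
    Unknown,
}

impl fmt::Display for LocalTransactionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            Self::CommitMessage => "COMMIT_MESSAGE",     // assumed wire string
            Self::RollbackMessage => "ROLLBACK_MESSAGE", // assumed wire string
            Self::Unknown => "UNKNOW",                   // Java-compatible spelling
        };
        f.write_str(s)
    }
}

impl FromStr for LocalTransactionState {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "COMMIT_MESSAGE" => Ok(Self::CommitMessage),
            "ROLLBACK_MESSAGE" => Ok(Self::RollbackMessage),
            // Accept both the Java spelling and the corrected one.
            "UNKNOW" | "UNKNOWN" => Ok(Self::Unknown),
            other => Err(format!("unrecognized transaction state: {other}")),
        }
    }
}
```

Keeping the lenient parsing on the input side while writing only "UNKNOW" means a round trip through either client yields the same variant.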
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for both spellings to understand the convention
echo "Checking for 'UNKNOW' usage:"
rg "UNKNOW" --type rust

echo -e "\nChecking for 'UNKNOWN' usage:"
rg "UNKNOWN" --type rust

Length of output: 2943


Script:

#!/bin/bash
# Check for any Java-related compatibility code or comments
rg -B 2 -A 2 "java" --type rust

# Also check the specific enum implementation more thoroughly
ast-grep --pattern 'enum LocalTransactionState {
  $$$
}'

Length of output: 3174

rocketmq-common/src/common/message/message_decoder.rs (1)

18-18: LGTM! Import changes are well-organized.

The new imports are properly organized and necessary for the message ID decoding functionality.

Also applies to: 34-34, 39-39

rocketmq-client/src/producer/mq_producer.rs (1)

431-437: LGTM! The signature changes improve flexibility and safety.

The updated method signature introduces several improvements:

  • Generic type parameter allows passing any thread-safe type as transaction argument
  • Taking ownership of the message is more idiomatic in Rust
  • The mutability requirement gives each call exclusive access to the producer, which the borrow checker enforces at compile time

Let's verify that all implementations have been updated to match the new signature:

✅ Verification successful

All implementations correctly match the new signature

The search results show that all implementations of send_message_in_transaction have been properly updated to match the new signature:

  • transaction_mq_producer.rs: Implements with the new generic signature
  • default_mq_producer.rs: Uses the new signature, but the body is left unimplemented
  • default_mq_producer_impl.rs: Internal implementation matches the new signature

The changes are consistent across the codebase and all implementations properly handle the breaking changes introduced in the trait.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for implementations of send_message_in_transaction
ast-grep --pattern 'impl $_ for $_ {
  $$$
  async fn send_message_in_transaction$_($_, $_, $_) {
    $$$
  }
  $$$
}'

Length of output: 120


Script:

#!/bin/bash
# Let's try a broader search for send_message_in_transaction implementations
rg "send_message_in_transaction" -A 5 -B 2

Length of output: 3988

rocketmq-client/src/producer/default_mq_producer.rs (2)

351-356: LGTM: Thread-safe producer implementation handling.

The implementation correctly manages thread-safe sharing of the producer implementation using ArcRefCellWrapper.


1005-1013: ⚠️ Potential issue

Implementation missing for transaction message support.

The PR aims to add transaction message support, but the send_message_in_transaction method is marked as unimplemented. This will cause runtime panics when called. Please implement the transaction message handling logic as per the PR objectives.

Let's verify if there are any transaction-related implementations in other files:

rocketmq-client/src/implementation/mq_client_api_impl.rs (1)

49-49: LGTM!

The import statement for EndTransactionRequestHeader is correctly placed and follows the existing import pattern.

rocketmq-client/src/factory/mq_client_instance.rs (3)

69-69: LGTM: Improved type safety and mutability handling.

The changes to field types enhance the implementation by:

  1. Using ArcRefCellWrapper for client_config enables thread-safe interior mutability
  2. Replacing boxed trait objects with concrete MQProducerInnerImpl improves type safety and performance

Also applies to: 76-76


Line range hint 329-338: LGTM: Thread-safe producer registration.

The implementation correctly handles producer registration with proper thread safety using RwLock and appropriate error handling.


1058-1061: LGTM: Clean producer selection implementation.

The new select_producer method provides thread-safe access to producers with proper ownership handling through cloning.
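
The clone-out-of-the-lock pattern mentioned above can be sketched as follows. This is a simplified, hypothetical stand-in: it uses std::sync::RwLock to stay dependency-free (the PR's method is async and awaits the read lock), and ProducerHandle and ClientInstance are placeholder types rather than the crate's own.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Placeholder for a cheaply clonable producer handle (hypothetical type).
#[derive(Clone, Debug, PartialEq)]
pub struct ProducerHandle {
    pub group: String,
}

/// Placeholder client instance holding the producer table (hypothetical type).
#[derive(Default)]
pub struct ClientInstance {
    producer_table: Arc<RwLock<HashMap<String, ProducerHandle>>>,
}

impl ClientInstance {
    /// Registers a producer; returns false if the group is already taken.
    pub fn register_producer(&self, group: &str, producer: ProducerHandle) -> bool {
        let mut table = self.producer_table.write().unwrap();
        if table.contains_key(group) {
            return false;
        }
        table.insert(group.to_string(), producer);
        true
    }

    /// Returns an owned clone so the read lock is released before the
    /// caller starts using the producer.
    pub fn select_producer(&self, group: &str) -> Option<ProducerHandle> {
        let table = self.producer_table.read().unwrap();
        table.get(group).cloned()
    }
}
```

Returning an owned value keeps the lock scope short, at the cost of one clone per lookup.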

rocketmq-client/src/producer/producer_impl/mq_producer_inner.rs (2)

67-72: ⚠️ Potential issue

Avoid using unreachable!() macro in production code.

In the get_check_listener method, using unreachable!() may lead to runtime panics if default_mqproducer_impl_inner is None. This could cause the application to crash unexpectedly.

Consider handling the None case gracefully or ensuring that default_mqproducer_impl_inner cannot be None:

pub fn get_check_listener(&self) -> Arc<Box<dyn TransactionListener>> {
    if let Some(default_mqproducer_impl_inner) = &self.default_mqproducer_impl_inner {
        return default_mqproducer_impl_inner.get_check_listener();
    }
-   unreachable!("default_mqproducer_impl_inner is None")
+   panic!("default_mqproducer_impl_inner should not be None");
}

Alternatively, remove the Option wrapper as suggested earlier.
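
Note that swapping unreachable!() for panic! still aborts at runtime when the inner value is None. One way to handle that case gracefully is to surface it in the return type. The sketch below is illustrative only: the trait and struct definitions are simplified stand-ins, the Arc<dyn ...> listener type differs from the Arc<Box<dyn ...>> in the PR, and changing the return type to Option would require updating callers.

```rust
use std::sync::Arc;

/// Simplified stand-in for the real listener trait.
pub trait TransactionListener: Send + Sync {
    fn name(&self) -> &str;
}

/// Trivial listener used only to exercise the sketch.
pub struct NoopListener;

impl TransactionListener for NoopListener {
    fn name(&self) -> &str {
        "noop"
    }
}

/// Simplified stand-in for the inner producer implementation.
pub struct InnerImpl {
    pub listener: Arc<dyn TransactionListener>,
}

pub struct MQProducerInnerImpl {
    pub default_mqproducer_impl_inner: Option<InnerImpl>,
}

impl MQProducerInnerImpl {
    /// Returns None instead of panicking when the inner impl is missing,
    /// leaving the decision about how to react to the caller.
    pub fn get_check_listener(&self) -> Option<Arc<dyn TransactionListener>> {
        self.default_mqproducer_impl_inner
            .as_ref()
            .map(|inner| Arc::clone(&inner.listener))
    }
}
```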

Likely invalid or redundant comment.


85-89: Ensure mutable access is properly handled.

The method update_topic_publish_info takes &mut self, but the borrowing of default_mqproducer_impl_inner should ensure mutable access correctly. Verify that there are no data races or borrowing issues.

Run the following script to check for any borrowing conflicts:

✅ Verification successful

Let me gather more information about the implementation and usage of TopicPublishInfo to ensure thread safety and proper mutation.


Let me analyze the mutable access pattern by checking the implementation of TopicPublishInfo and its usage.


Based on the code analysis, I can now provide the final response:

The mutable access is properly handled and safe.

The TopicPublishInfo struct is designed to be thread-safe through:

  1. #[derive(Clone)] implementation allowing safe copying
  2. Storage in an Arc<RwLock<HashMap>> in the producer implementation, providing synchronized access
  3. Proper mutable access through RwLock when updating via update_topic_publish_info

The implementation follows Rust's ownership and borrowing rules correctly, with synchronized access patterns preventing data races.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for mutable borrowing issues in update_topic_publish_info method.

# Expect: No error messages about borrowing issues.
rg 'update_topic_publish_info' -A 5

Length of output: 2754


Script:

#!/bin/bash
# Check TopicPublishInfo implementation and usage
ast-grep --pattern 'struct TopicPublishInfo {
  $$$
}'

# Check for any concurrent access patterns
rg "topic_publish_info" -B 2 -A 2

# Check for any Arc/Mutex usage with TopicPublishInfo
rg "Arc|Mutex.*TopicPublishInfo" -B 1 -A 1

Length of output: 383290


Script:

#!/bin/bash
# Check TopicPublishInfo implementation and usage
ast-grep --pattern 'struct TopicPublishInfo {
  $$$
}'

# Check for any concurrent access patterns in TopicPublishInfo
rg "TopicPublishInfo" -A 5 -B 2 rocketmq-client/src/producer/producer_impl/topic_publish_info.rs

Length of output: 696

rocketmq-client/src/implementation/client_remoting_processor.rs (1)

213-271: check_transaction_state method implementation looks correct

The check_transaction_state method is implemented properly, with appropriate logic for handling transaction state checks.

rocketmq-client/src/producer/producer_impl/default_mq_producer_impl.rs (3)

1861-1906: Handle possible None values for transaction IDs

When retrieving and setting the transaction ID from message properties, ensure that transaction_id is available to prevent potential issues.

Confirm that transaction_id is always present in message properties or handle the None case appropriately.


2078-2081: Handle potential None values when retrieving unique message ID

When retrieving unique_key from message properties, if it is None, you fall back to msg.msg_id.clone(). Ensure that msg.msg_id is always set or handle the case where both values could be None to prevent issues.

Add checks or defaults to handle cases where both unique_key and msg.msg_id might be missing.


1973-2002: 🛠️ Refactor suggestion

Ensure end_transaction_hook_list is properly synchronized

The end_transaction_hook_list may be accessed concurrently without synchronization. Consider wrapping it with a thread-safe structure like Arc<Mutex<...>> to prevent data races.

Check if synchronization is necessary and implement appropriate thread safety mechanisms.
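
If synchronization does turn out to be needed, a shared hook list could look roughly like the sketch below. Everything here is hypothetical: HookRegistry, CountingHook, and the one-argument end_transaction signature are invented for the example and do not reflect the PR's actual hook trait.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Simplified stand-in for an end-transaction hook.
pub trait EndTransactionHook: Send + Sync {
    fn end_transaction(&self, msg_id: &str);
}

/// Hook that only counts invocations; used to exercise the sketch.
pub struct CountingHook {
    pub calls: AtomicUsize,
}

impl EndTransactionHook for CountingHook {
    fn end_transaction(&self, _msg_id: &str) {
        self.calls.fetch_add(1, Ordering::SeqCst);
    }
}

/// Clonable, thread-safe list of hooks.
#[derive(Clone, Default)]
pub struct HookRegistry {
    hooks: Arc<Mutex<Vec<Arc<dyn EndTransactionHook>>>>,
}

impl HookRegistry {
    pub fn register(&self, hook: Arc<dyn EndTransactionHook>) {
        self.hooks.lock().unwrap().push(hook);
    }

    /// Runs every registered hook and returns how many were invoked.
    pub fn execute(&self, msg_id: &str) -> usize {
        // Snapshot the handles so the lock is not held while hooks run.
        let snapshot: Vec<Arc<dyn EndTransactionHook>> = self.hooks.lock().unwrap().clone();
        for hook in &snapshot {
            hook.end_transaction(msg_id);
        }
        snapshot.len()
    }
}
```

Snapshotting before invoking avoids a deadlock if a hook registers another hook, though hooks added during execution are only seen by the next call.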

Comment on lines +21 to +29
pub struct EndTransactionContext<'a> {
pub producer_group: String,
pub broker_addr: String,
pub message: &'a Message,
pub msg_id: String,
pub transaction_id: String,
pub transaction_state: LocalTransactionState,
pub from_transaction_check: bool,
}

🛠️ Refactor suggestion

Consider encapsulating fields and adding validation.

The struct currently exposes all fields as public, which might lead to invalid state mutations. Consider implementing proper encapsulation and validation.

Here's a suggested implementation with better encapsulation and validation:

/// Represents the context for ending a transaction in RocketMQ.
pub struct EndTransactionContext<'a> {
    producer_group: String,
    broker_addr: String,
    message: &'a Message,
    msg_id: String,
    transaction_id: String,
    transaction_state: LocalTransactionState,
    from_transaction_check: bool,
}

impl<'a> EndTransactionContext<'a> {
    /// Creates a new EndTransactionContext with validation
    pub fn new(
        producer_group: String,
        broker_addr: String,
        message: &'a Message,
        msg_id: String,
        transaction_id: String,
        transaction_state: LocalTransactionState,
        from_transaction_check: bool,
    ) -> Result<Self, &'static str> {
        // Validate inputs
        if producer_group.is_empty() {
            return Err("producer_group cannot be empty");
        }
        if broker_addr.is_empty() {
            return Err("broker_addr cannot be empty");
        }
        if msg_id.is_empty() {
            return Err("msg_id cannot be empty");
        }
        if transaction_id.is_empty() {
            return Err("transaction_id cannot be empty");
        }

        Ok(Self {
            producer_group,
            broker_addr,
            message,
            msg_id,
            transaction_id,
            transaction_state,
            from_transaction_check,
        })
    }

    // Getters
    pub fn producer_group(&self) -> &str {
        &self.producer_group
    }

    pub fn broker_addr(&self) -> &str {
        &self.broker_addr
    }

    pub fn message(&self) -> &Message {
        self.message
    }

    pub fn msg_id(&self) -> &str {
        &self.msg_id
    }

    pub fn transaction_id(&self) -> &str {
        &self.transaction_id
    }

    pub fn transaction_state(&self) -> LocalTransactionState {
        self.transaction_state
    }

    pub fn from_transaction_check(&self) -> bool {
        self.from_transaction_check
    }

    // Setter for mutable state only
    pub fn set_transaction_state(&mut self, state: LocalTransactionState) {
        self.transaction_state = state;
    }
}

Comment on lines +524 to +542
pub fn decode_message_id(msg_id: &str) -> MessageId {
let bytes = util_all::string_to_bytes(msg_id).unwrap();
let mut buffer = Bytes::from(bytes);
let len = if msg_id.len() == 32 {
let mut ip = [0u8; 4];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
} else {
let mut ip = [0u8; 16];
buffer.copy_to_slice(&mut ip);
let port = buffer.get_i32();
SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
};
MessageId {
address: len,
offset: buffer.get_i64(),
}
}

⚠️ Potential issue

Add input validation and improve variable naming.

The function handles both IPv4 and IPv6 addresses well, but could be improved in two areas:

  1. There's no validation of the input string length before processing
  2. The variable name len is misleading as it stores a socket address

Apply these improvements:

 pub fn decode_message_id(msg_id: &str) -> MessageId {
+    // Message ID should be either 32 chars (IPv4) or 56 chars (IPv6)
+    if msg_id.len() != 32 && msg_id.len() != 56 {
+        panic!("Invalid message ID length: {}", msg_id.len());
+    }
     let bytes = util_all::string_to_bytes(msg_id).unwrap();
     let mut buffer = Bytes::from(bytes);
-    let len = if msg_id.len() == 32 {
+    let address = if msg_id.len() == 32 {
         let mut ip = [0u8; 4];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
         SocketAddr::new(IpAddr::V4(Ipv4Addr::from(ip)), port as u16)
     } else {
         let mut ip = [0u8; 16];
         buffer.copy_to_slice(&mut ip);
         let port = buffer.get_i32();
         SocketAddr::new(IpAddr::V6(Ipv6Addr::from(ip)), port as u16)
     };
     MessageId {
-        address: len,
+        address,
         offset: buffer.get_i64(),
     }
 }
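
The suggestion above still panics on bad input. As an alternative, here is a hedged sketch of a fallible variant that returns a Result. It is standalone and uses only the standard library: try_decode_message_id, hex_to_bytes, and the local MessageId struct are names made up for this example, and the big-endian layout (IP, 4-byte port, 8-byte offset) is inferred from the get_i32/get_i64 calls in the quoted code.

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

/// Local stand-in for the crate's MessageId type.
#[derive(Debug, PartialEq)]
pub struct MessageId {
    pub address: SocketAddr,
    pub offset: i64,
}

/// Decodes a hex string; returns None on odd length or non-hex characters.
fn hex_to_bytes(s: &str) -> Option<Vec<u8>> {
    if s.len() % 2 != 0 || !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    (0..s.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&s[i..i + 2], 16).ok())
        .collect()
}

/// Fallible decoder: 32 hex chars for IPv4 (4 + 4 + 8 bytes),
/// 56 hex chars for IPv6 (16 + 4 + 8 bytes).
pub fn try_decode_message_id(msg_id: &str) -> Result<MessageId, String> {
    let ip_len = match msg_id.len() {
        32 => 4,
        56 => 16,
        n => return Err(format!("invalid message ID length: {n}")),
    };
    let bytes = hex_to_bytes(msg_id).ok_or_else(|| "message ID is not valid hex".to_string())?;
    let (ip_bytes, rest) = bytes.split_at(ip_len);

    let mut port = [0u8; 4];
    port.copy_from_slice(&rest[..4]);
    let mut offset = [0u8; 8];
    offset.copy_from_slice(&rest[4..12]);

    let ip = if ip_len == 4 {
        let mut ip = [0u8; 4];
        ip.copy_from_slice(ip_bytes);
        IpAddr::V4(Ipv4Addr::from(ip))
    } else {
        let mut ip = [0u8; 16];
        ip.copy_from_slice(ip_bytes);
        IpAddr::V6(Ipv6Addr::from(ip))
    };
    Ok(MessageId {
        // The port is stored as a 32-bit value; truncate as the original does.
        address: SocketAddr::new(ip, i32::from_be_bytes(port) as u16),
        offset: i64::from_be_bytes(offset),
    })
}
```

Callers can then decide whether a malformed ID should be logged, rejected, or propagated, rather than bringing the process down.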

Comment on lines +1058 to +1061
pub async fn select_producer(&self, group: &str) -> Option<MQProducerInnerImpl> {
let producer_table = self.producer_table.read().await;
producer_table.get(group).cloned()
}

💡 Codebase verification

Based on the gathered information, I can now provide the final response:

Transaction producer cleanup needs implementation

The verification reveals incomplete producer cleanup implementation:

  • unregister_producer is properly implemented to handle producer table cleanup
  • However, transaction producer's shutdown() is unimplemented (marked with unimplemented!())
  • Default producer's shutdown only handles accumulator and trace dispatcher cleanup
🔗 Analysis chain

Verify producer cleanup in transaction scenarios.

Since this is part of transaction message support, ensure that producers are properly cleaned up after transaction completion or failure.


Let me gather more information about the producer cleanup implementation and transaction handling.


Let me gather more specific information about producer cleanup in the unregister flow.


Let me check one final aspect about producer cleanup in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for producer cleanup in transaction handling
rg -A 5 "unregister_producer|producer_table.remove" 

Length of output: 1583


Script:

#!/bin/bash
# Search for producer cleanup in transaction context
rg -A 10 "transaction.*producer|producer.*transaction" 

# Search for producer table modifications
ast-grep --pattern 'self.producer_table.$_($$$)'

# Search for producer cleanup in client implementation
rg -A 10 "unregister_client.*producer.*table|producer.*table.*remove"

Length of output: 22008


Script:

#!/bin/bash
# Search for producer table cleanup in unregister_client implementation
ast-grep --pattern 'fn unregister_client($$$) {
  $$$
}'

# Search for producer cleanup in client shutdown
rg -A 10 "shutdown.*producer|producer.*shutdown"

Length of output: 3022


Script:

#!/bin/bash
# Search for producer cleanup in shutdown implementation
ast-grep --pattern 'async fn shutdown($$$) {
  $$$
}'

# Search for producer table cleanup
rg -A 5 "producer_table.*clear|producer_table.*remove"

Length of output: 3087

use std::sync::Arc;

use parking_lot::Mutex;
use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;

⚠️ Potential issue

Import TransactionMQProducer for transactional messaging

Since this example is intended to demonstrate transactional message sending, you should import TransactionMQProducer instead of DefaultMQProducer to enable transactional functionality.

Apply this diff to import TransactionMQProducer:

-use rocketmq_client::producer::default_mq_producer::DefaultMQProducer;
+use rocketmq_client::producer::transaction_mq_producer::TransactionMQProducer;

Comment on lines +47 to +50
let mut producer = builder
.producer_group(PRODUCER_GROUP.to_string())
.name_server_addr(DEFAULT_NAMESRVADDR.to_string())
.build();

⚠️ Potential issue

Use TransactionMQProducer instead of DefaultMQProducer for transactional messaging

To properly handle transactional messages, you should instantiate a TransactionMQProducer rather than a DefaultMQProducer. This ensures that the producer is configured correctly for transactional operations.

Apply this diff to use TransactionMQProducer and attach the transaction listener:

-let builder = DefaultMQProducer::builder();
+let builder = TransactionMQProducer::builder();

 let mut producer = builder
+    .transaction_listener(Arc::new(TransactionListenerImpl::default()))
     .producer_group(PRODUCER_GROUP.to_string())
     .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
     .build();

Committable suggestion was skipped due to low confidence.

Comment on lines +2065 to +2132
broker_addr: &str,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
) {
todo!()
let transaction_listener = self.transaction_listener.clone().unwrap();
let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap();
let broker_addr = broker_addr.to_string();
self.check_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
let mut unique_key =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if unique_key.is_none() {
unique_key = Some(msg.msg_id.clone());
}
let transaction_state = transaction_listener.check_local_transaction(&msg);
let request_header = EndTransactionRequestHeader {
topic: check_request_header.topic.clone().unwrap_or_default(),
producer_group: producer_impl_inner
.producer_config
.producer_group()
.to_string(),
tran_state_table_offset: check_request_header.commit_log_offset as u64,
commit_log_offset: check_request_header.commit_log_offset as u64,
commit_or_rollback: match transaction_state {
LocalTransactionState::CommitMessage => {
MessageSysFlag::TRANSACTION_COMMIT_TYPE
}
LocalTransactionState::RollbackMessage => {
MessageSysFlag::TRANSACTION_ROLLBACK_TYPE
}
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: true,
msg_id: unique_key.clone().unwrap_or_default(),
transaction_id: check_request_header.transaction_id.clone(),
rpc_request_header: RpcRequestHeader {
broker_name: check_request_header
.rpc_request_header
.unwrap_or_default()
.broker_name,
..Default::default()
},
};
producer_impl_inner.do_execute_end_transaction_hook(
&msg.message,
unique_key.as_ref().unwrap(),
broker_addr.as_str(),
transaction_state,
true,
);
let _ = producer_impl_inner
.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_str(),
request_header,
"".to_string(),
3000,
)
.await;
});

⚠️ Potential issue

Prevent panic by safely accessing check_runtime

In check_transaction_state, accessing self.check_runtime.as_ref().unwrap() can cause a panic if check_runtime is None. Safely handle the Option to avoid unexpected panics.

Update the code to check for None:

- self.check_runtime
-     .as_ref()
-     .unwrap()
-     .get_handle()
-     .spawn(async move {
+ if let Some(runtime) = &self.check_runtime {
+     runtime.get_handle().spawn(async move {
      // ... existing code ...
+     });
+ } else {
+     warn!("Check runtime is not initialized.");
+     // Handle the error as appropriate.
+ }
Suggested change (full replacement for the commented lines):
broker_addr: &str,
msg: MessageExt,
check_request_header: CheckTransactionStateRequestHeader,
) {
let transaction_listener = self.transaction_listener.clone().unwrap();
let mut producer_impl_inner = self.default_mqproducer_impl_inner.clone().unwrap();
let broker_addr = broker_addr.to_string();
if let Some(runtime) = &self.check_runtime {
runtime.get_handle().spawn(async move {
let mut unique_key =
msg.get_property(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if unique_key.is_none() {
unique_key = Some(msg.msg_id.clone());
}
let transaction_state = transaction_listener.check_local_transaction(&msg);
let request_header = EndTransactionRequestHeader {
topic: check_request_header.topic.clone().unwrap_or_default(),
producer_group: producer_impl_inner
.producer_config
.producer_group()
.to_string(),
tran_state_table_offset: check_request_header.commit_log_offset as u64,
commit_log_offset: check_request_header.commit_log_offset as u64,
commit_or_rollback: match transaction_state {
LocalTransactionState::CommitMessage => {
MessageSysFlag::TRANSACTION_COMMIT_TYPE
}
LocalTransactionState::RollbackMessage => {
MessageSysFlag::TRANSACTION_ROLLBACK_TYPE
}
LocalTransactionState::Unknown => MessageSysFlag::TRANSACTION_NOT_TYPE,
},
from_transaction_check: true,
msg_id: unique_key.clone().unwrap_or_default(),
transaction_id: check_request_header.transaction_id.clone(),
rpc_request_header: RpcRequestHeader {
broker_name: check_request_header
.rpc_request_header
.unwrap_or_default()
.broker_name,
..Default::default()
},
};
producer_impl_inner.do_execute_end_transaction_hook(
&msg.message,
unique_key.as_ref().unwrap(),
broker_addr.as_str(),
transaction_state,
true,
);
let _ = producer_impl_inner
.client_instance
.as_mut()
.unwrap()
.mq_client_api_impl
.as_mut()
.unwrap()
.end_transaction_oneway(
broker_addr.as_str(),
request_header,
"".to_string(),
3000,
)
.await;
});
} else {
warn!("Check runtime is not initialized.");
// Handle the error as appropriate.
}

Comment on lines +42 to +51
impl EndTransactionRequestHeader {
pub const TOPIC: &'static str = "topic";
pub const PRODUCER_GROUP: &'static str = "producerGroup";
pub const TRAN_STATE_TABLE_OFFSET: &'static str = "tranStateTableOffset";
pub const COMMIT_LOG_OFFSET: &'static str = "commitLogOffset";
pub const COMMIT_OR_ROLLBACK: &'static str = "commitOrRollback";
pub const FROM_TRANSACTION_CHECK: &'static str = "fromTransactionCheck";
pub const MSG_ID: &'static str = "msgId";
pub const TRANSACTION_ID: &'static str = "transactionId";
}

🛠️ Refactor suggestion

Consider using an enum for constant field names

The constant field names are defined as pub const strings. Using an enum whose variants map to &'static str values through an as_str method can improve code organization and prevent typos.

Example:

pub enum Field {
    Topic,
    ProducerGroup,
    TranStateTableOffset,
    CommitLogOffset,
    CommitOrRollback,
    FromTransactionCheck,
    MsgId,
    TransactionId,
}

impl Field {
    pub fn as_str(&self) -> &'static str {
        match self {
            Field::Topic => "topic",
            Field::ProducerGroup => "producerGroup",
            Field::TranStateTableOffset => "tranStateTableOffset",
            Field::CommitLogOffset => "commitLogOffset",
            Field::CommitOrRollback => "commitOrRollback",
            Field::FromTransactionCheck => "fromTransactionCheck",
            Field::MsgId => "msgId",
            Field::TransactionId => "transactionId",
        }
    }
}

Adjust the code to use Field::Topic.as_str() wherever needed.

Comment on lines +87 to +98
fn check_fields(&self) -> anyhow::Result<(), Error> {
if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
return Ok(());
}
if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
return Ok(());
}
Err(anyhow!("commitOrRollback field wrong"))
}

🛠️ Refactor suggestion

Enhance error handling in check_fields method

The check_fields method defaults to an error message "commitOrRollback field wrong" when commit_or_rollback does not match any of the expected types. To improve debugging and clarity, consider including the invalid value in the error message.

Apply this diff to enhance the error message:

 fn check_fields(&self) -> anyhow::Result<(), Error> {
     if MessageSysFlag::TRANSACTION_NOT_TYPE == self.commit_or_rollback {
         return Ok(());
     }
     if MessageSysFlag::TRANSACTION_COMMIT_TYPE == self.commit_or_rollback {
         return Ok(());
     }
     if MessageSysFlag::TRANSACTION_ROLLBACK_TYPE == self.commit_or_rollback {
         return Ok(());
     }
-    Err(anyhow!("commitOrRollback field wrong"))
+    Err(anyhow!("Invalid commitOrRollback value: {}", self.commit_or_rollback))
 }
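
The three early returns can also be collapsed into a single match. The following is a standalone sketch rather than the PR's code: the local MessageSysFlag constants are assumed to mirror the Java client's values (0, 0x2 << 2, 0x3 << 2), the free function check_commit_or_rollback is a made-up name, and a String error stands in for the anyhow error used in the PR.

```rust
/// Local stand-in for the crate's MessageSysFlag; values are assumptions
/// mirroring the Java client.
pub struct MessageSysFlag;

impl MessageSysFlag {
    pub const TRANSACTION_NOT_TYPE: i32 = 0;
    pub const TRANSACTION_COMMIT_TYPE: i32 = 0x2 << 2;
    pub const TRANSACTION_ROLLBACK_TYPE: i32 = 0x3 << 2;
}

/// Accepts only the three end-transaction flags and reports the offending
/// value otherwise.
pub fn check_commit_or_rollback(value: i32) -> Result<(), String> {
    match value {
        MessageSysFlag::TRANSACTION_NOT_TYPE
        | MessageSysFlag::TRANSACTION_COMMIT_TYPE
        | MessageSysFlag::TRANSACTION_ROLLBACK_TYPE => Ok(()),
        other => Err(format!("invalid commitOrRollback value: {other}")),
    }
}
```

With the invalid value in the message, a broker-side log line is enough to tell which flag a misbehaving client sent.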

Comment on lines +54 to +85
fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
    let mut map = std::collections::HashMap::new();
    map.insert(Self::TOPIC.to_string(), self.topic.clone());
    map.insert(
        Self::PRODUCER_GROUP.to_string(),
        self.producer_group.clone(),
    );
    map.insert(
        Self::TRAN_STATE_TABLE_OFFSET.to_string(),
        self.tran_state_table_offset.to_string(),
    );
    map.insert(
        Self::COMMIT_LOG_OFFSET.to_string(),
        self.commit_log_offset.to_string(),
    );
    map.insert(
        Self::COMMIT_OR_ROLLBACK.to_string(),
        self.commit_or_rollback.to_string(),
    );
    map.insert(
        Self::FROM_TRANSACTION_CHECK.to_string(),
        self.from_transaction_check.to_string(),
    );
    map.insert(Self::MSG_ID.to_string(), self.msg_id.clone());
    if let Some(value) = self.transaction_id.as_ref() {
        map.insert(Self::TRANSACTION_ID.to_string(), value.clone());
    }
    if let Some(value) = self.rpc_request_header.to_map() {
        map.extend(value);
    }
    Some(map)
}

🛠️ Refactor suggestion

Simplify to_map method using a serializer

Manually constructing the map in to_map is error-prone and verbose. Consider using serde_json::to_value followed by converting to a map, which leverages serialization attributes and ensures consistency.

Example:

 fn to_map(&self) -> Option<std::collections::HashMap<String, String>> {
-    let mut map = std::collections::HashMap::new();
-    map.insert(Self::TOPIC.to_string(), self.topic.clone());
-    // ... (other inserts)
-    if let Some(value) = self.rpc_request_header.to_map() {
-        map.extend(value);
-    }
-    Some(map)
+    let value = serde_json::to_value(self).ok()?;
+    let map = serde_json::from_value(value).ok()?;
+    Some(map)
 }

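This approach keeps the method concise and less error-prone, but it only works as written if every field serializes to a JSON string. serde_json::from_value does not coerce numbers, booleans, or null into String, so fields such as commit_log_offset, from_transaction_check, and an unset transaction_id would need explicit conversion before the map can be built.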

Committable suggestion was skipped due to low confidence.
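
Whichever route is taken, the typed fields have to be turned into strings somewhere, and absent optional fields have to be skipped. The sketch below illustrates that step using only the standard library. `FieldValue` and `to_string_map` are hypothetical names invented for this example (they are not serde_json types or part of this PR), and the key names in any call are likewise assumptions.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a serialized field value (not the serde_json type).
#[derive(Debug, Clone)]
enum FieldValue {
    Text(String),
    Unsigned(u64),
    Signed(i32),
    Flag(bool),
    Absent,
}

/// Build the string map a header expects from typed fields.
/// Non-string values are stringified and absent values are left out.
fn to_string_map(fields: &[(&str, FieldValue)]) -> HashMap<String, String> {
    fields
        .iter()
        .filter_map(|(key, value)| {
            let text = match value {
                FieldValue::Text(s) => s.clone(),
                FieldValue::Unsigned(n) => n.to_string(),
                FieldValue::Signed(n) => n.to_string(),
                FieldValue::Flag(b) => b.to_string(),
                FieldValue::Absent => return None,
            };
            Some((key.to_string(), text))
        })
        .collect()
}
```

A serializer-based `to_map` would need an equivalent conversion pass over the intermediate value before collecting into `HashMap<String, String>`.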

Comment on lines +104 to +139
fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
    Some(EndTransactionRequestHeader {
        topic: map
            .get(EndTransactionRequestHeader::TOPIC)
            .cloned()
            .unwrap_or_default(),
        producer_group: map
            .get(EndTransactionRequestHeader::PRODUCER_GROUP)
            .cloned()
            .unwrap_or_default(),
        tran_state_table_offset: map
            .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or_default(),
        commit_log_offset: map
            .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or_default(),
        commit_or_rollback: map
            .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
            .and_then(|s| s.parse::<i32>().ok())
            .unwrap_or_default(),
        from_transaction_check: map
            .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
            .and_then(|s| s.parse::<bool>().ok())
            .unwrap_or_default(),
        msg_id: map
            .get(EndTransactionRequestHeader::MSG_ID)
            .cloned()
            .unwrap_or_default(),
        transaction_id: map
            .get(EndTransactionRequestHeader::TRANSACTION_ID)
            .cloned(),
        rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
    })
}

⚠️ Potential issue

Avoid defaulting to zero on parse failure in from method

In the from method of the FromMap implementation, the numeric and boolean fields fall back to their default values (zero or false) when parsing fails. This can mask invalid input. It is safer to propagate the error or return None when parsing fails, so that a default value does not silently stand in for bad data.

Apply this diff to handle parsing errors appropriately:

 fn from(map: &std::collections::HashMap<String, String>) -> Option<Self::Target> {
     Some(EndTransactionRequestHeader {
         topic: map
             .get(EndTransactionRequestHeader::TOPIC)
             .cloned()
             .unwrap_or_default(),
         producer_group: map
             .get(EndTransactionRequestHeader::PRODUCER_GROUP)
             .cloned()
             .unwrap_or_default(),
         tran_state_table_offset: map
             .get(EndTransactionRequestHeader::TRAN_STATE_TABLE_OFFSET)
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<u64>().ok())?,
         commit_log_offset: map
             .get(EndTransactionRequestHeader::COMMIT_LOG_OFFSET)
-            .and_then(|s| s.parse::<u64>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<u64>().ok())?,
         commit_or_rollback: map
             .get(EndTransactionRequestHeader::COMMIT_OR_ROLLBACK)
-            .and_then(|s| s.parse::<i32>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<i32>().ok())?,
         from_transaction_check: map
             .get(EndTransactionRequestHeader::FROM_TRANSACTION_CHECK)
-            .and_then(|s| s.parse::<bool>().ok())
-            .unwrap_or_default(),
+            .and_then(|s| s.parse::<bool>().ok())?,
         msg_id: map
             .get(EndTransactionRequestHeader::MSG_ID)
             .cloned()
             .unwrap_or_default(),
         transaction_id: map
             .get(EndTransactionRequestHeader::TRANSACTION_ID)
             .cloned(),
         rpc_request_header: <RpcRequestHeader as FromMap>::from(map).unwrap_or_default(),
     })
 }

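With this change the method returns None when one of these fields fails to parse, making invalid data explicit. Note that the ? operator also returns None when the key is missing from the map altogether, so all four fields become mandatory.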
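
If missing keys should keep their defaults and only malformed values should be rejected, the two cases can be separated. The following is a minimal sketch under that assumption. `parse_or_default`, `Offsets`, and `offsets_from` are hypothetical names for illustration, and the string keys stand in for the header's constants.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical helper: a missing key falls back to the type's default,
/// while a key that is present but malformed yields `None`.
fn parse_or_default<T>(map: &HashMap<String, String>, key: &str) -> Option<T>
where
    T: FromStr + Default,
{
    match map.get(key) {
        None => Some(T::default()),
        Some(raw) => raw.parse::<T>().ok(),
    }
}

/// Reduced stand-in for the header, holding only two of its fields.
#[derive(Debug, PartialEq)]
struct Offsets {
    commit_log_offset: u64,
    from_transaction_check: bool,
}

/// Returns `None` only when a present value cannot be parsed.
fn offsets_from(map: &HashMap<String, String>) -> Option<Offsets> {
    Some(Offsets {
        // Key names are assumed here; the real code uses the header's constants.
        commit_log_offset: parse_or_default::<u64>(map, "commitLogOffset")?,
        from_transaction_check: parse_or_default::<bool>(map, "fromTransactionCheck")?,
    })
}
```

Whether a missing field should be tolerated is a protocol decision. The sketch only shows that the choice can be made per case instead of treating absent and malformed values the same way.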


Successfully merging this pull request may close these issues.

[Feature🚀] Support send transaction message for client