Skip to content

Commit

Permalink
Merge pull request #52 from tony-p/fix/delayed-subscription
Browse files Browse the repository at this point in the history
Fix delayed subscription missing first message and change topic name
  • Loading branch information
facontidavide authored Apr 12, 2024
2 parents b82d850 + 47d60b9 commit 57fbeee
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class RosTopicSubNode : public BT::ConditionNode
// The callback will broadcast to all the instances of RosTopicSubNode<T>
auto callback = [this](const std::shared_ptr<TopicT> msg)
{
last_msg = msg;
broadcaster(msg);
};
subscriber = node->create_subscription<TopicT>(topic_name, 1, callback, option);
Expand All @@ -75,6 +76,7 @@ class RosTopicSubNode : public BT::ConditionNode
rclcpp::CallbackGroup::SharedPtr callback_group;
rclcpp::executors::SingleThreadedExecutor callback_group_executor;
boost::signals2::signal<void (const std::shared_ptr<TopicT>)> broadcaster;
std::shared_ptr<TopicT> last_msg;


};
Expand Down Expand Up @@ -173,6 +175,16 @@ class RosTopicSubNode : public BT::ConditionNode
*/
virtual NodeStatus onTick(const std::shared_ptr<TopicT>& last_msg) = 0;

/** latch the message that has been processed. If returns false and no new message is
* received, before next call there will be no message to process. If returns true,
* the next call will process the same message again, if no new message received.
*
* This can be equated with latched vs non-latched topics in ros 1.
*
* @return false will clear the message after ticking/processing.
*/
virtual bool latchLastMessage() const { return false; }

private:

bool createSubscriber(const std::string& topic_name);
Expand Down Expand Up @@ -260,6 +272,11 @@ template<class T> inline
sub_instance_ = it->second;
}

// Check if there was a message received before the creation of this subscriber action
if (sub_instance_->last_msg)
{
last_msg_ = sub_instance_->last_msg;
}

// add "this" as received of the broadcaster
signal_connection_ = sub_instance_->broadcaster.connect(
Expand All @@ -276,12 +293,18 @@ template<class T> inline
// First, check if the subscriber_ is valid and that the name of the
// topic_name in the port didn't change.
// otherwise, create a new subscriber
std::string topic_name;
getInput("topic_name", topic_name);

if(!sub_instance_)
{
std::string topic_name;
getInput("topic_name", topic_name);
createSubscriber(topic_name);
}
else if(topic_name_ != topic_name)
{
sub_instance_.reset();
createSubscriber(topic_name);
}

auto CheckStatus = [](NodeStatus status)
{
Expand All @@ -294,8 +317,10 @@ template<class T> inline
};
sub_instance_->callback_group_executor.spin_some();
auto status = CheckStatus (onTick(last_msg_));
last_msg_ = nullptr;

if (!latchLastMessage())
{
last_msg_.reset();
}
return status;
}

Expand Down

0 comments on commit 57fbeee

Please sign in to comment.