Presentation publisher and subscriber #378

Merged
merged 64 commits into from
Aug 24, 2024

Conversation

Collaborator

@serges147 serges147 commented Aug 16, 2024

  • Added Publisher API and implementation.
  • Added Subscriber API and implementation.
  • Eliminated setSendTimeout from transport::ITxSession.
  • Slightly refactored various transport metadata types.
  • Corresponding unit tests and coverage; added DSDL generation for the unit tests as well.

Also:

  • latest CETL
  • fixed some typos
  • latest toolshed 4.22.10

Coverage:
[image: coverage report]

# Conflicts:
#	docs/examples/platform/posix/posix_single_threaded_executor.hpp
#	test/unittest/sonar.cpp
…ted `ITxSession::setSendTimeout` method #verification #docs #sonar
@serges147 serges147 self-assigned this Aug 16, 2024
@serges147 serges147 changed the title First draft of presentation publisher Presentation publisher and subscriber Aug 21, 2024
@serges147 serges147 marked this pull request as ready for review August 21, 2024 13:42
include/libcyphal/presentation/publisher.hpp (outdated, resolved)

// TODO: docs
template <typename Message>
class Subscriber final : public detail::SubscriberBase
Member

Missing peek() and pop() described in the design document. Unless the user opted to use a callback, the Subscriber should act as a one-element-deep queue that can be read using the aforementioned methods. Eventually we could introduce configurable queue size if there's a need for that.

Collaborator Author

@serges147 serges147 Aug 22, 2024

I did it on purpose! I propose having configurable adapters instead, which you can very easily plug into the Subscriber's "on receive" callback. The idea is that...

  1. The Subscriber never stores/caches messages; it only delivers them via the callback.
  2. A plugged-in adapter can implement whatever storage policy the user wants. This is where we would provide the peek and pop APIs if the user wants messages to be cached.
  3. I believe it's a clearer and more flexible design because it separates concerns (subscription vs storage/caching).
  4. Parametrization of the storage/caching policy (which we are trying to avoid when possible, right?) won't touch the Subscriber-related stuff; it will stay with the minimal "evil" <Message> template parameter only.

Collaborator Author

@serges147 serges147 Aug 22, 2024

Also, I thought that maybe even the transport::IXxxRxSession-s should not have the receive alternative... but considering that it's a kind of low-level API, and that there is only a single instance of these rx sessions per port id, I decided to keep both ways of getting raw messages (although they are mutually exclusive).

Member

Is it going to affect the user API significantly? I can't quite imagine what your proposal should look like. Ok we should talk about this tomorrow.

Collaborator Author

@serges147 serges147 Aug 23, 2024

Here is an example of what I mean:

// Subscription
auto heartbeat_storage = MessageStorage<NodeHelpers::Heartbeat::Message, 1>{};
auto heartbeat_subscriber = NodeHelpers::Heartbeat::makeSubscriber(presentation);
heartbeat_subscriber->setOnReceiveCallback(std::ref(heartbeat_storage));
...
// Main loop.
//
CommonHelpers::runMainLoop(executor_, startup_time_ + run_duration_ + 500ms, [&](const auto now) {
    //
    if (auto heartbeat = heartbeat_storage.pop())
    {
        NodeHelpers::Heartbeat::print(now - startup_time_, std::get<0>(*heartbeat), std::get<1>(*heartbeat));
    }

    // or a bit more verbose with both peek and pop
    if (const auto& heartbeat = heartbeat_storage.peek())
    {
        NodeHelpers::Heartbeat::print(now - startup_time_, std::get<0>(*heartbeat), std::get<1>(*heartbeat));
        heartbeat_storage.pop();
    }
});

And message storage could be something like this:

template <typename Message, std::size_t Size>
class MessageStorage;
template <typename Message>
class MessageStorage<Message, 1>  // `MessageStorage` name is TBD
{
public:
    using MessageAndMetadata = std::tuple<Message, MessageRxMetadata>;

    cetl::optional<MessageAndMetadata> pop() noexcept
    {
        return std::exchange(last_message_, cetl::nullopt);
    }

    const cetl::optional<MessageAndMetadata>& peek() const noexcept
    {
        return last_message_;
    }

    void operator()(const typename Subscriber<Message>::OnReceiveCallback::Arg& arg)
    {
        last_message_.emplace(arg.message, arg.metadata);
    }

private:
    cetl::optional<MessageAndMetadata> last_message_;

}; // MessageStorage

or some kind of "ring" buffer for Size > 1 specialization.
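For the Size > 1 case mentioned above, a ring-buffer adapter might look roughly like the sketch below. It is only an illustration under stated assumptions: `RingMessageStorage` is a hypothetical name, `std::optional` stands in for `cetl::optional` to keep the snippet self-contained, the stored `Item` is assumed to be default-constructible, and overwriting the oldest item when full is just one possible policy.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <optional>
#include <utility>

// Hypothetical fixed-capacity FIFO adapter (not part of libcyphal).
template <typename Item, std::size_t Size>
class RingMessageStorage
{
    static_assert(Size > 0, "capacity must be positive");

public:
    // Callable, so it could be plugged in as an "on receive" callback.
    // When the buffer is full, the oldest item is overwritten.
    void operator()(const Item& item)
    {
        items_[(head_ + count_) % Size] = item;
        if (count_ < Size)
        {
            ++count_;
        }
        else
        {
            head_ = (head_ + 1) % Size;  // the oldest item has just been dropped
        }
    }

    // Returns a pointer to the oldest stored item, or nullptr if empty.
    const Item* peek() const noexcept
    {
        return (count_ > 0) ? &items_[head_] : nullptr;
    }

    // Removes and returns the oldest stored item, if any.
    std::optional<Item> pop()
    {
        assert(count_ <= Size);
        if (count_ == 0)
        {
            return std::nullopt;
        }
        std::optional<Item> out{std::move(items_[head_])};
        head_ = (head_ + 1) % Size;
        --count_;
        return out;
    }

    std::size_t size() const noexcept { return count_; }

private:
    std::array<Item, Size> items_{};
    std::size_t            head_{0};
    std::size_t            count_{0};
};
```

With `Item` set to a message-plus-metadata tuple, this would be used the same way as the single-element storage in the example above.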

Collaborator Author

BTW, another similar MessageStorage could potentially have multi-threading protection inside if it faces multiple threads or MCU cores. That way it would be possible to keep running libcyphal on a single-threaded executor while consuming messages (via peek/pop) from other threads/cores. It is just an idea/example; my main point is again about separation of concerns, in this case subscription vs concurrency.
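A minimal sketch of such a thread-safe variant, assuming a plain `std::mutex` is available on the target (which may not hold on bare-metal MCUs). `LockedMessageStorage` is a hypothetical name, and `std::optional` is used in place of `cetl::optional` so the snippet compiles on its own.

```cpp
#include <cassert>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical single-element storage guarded by a mutex (not part of libcyphal).
template <typename Item>
class LockedMessageStorage
{
public:
    // Would be called from the executor thread as the "on receive" callback.
    void operator()(const Item& item)
    {
        const std::lock_guard<std::mutex> lock{mutex_};
        last_item_ = item;
    }

    // May be called from a different thread; takes the stored item, if any.
    std::optional<Item> pop()
    {
        const std::lock_guard<std::mutex> lock{mutex_};
        return std::exchange(last_item_, std::nullopt);
    }

    // Returns a copy rather than a reference: a reference could be
    // overwritten by a concurrent writer once the lock is released.
    std::optional<Item> peek() const
    {
        const std::lock_guard<std::mutex> lock{mutex_};
        return last_item_;
    }

private:
    mutable std::mutex  mutex_;
    std::optional<Item> last_item_;
};

// Single-threaded usage sketch; in practice the writer and the reader
// would live on different threads or cores.
inline void lockedStorageUsage()
{
    LockedMessageStorage<int> storage;
    storage(42);
    assert(storage.peek() == 42);
    assert(storage.pop() == 42);
    assert(!storage.pop().has_value());
}
```

Note that `peek` returns by value here, unlike the single-threaded version above, which is one of the trade-offs a concurrency-aware adapter would have to make.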

include/libcyphal/presentation/subscriber.hpp (resolved)
{
// This is safe downcast b/c we know that the `curr_node` is of type `Subscriber`.
// Otherwise, the `deserializer_` would be different from `this_deserializer`.
auto* const subscriber = static_cast<Subscriber*>(curr_node);
Member

A very interesting design, but it will break one day when we add queue depth parameterization to the Subscriber (and perhaps some other compile-time policies). Generally, depending on the concrete type of a user-facing class is not a good idea, I think.

Collaborator Author

@serges147 serges147 Aug 22, 2024

will break one day when we add queue depth parameterization

so, let's not add it ;) - see my answer above about storage adapters


using TypeId = std::uintptr_t;
template <typename Message>
static TypeId getTypeId() noexcept
Member

This type id breaks in dynamically linked executables as covered in the design doc. Can we try the solution outlined in the design doc:

one approach is to use the first instance of Subscriber to perform deserialization and then to deliver the deserialized message object to each of its siblings by value
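The approach quoted from the design doc could be sketched as below. This is only an illustration of "deserialize once, deliver by value to every sibling", not the library's implementation: `SharedDeserializationGroup`, the `std::function`-based callbacks, and the byte-vector payload are all assumptions made to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical group of sibling subscribers on one subject (not part of libcyphal).
template <typename Message>
class SharedDeserializationGroup
{
public:
    using Payload     = std::vector<std::uint8_t>;
    using Deserialize = std::function<std::optional<Message>(const Payload&)>;
    using Callback    = std::function<void(Message)>;  // message is delivered by value

    explicit SharedDeserializationGroup(Deserialize deserialize)
        : deserialize_{std::move(deserialize)}
    {
        assert(deserialize_);  // a deserializer must be provided
    }

    void addSubscriber(Callback callback)
    {
        callbacks_.push_back(std::move(callback));
    }

    // Deserializes the payload exactly once and hands a copy of the result
    // to every sibling. Returns the number of siblings that were notified.
    std::size_t onTransfer(const Payload& payload) const
    {
        const std::optional<Message> message = deserialize_(payload);
        if (!message)
        {
            return 0;
        }
        for (const auto& callback : callbacks_)
        {
            callback(*message);
        }
        return callbacks_.size();
    }

private:
    Deserialize           deserialize_;
    std::vector<Callback> callbacks_;
};
```

Because the deserializer is stored per group rather than looked up by a type id, the cost stays at one deserialization per transfer regardless of which module created each sibling.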

Collaborator Author

@serges147 serges147 Aug 22, 2024

I believe it won't really break. IMHO, the worst that might happen is that deserialization will be done more than once for such "cross" dynamically linked modules... I'm not even sure how to reproduce it, b/c the Presentation object lives in one specific module, as do the SubscriberImpl instances it makes internally (they are not supposed to travel across module boundaries). Even the user-facing Subscriber<Msg> objects are made by the Presentation; you can move them outside the module boundary, but they still reference their original impl object inside (which does the actual deserialization).

Member

I think you should be able to reproduce it as follows: Create a shared library that instantiates a subscriber of type T on a given Presentation instance. Create an executable that does the same. Load the shared library dynamically and request a subscriber from it. The executable should end up with two subscribers of the same type that rely on different deserializers.

The described scenario is not entirely hypothetical, you can easily imagine a complex application doing just that (there are ROS libraries out there that are structured similarly). I think the implications of unexpectedly doubling the execution time could be significant in some applications dealing with large messages (examples of these are given in the design doc).

I think it is acceptable to keep the current suboptimal implementation as is for now, but we should log this as a future improvement with a big TODO comment and a separate issue.
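For context, one common RTTI-free way to build such a type id is to take the address of a per-instantiation static object, as in the sketch below. Whether the PR's getTypeId is implemented this way is an assumption; `getTypeIdSketch` is a hypothetical name. Within a single binary the id is stable per type, but a shared library and an executable may each carry their own copy of the static (for example with hidden symbol visibility), in which case the same type yields different ids and the subscribers stop sharing a deserializer.

```cpp
#include <cassert>
#include <cstdint>

using TypeId = std::uintptr_t;

// Hypothetical sketch: the address of a function-local static is unique
// per template instantiation within one linked module.
template <typename T>
TypeId getTypeIdSketch() noexcept
{
    static const char tag = 0;
    return reinterpret_cast<TypeId>(&tag);
}

// Usage sketch within a single module.
inline void typeIdUsage()
{
    assert(getTypeIdSketch<int>() == getTypeIdSketch<int>());
    assert(getTypeIdSketch<int>() != getTypeIdSketch<long>());
}
```

The failure mode discussed in this thread cannot be shown in a single translation unit; it only appears when two separately linked modules each instantiate the template.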

Collaborator Author

@serges147 serges147 Aug 23, 2024

I understand what you are describing in general (and why two incompatible types might end up with the same id; that was also described pretty well in the RTTI part of the design), but where I get lost is...

... Create an executable that does the same.

Will it be done on two different instances of the Presentation object?

  • If yes, then I don't see how two subscribers made on two different Presentations might collide...
  • If no, then the way I see it, the actual "factory" of subscribers stays on one side only, and so do the template-instantiated deserialization code and its type id, right?... Am I missing something?

Collaborator Author

I've created issue #380 to address this problem.

Collaborator

@lydia-at-amazon lydia-at-amazon left a comment

Very exciting to see the Presentation Layer coming together!

ASSERT_THAT(heartbeat_publisher, testing::Optional(testing::_));
auto publish_every_1s_cb = executor_.registerCallback([&](const auto& arg) {
//
EXPECT_THAT(heartbeat_publisher->publish(arg.approx_now + 1s, makeHeartbeatMsg(arg.approx_now)),
Collaborator

Should the first parameter passed to publish() be called now or deadline? The code passes in now + 1 here and then adds another second inside the publish function when you set the TransferMetadata:

void publish(const libcyphal::TimePoint now, const libcyphal::Duration uptime) {
     ....
     const TransferTxMetadata metadata{{transfer_id_, libcyphal::transport::Priority::Nominal}, now + 1s};
}

So, this means the deadline passed to TransferTxMetadata is actually now + 2 seconds?

Collaborator Author

It's not the same publish; note that I call heartbeat_publisher->publish here!
void publish(const libcyphal::TimePoint now, const libcyphal::Duration uptime) { is not used by the presentation layer "example_12" at all (it is used by the transport layer example 02).
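One way to make the "now" vs "deadline" distinction hard to mix up is to give the deadline its own type, as in the sketch below. This is not how libcyphal declares publish; `Deadline`, `deadlineAfter`, and `FakePublisher` are hypothetical names, and `std::chrono::steady_clock` stands in for the library's own time types.

```cpp
#include <cassert>
#include <chrono>

using TimePoint = std::chrono::steady_clock::time_point;
using Duration  = std::chrono::steady_clock::duration;

// Hypothetical wrapper: the absolute time by which a transfer must be sent.
struct Deadline
{
    TimePoint value;
};

// The single place where "now" is turned into a deadline.
inline Deadline deadlineAfter(const TimePoint now, const Duration timeout)
{
    assert(timeout >= Duration::zero());
    return Deadline{now + timeout};
}

// Hypothetical publisher stand-in that records the deadline it was given.
struct FakePublisher
{
    TimePoint last_deadline{};

    bool publish(const Deadline deadline)
    {
        last_deadline = deadline.value;  // used as is; nothing is added here
        return true;
    }
};
```

With this shape, `publisher.publish(deadlineAfter(now, 1s))` states the intent at the call site, and passing a bare `TimePoint` does not compile because `Deadline` has no converting constructor.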

Collaborator Author

See also the idea below of splitting the node helpers according to the layer an example belongs to.

Member

That's precisely the problem with the use of rich helpers in examples

Comment on lines +165 to +168
static cetl::optional<Publisher<Message>> makePublisher(Presentation& presentation)
{
return makeAnyPublisher<Message>(presentation, Message::_traits_::FixedPortId);
}
Collaborator

        static cetl::optional<Publisher<Heartbeat>> makePublisher(Presentation& presentation)
        {
            return makeAnyPublisher<Heartbeat>(presentation, Heartbeat::_traits_::FixedPortId);
        }

nit: For the purposes of the example, using Heartbeat instead of Message might make it more clear that this function specifically creates a Heartbeat publisher and not a generic publisher

Collaborator Author

This static method is nested inside the struct Heartbeat umbrella type, and its usage looks like this: NodeHelpers::Heartbeat::makePublisher. I thought it was straightforward enough...

{
using Message = uavcan::node::Heartbeat_1_0;

bool makeRxSession(libcyphal::transport::ITransport& transport,
Collaborator

nit: This function isn't needed for the presentation layer example. Would it be worth moving to a separate file so that users don't think they need to implement this function in order to integrate with the presentation layer?

Collaborator Author

@serges147 serges147 Aug 23, 2024

You are right that it's not needed for the presentation layer, but the same NodeHelpers::xxx stuff is used by the transport layer examples (02 & 03). Pavel mentioned that he wants more self-sufficient examples (and I hear you guys, and I actually started like this initially), but I really struggle with all the massive code duplication I would need to introduce and maintain if I made each and every example self-sufficient (aka one single .cpp file)... and there will be more example apps... With NodeHelpers I can avoid this.

Here is an idea: what if I add one intermediate layer-related subdirectory to the docs/examples hierarchy, so that the examples are grouped by "0_transport" and "1_presentation" (and the future "2_application"), and, more importantly, each such layer-related directory contains only its own set of NodeHelpers with the boilerplate specific to that layer only. Something like this:

docs /
    dsdl
    platform
    0_transport
        example_02_posix_udp.cpp
        example_03_linux_socketcan.cpp
        transport_helpers.hpp
    1_presentation
        example_12_posix_udp.cpp
        presentation_helpers.hpp

I prepended "0_" & "1_" to keep a specific ordering (by how high the layer of the examples is), which I find useful. The same idea was behind my current numbering of the example .cpp-s: example_<layer_level><N>_<platform>_<kind_of_transport>, where N is just a sequential index inside a layer.

What do you think, guys? If you like it, can I do it in the next PR?

Member

Demos/examples should not rely on helpers at all except for platform-specific parts, because they reduce the clarity of the example. If there is too much code duplication, we should look into altering the API design.

return nullptr != msg_rx_session_;
}

void makeTxSession(libcyphal::transport::ITransport& transport,
Collaborator

nit: This function isn't needed for the presentation layer example. Would it be worth moving to a separate file so that users don't think they need to implement this function in order to integrate with the presentation layer?

Collaborator Author

see above

Comment on lines 186 to 193
static void print(const libcyphal::Duration uptime, const libcyphal::transport::MessageRxTransfer& rx_heartbeat)
{
Message heartbeat_msg{};
if (tryDeserialize(heartbeat_msg, rx_heartbeat.payload))
{
print(uptime, heartbeat_msg, rx_heartbeat.metadata);
}
}
Collaborator

nit: Might prefer to call this deserializeAndPrint()

Comment on lines +47 to +51
/// @tparam Message DSDL compiled (aka Nunavut generated) type of the message to publish.
/// @param subject_id The subject ID to publish the message on.
///
template <typename Message>
Expected<Publisher<Message>, transport::AnyFailure> makePublisher(const transport::PortId subject_id)
Collaborator

If a user only has access to the subject ID and not the compiled DSDL, can they just pass void for the template parameter?

Collaborator Author

Yes, that is right. I should probably add such an "11" example, and also cover there a similar "un-typed" Subscriber (which is coming, but not in this PR yet). That way you can run two instances of the example and they should listen to each other.

I'll add such an example (in place of the currently commented-out "example_01_hello_world" one) in the next PR.

},
[this, subject_id, &out_failure]() -> detail::SubscriberImpl* { // factory
//
return makeSubscriberImpl({Message::_traits_::ExtentBytes, subject_id}, out_failure);
Collaborator

If a user doesn't have access to the compiled Nunavut headers and so they pass void as the Message type, then trying to access Message::_traits_::ExtentBytes wouldn't work

Collaborator Author

@serges147 serges147 Aug 23, 2024

Yes, it would not, but in such a case it won't end up in this

    template <typename Message>
    Expected<Subscriber<Message>, transport::AnyFailure> makeSubscriber(const transport::PortId subject_id)
    {

function in the first place: a different "dummy" deserializer will be used for such a specialization. I call it dummy b/c it will make no attempt to deserialize; it will just pass the scattered buffer as is to its callback.

Collaborator Author

@serges147 serges147 Aug 23, 2024

But I believe you are right that such a Presentation::makeSubscriber overload should have two parameters, like this:

Expected<Subscriber<void>, transport::AnyFailure> makeSubscriber(subject_id, extent_bytes)

In the next PR I was planning to add it.
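The difference between the typed and untyped paths could be sketched as two overloads that build the same session parameters: one reads the extent from the generated type's traits, the other takes it from the caller. Everything below is illustrative: `SubscriberParams`, `makeSubscriberParams`, and `FakeMessage` are hypothetical stand-ins, and the extent value is arbitrary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

using PortId = std::uint16_t;

// Hypothetical parameter bundle for creating an RX session.
struct SubscriberParams
{
    std::size_t extent_bytes;
    PortId      subject_id;
};

// Stand-in for a Nunavut-generated type; only `_traits_::ExtentBytes` is modeled.
struct FakeMessage
{
    struct _traits_
    {
        static constexpr std::size_t ExtentBytes = 12;  // illustrative value
    };
};

// Typed path: the extent comes from the generated traits.
template <typename Message>
SubscriberParams makeSubscriberParams(const PortId subject_id)
{
    return {Message::_traits_::ExtentBytes, subject_id};
}

// Untyped path: no generated type is available, so the caller supplies the extent.
inline SubscriberParams makeSubscriberParams(const PortId subject_id, const std::size_t extent_bytes)
{
    return {extent_bytes, subject_id};
}

// Usage sketch covering both paths.
inline void subscriberParamsUsage()
{
    assert(makeSubscriberParams<FakeMessage>(7509).extent_bytes == 12);
    assert(makeSubscriberParams(100, 64).extent_bytes == 64);
}
```

The untyped overload never touches `Message::_traits_`, which is why passing void as the message type would not run into the problem raised above.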

Member

@pavel-kirienko pavel-kirienko left a comment

There are pending issues to be addressed afterward:

  • Fix the RTTI problem across dynamic linking.
  • Restructure and simplify usage examples at the cost of code duplication.
  • Complete the new subscription API with peek/pop et al.

sonarcloud bot commented Aug 23, 2024

@serges147 serges147 added this pull request to the merge queue Aug 24, 2024
Merged via the queue into main with commit 40869bd Aug 24, 2024
21 checks passed
@serges147 serges147 deleted the sshirokov/presentation_epoll branch August 24, 2024 05:57