Close producer no matter if @producer_enabled or not #127

Open
renjiexu opened this issue Jun 10, 2020 · 11 comments
Comments

@renjiexu

renjiexu commented Jun 10, 2020

Hi phobos team,

From reading the current code, the producer is only closed when @producer_enabled is set:

if @producer_enabled
  @handler_class.producer.async_producer_shutdown
  @handler_class.producer.configure_kafka_client(nil)
end

This assumes we have a class that extends both Listener and Producer. In our code, we have something like:

class KafkaProducer
  include Phobos::Producer
end

...and it is used inside the consumer/listener, but it doesn't get closed when we reboot the consumer.

  1. What problems can arise if we don't close the producer connection during deployment?
  2. Could we go through ObjectSpace to find all the classes that include Phobos::Producer and close them?

Thanks!
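
A minimal sketch of the ObjectSpace idea from question 2, for illustration only. `FakeProducer`, `AuditProducer`, `UnrelatedWorker` and `classes_including` are hypothetical names introduced here; `FakeProducer` stands in for `Phobos::Producer` so the snippet runs without the gem. Note that finding the classes does not by itself solve the shutdown problem if producers are stored per thread, as discussed later in this thread.

```ruby
# Stand-in for Phobos::Producer so the sketch runs without the gem (assumption).
module FakeProducer
end

class KafkaProducer
  include FakeProducer
end

class AuditProducer
  include FakeProducer
end

class UnrelatedWorker
end

# Walk every live Class object and keep the named classes that mix in `mod`.
# Singleton classes are skipped because they have no name to report.
def classes_including(mod)
  ObjectSpace.each_object(Class).select do |klass|
    !klass.singleton_class? && klass.include?(mod)
  end
end

producer_classes = classes_including(FakeProducer)
puts producer_classes.map(&:name).sort.inspect
```

Scanning ObjectSpace visits every class in the process, so it is best done once at shutdown rather than on a hot path.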

@dorner
Contributor

dorner commented Jun 10, 2020

For producers, by default they are only opened/closed when you produce:

        def publish_list(messages)
          producer = sync_producer || create_sync_producer
          produce_messages(producer, messages)
          producer.deliver_messages
        ensure
          producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
        end

If you have persistent_connections set to true, the producer will not be shut down automatically. In that case, it's up to you to shut it down manually when your process shuts down, e.g.:

at_exit do
  MyProducer.producer.sync_producer_shutdown
end

@renjiexu
Author

Yeah, sorry I didn't explain the use case clearly in the description:

  1. We're using persistent_connections
  2. The producing happens inside the consumer

at_exit do
  MyProducer.producer.sync_producer_shutdown
end

If I understand this correctly, this needs to happen inside the same thread that does the producing. Where do you recommend putting this logic for a Phobos consumer? Right now my approach is:

module Phobos
  class Listener
    alias original_stop_listener stop_listener

    def stop_listener
      original_stop_listener

      MyProducer.producer.sync_producer_shutdown
      MyProducer.producer.async_producer_shutdown
    end
  end
end

That's why I proposed putting this logic directly inside the existing stop_listener instead of monkey-patching it.

@dorner
Contributor

dorner commented Jun 10, 2020

One option is for you to define your producer classes outside the listener, e.g. in an initializer (if you're running Rails), a railtie, etc. This means your consumers would share the producer objects and you could shut them down from a single place.

@dorner
Contributor

dorner commented Jun 10, 2020

Ah - just realized Phobos has a boot option that lets you define some code to run before the threading happens. So you could define it there.

@renjiexu
Author

renjiexu commented Jun 10, 2020

For our other applications (Rails/Sinatra), we handle this with Puma's on_worker_shutdown hook and it works fine.

For publishing inside the Phobos consumer, though, I'm not sure how to share the producer across threads (we're using max_concurrency: 3 here, but the issue is the same with 1). For example, I have the following in phobos_boot.rb:

puts "boot thread id: #{Thread.current.object_id}"

at_exit do
  puts "Inside at_exit"
  MyProducer.producer.sync_producer_shutdown
  puts "Done at_exit"
end

This is the output:

boot thread id: 70240352542780
{"msg":"Start Listener thread_id: 70240357425020"}
{"msg":"Start Listener thread_id: 70240357422220"}
{"msg":"Start Listener thread_id: 70240357418780"}

...
// stop
{"msg":"Stop Listener.... 70240357422220"}
sync_producer_shutdown: 70240357422220, true
{"msg":"Stop Listener.... 70240357425020"}
sync_producer_shutdown: 70240357425020, true
{"msg":"Stop Listener.... 70240357418780"}
sync_producer_shutdown: 70240357418780, false // this is checking if `producer_store[:sync_producer].nil?`
Inside at_exit
sync_producer_shutdown: 70240352542780, true
Done at_exit

So the boot thread isn't the same as the threads that store the producers. Any thoughts on how to address this? Thanks!

@dorner
Contributor

dorner commented Jun 10, 2020

You first need to actually create the producer before starting up the listeners. So it's not enough to put the shutdown call in the at_exit block; you also have to initialize the producer there by calling create_sync_producer or create_async_producer.

@renjiexu
Author

Hm? Are you saying it's actually the same producer across all the threads, including the main Phobos thread?

@dorner
Contributor

dorner commented Jun 10, 2020

Hmm... now that I'm looking at it I think you're right. It would be stored in thread variables and sharing the object reference wouldn't share the producer. I'm not sure why I've never come across this, because we also do producing inside our consumers. 😮
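
To make the thread-variable point concrete, here is a small sketch of how a store kept in `Thread.current` behaves. The `producer_store` helper and the `:demo_producer_store` key are illustrative assumptions, not Phobos's actual internals, and a plain `Object` stands in for a real producer.

```ruby
# Hypothetical per-thread store; each thread lazily gets its own Hash.
def producer_store
  Thread.current[:demo_producer_store] ||= {}
end

# A "listener" thread creates a producer in its own store.
listener = Thread.new do
  producer_store[:sync_producer] = Object.new # stand-in for a real producer
  producer_store[:sync_producer].nil?
end
listener_saw_nil = listener.value

# The main (boot) thread looks in its own, separate store.
main_saw_nil = producer_store[:sync_producer].nil?

puts "listener thread has no producer: #{listener_saw_nil}"
puts "main thread has no producer: #{main_saw_nil}"
```

Under these assumptions, the main thread never sees the producer created by the listener thread, which would match the `true` printed from the at_exit block in the log output earlier in this thread.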

I'm down for a PR to add a callback before a listener shuts down so you can add code to shut down your producers there.
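
One possible shape for such a callback, as a rough sketch only. `DemoListener`, `before_stop` and `before_stop_callbacks` are hypothetical names, not an existing Phobos API. The point it illustrates is that the callback runs on the thread that stops the listener, so thread-local producers would be reachable from it.

```ruby
# Hypothetical stand-in for a listener; not Phobos's real class or API.
class DemoListener
  # Class-level registry of blocks to run before any listener stops.
  def self.before_stop_callbacks
    @before_stop_callbacks ||= []
  end

  def self.before_stop(&block)
    before_stop_callbacks << block
  end

  def stop
    # Callbacks run on whichever thread calls stop. In this sketch that is
    # the listener's own thread.
    self.class.before_stop_callbacks.each { |callback| callback.call(self) }
    @stopped = true
  end

  def stopped?
    @stopped == true
  end
end

# Record which thread the callback ran on instead of closing a real producer.
closed_on = []
DemoListener.before_stop { |_listener| closed_on << Thread.current.object_id }

listener_thread = Thread.new do
  DemoListener.new.stop
  Thread.current.object_id
end
listener_thread_id = listener_thread.value

puts closed_on == [listener_thread_id]
```

In a real application the registered block would call something like `MyProducer.producer.sync_producer_shutdown` instead of recording a thread id.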

@renjiexu
Author

Cool, yeah, a callback sounds better than looping over ObjectSpace.

> because we also do producing inside our consumers.

Btw, back to the original question 1): do you know what the downside is if we don't close the connection? I think the broker may still close the idle connection after a certain time?

@dorner
Contributor

dorner commented Jun 10, 2020

Yeah I don't know of any ramifications of not closing the producer. I think you're right, the broker should kill the extra one after some time. connections.max.idle.ms is the setting.

@renjiexu
Author

> For other application rails/sinatra, we handled it by puma on_worker_shutdown hook and it works fine.

Btw, I reviewed this today and realized I made a mistake: on_worker_shutdown only has access to the worker context. Each worker can have multiple threads, so it seems impossible to shut down the producers from that hook.
