
consumer: sendRDY error not propogating #199

Open
armcknight opened this issue Nov 2, 2016 · 4 comments

@armcknight

Hello,

First of all, apologies if my terminology is a bit off; I'm not a regular Go programmer :)

We run a process reading from NSQ servers over an SSH tunnel. While debugging an issue that occurs when this connection breaks, we found a potential problem: an error from sendRDY does not fully propagate.

sendRDY can return an error (go-nsq/consumer.go, lines 950 to 964 in d71fb89):

func (r *Consumer) sendRDY(c *Conn, count int64) error {
	if count == 0 && c.LastRDY() == 0 {
		// no need to send. It's already that RDY count
		return nil
	}
	atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
	c.SetRDY(count)
	err := c.WriteCommand(Ready(int(count)))
	if err != nil {
		r.log(LogLevelError, "(%s) error sending RDY %d - %s", c.String(), count, err)
		return err
	}
	return nil
}

updateRDY, which calls sendRDY, can also return an error:

func (r *Consumer) updateRDY(c *Conn, count int64) error

But that error isn't checked in the retry call that updateRDY schedules for itself:

r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		r.updateRDY(c, count)
	})

Our thinking is that, because the error never fully propagates, our process doesn't notice the loss of connection and doesn't know to attempt any mitigation.
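For illustration, here's a minimal sketch (not a proposed patch, and eliding the locking around rdyRetryTimers) of how that retry callback could at least surface the error using the consumer's existing r.log helper:

r.rdyRetryTimers[c.String()] = time.AfterFunc(5*time.Second,
	func() {
		if err := r.updateRDY(c, count); err != nil {
			// hypothetical handling; today the error is silently dropped
			r.log(LogLevelError, "(%s) error updating RDY - %s", c.String(), err)
		}
	})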

We also found a couple of other calls to updateRDY whose errors don't appear to be handled; both are in startStopContinueBackoff, which itself doesn't return an error:

func (r *Consumer) startStopContinueBackoff(conn *Conn, signal backoffSignal) {

@bmhatfield

For a little bit of extra context, this seems to require a pretty specific set of circumstances for us. When the tunnel drops, sometimes it's detected and a reconnect happens; other times we see this:

2016/11/01 10:19:07 ERR    1 [csym/create] (127.0.0.1:3001) IO error - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe
2016/11/01 10:19:07 ERR    1 [csym/create] (127.0.0.1:3001) error sending RDY 1 - write tcp 127.0.0.1:51178->127.0.0.1:3001: write: broken pipe

When we fall into this mode, we do not observe a reconnect (even though the tunnel would eventually come back up on its own, we'd still need to reinitialize the connection to NSQ).

@mreiferson
Member

@armcknight @bmhatfield thanks for the detailed info, I'll try to take a look at this!

@mreiferson mreiferson changed the title sendRDY error not propogating consumer: sendRDY error not propogating Nov 2, 2016
@mreiferson mreiferson added the bug label Nov 2, 2016
@mreiferson
Member

I just poked around at this.

Even though the errors returned from sendRDY / updateRDY aren't handled, conn.WriteCommand calls c.delegate.OnIOError, which calls conn.Close, which should then trigger the reconnect cycle.

The only reason it wouldn't is if messages are in flight and never "complete", meaning the rest of the cleanup logic doesn't execute. That's probably a poor assumption to rely on, though; perhaps we should bound this with a timeout.
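As a rough sketch of that idea (the counter name mirrors the library's messagesInFlight field, but the duration and placement here are only illustrative, not a patch):

// bound how long cleanup waits for in-flight messages to "complete"
deadline := time.Now().Add(30 * time.Second) // example value only
for atomic.LoadInt64(&c.messagesInFlight) > 0 && time.Now().Before(deadline) {
	time.Sleep(100 * time.Millisecond)
}
// proceed with the rest of the cleanup either way, so the close/reconnect
// cycle isn't blocked indefinitely by messages that never finish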

Thoughts?

@djmally

djmally commented Apr 21, 2017

We resolved this recently on our end, your explanation is pretty spot-on for what we were experiencing. We had messages still in flight when the RDY count was getting redistributed, which caused the connection with the in-flight messages to close prematurely. We fixed this by upping low_rdy_idle_timeout to 2 minutes in our NSQ client configuration.
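A sketch of that configuration change (the topic/channel names and the surrounding setup are generic placeholders, not our actual code):

package main

import (
	"log"
	"time"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	cfg := nsq.NewConfig()
	// the change that resolved it for us: raise low_rdy_idle_timeout to 2 minutes
	cfg.LowRdyIdleTimeout = 2 * time.Minute
	// equivalent option-style form: cfg.Set("low_rdy_idle_timeout", "2m")

	consumer, err := nsq.NewConsumer("some_topic", "some_channel", cfg)
	if err != nil {
		log.Fatal(err)
	}
	_ = consumer // handler registration and connection setup omitted
}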

I don't think I understand the inner workings of this package well enough to comment on whether there should be a timeout on this operation in the client, so I'll leave that up to you; hopefully the way we resolved this internally helps with making that decision.
