From 16c1e40a8a51c6729eb94bd009252c8d077b3ced Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sat, 20 Apr 2024 08:04:13 +0100 Subject: [PATCH 1/5] add webseed update timer --- webseed-peer.go | 41 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/webseed-peer.go b/webseed-peer.go index ca915f3861..e34a8bd692 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -27,6 +27,7 @@ type webseedPeer struct { client webseed.Client activeRequests map[Request]webseed.Request requesterCond sync.Cond + updateRequestor *time.Timer lastUnhandledErr time.Time } @@ -72,7 +73,6 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { } func (ws *webseedPeer) _request(r Request) bool { - ws.requesterCond.Signal() return true } @@ -91,15 +91,17 @@ func (ws *webseedPeer) doRequest(r Request) error { func (ws *webseedPeer) requester(i int) { ws.requesterCond.L.Lock() defer ws.requesterCond.L.Unlock() -start: + for !ws.peer.closed.IsSet() { // Restart is set if we don't need to wait for the requestCond before trying again. restart := false + ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { r := ws.peer.t.requestIndexToRequest(x) if _, ok := ws.activeRequests[r]; ok { return true } + err := ws.doRequest(r) ws.requesterCond.L.Unlock() if err != nil && !errors.Is(err, context.Canceled) { @@ -117,10 +119,38 @@ start: ws.requesterCond.L.Lock() return false }) - if restart { - goto start + + if !restart { + if !ws.peer.t.dataDownloadDisallowed.Bool() && ws.peer.isLowOnRequests() && len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 { + if ws.updateRequestor == nil { + ws.updateRequestor = time.AfterFunc(updateRequestsTimerDuration, func() { requestUpdate(ws) }) + } + } + + ws.requesterCond.Wait() + + if ws.updateRequestor != nil { + ws.updateRequestor.Stop() + ws.updateRequestor = nil + } + } + } +} + +func requestUpdate(ws *webseedPeer) { + if ws != nil { + if !ws.peer.closed.IsSet() { + if len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 { + if ws.peer.isLowOnRequests() { + if time.Since(ws.peer.lastRequestUpdate) > updateRequestsTimerDuration { + ws.peer.updateRequests(peerUpdateRequestsTimerReason) + return + } + } + + ws.requesterCond.Signal() + } } - ws.requesterCond.Wait() } } @@ -142,6 +172,7 @@ func (ws *webseedPeer) handleUpdateRequests() { ws.peer.t.cl.lock() defer ws.peer.t.cl.unlock() ws.peer.maybeUpdateActualRequestState() + ws.requesterCond.Signal() }() } From 5d3d59935704acae84791ab5b68045cf0a4ca067 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sat, 20 Apr 2024 08:17:09 +0100 Subject: [PATCH 2/5] steal stability --- requesting.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/requesting.go b/requesting.go index 51419a3599..923f935266 100644 --- a/requesting.go +++ b/requesting.go @@ -312,13 +312,22 @@ func (p *Peer) applyRequestState(next desiredRequestState) { if cap(next.Requests.requestIndexes) != cap(orig) { panic("changed") } + + if p.needRequestUpdate == "Peer.remoteRejectedRequest" { + continue + } + existing := t.requestingPeer(req) if existing != nil && existing != p { + if p.needRequestUpdate == "Peer.cancel" { + continue + } + // Don't steal from the poor. diff := int64(current.Requests.GetCardinality()) + 1 - (int64(existing.uncancelledRequests()) - 1) // Steal a request that leaves us with one more request than the existing peer // connection if the stealer more recently received a chunk. - if diff > 1 || (diff == 1 && p.lastUsefulChunkReceived.Before(existing.lastUsefulChunkReceived)) { + if diff > 1 || (diff == 1 && !p.lastUsefulChunkReceived.After(existing.lastUsefulChunkReceived)) { continue } t.cancelRequest(req) From 472bfbc56ad5894471f86b138fcf64321b9d78a4 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Sat, 20 Apr 2024 10:41:17 +0100 Subject: [PATCH 3/5] close body in same go routine as do --- webseed/client.go | 53 +++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/webseed/client.go b/webseed/client.go index 4614a3e407..7740fb8ed9 100644 --- a/webseed/client.go +++ b/webseed/client.go @@ -19,16 +19,10 @@ import ( type RequestSpec = segments.Extent -type requestPartResult struct { - resp *http.Response - err error -} - type requestPart struct { - req *http.Request - e segments.Extent - result chan requestPartResult - start func() + req *http.Request + e segments.Extent + do func() (*http.Response, error) // Wrap http response bodies for such things as download rate limiting. responseBodyWrapper ResponseBodyWrapper } @@ -88,18 +82,11 @@ func (ws *Client) NewRequest(r RequestSpec) Request { } part := requestPart{ req: req, - result: make(chan requestPartResult, 1), e: e, responseBodyWrapper: ws.ResponseBodyWrapper, } - part.start = func() { - go func() { - resp, err := ws.HttpClient.Do(req) - part.result <- requestPartResult{ - resp: resp, - err: err, - } - }() + part.do = func() (*http.Response, error) { + return ws.HttpClient.Do(req) } requestParts = append(requestParts, part) return true @@ -129,24 +116,18 @@ func (me ErrBadResponse) Error() string { return me.Msg } -func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error { - result := <-part.result - // Make sure there's no further results coming, it should be a one-shot channel. - close(part.result) - if result.err != nil { - return result.err - } - defer result.resp.Body.Close() - var body io.Reader = result.resp.Body +func recvPartResult(ctx context.Context, buf io.Writer, part requestPart, resp *http.Response) error { + defer resp.Body.Close() + var body io.Reader = resp.Body if part.responseBodyWrapper != nil { body = part.responseBodyWrapper(body) } // Prevent further accidental use - result.resp.Body = nil + resp.Body = nil if ctx.Err() != nil { return ctx.Err() } - switch result.resp.StatusCode { + switch resp.StatusCode { case http.StatusPartialContent: copied, err := io.Copy(buf, body) if err != nil { @@ -178,14 +159,14 @@ func recvPartResult(ctx context.Context, buf io.Writer, part requestPart) error _, err := io.CopyN(buf, body, part.e.Length) return err } else { - return ErrBadResponse{"resp status ok but requested range", result.resp} + return ErrBadResponse{"resp status ok but requested range", resp} } case http.StatusServiceUnavailable: return ErrTooFast default: return ErrBadResponse{ - fmt.Sprintf("unhandled response status code (%v)", result.resp.StatusCode), - result.resp, + fmt.Sprintf("unhandled response status code (%v)", resp.StatusCode), + resp, } } } @@ -195,8 +176,12 @@ var ErrTooFast = errors.New("making requests too fast") func readRequestPartResponses(ctx context.Context, parts []requestPart) (_ []byte, err error) { var buf bytes.Buffer for _, part := range parts { - part.start() - err = recvPartResult(ctx, &buf, part) + result, err := part.do() + + if err == nil { + err = recvPartResult(ctx, &buf, part, result) + } + if err != nil { err = fmt.Errorf("reading %q at %q: %w", part.req.URL, part.req.Header.Get("Range"), err) break From a06a84c3ceb073eb4e09999a3cdd75970a368086 Mon Sep 17 00:00:00 2001 From: Mark Holt Date: Mon, 29 Apr 2024 22:05:49 +0100 Subject: [PATCH 4/5] Added comments and variables instead of raw reason strings --- peer.go | 8 ++++++-- requesting.go | 11 +++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/peer.go b/peer.go index 733aa018ef..4174971077 100644 --- a/peer.go +++ b/peer.go @@ -468,6 +468,8 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) { return cn.peerImpl._request(ppReq), nil } +var peerUpdateRequestsPeerCancelReason = "Peer.cancel" + func (me *Peer) cancel(r RequestIndex) { if !me.deleteRequest(r) { panic("request not existing should have been guarded") @@ -480,7 +482,7 @@ func (me *Peer) cancel(r RequestIndex) { } me.decPeakRequests() if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") + me.updateRequests(peerUpdateRequestsPeerCancelReason) } } @@ -566,6 +568,8 @@ func runSafeExtraneous(f func()) { } } +var peerUpdateRequestsRemoteRejectReason = "Peer.remoteRejectedRequest" + // Returns true if it was valid to reject the request. func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { if c.deleteRequest(r) { @@ -574,7 +578,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { return false } if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") + c.updateRequests(peerUpdateRequestsRemoteRejectReason) } c.decExpectedChunkReceive(r) return true diff --git a/requesting.go b/requesting.go index 923f935266..c5a078504a 100644 --- a/requesting.go +++ b/requesting.go @@ -313,13 +313,20 @@ func (p *Peer) applyRequestState(next desiredRequestState) { panic("changed") } - if p.needRequestUpdate == "Peer.remoteRejectedRequest" { + // don't add requests on reciept of a reject - because this causes request back + // to potentially permanently unresponive peers - which just adds network noise. If + // the peer can handle more requests it will send an "unchoked" message - which + // will cause it to get added back to the request queue + if p.needRequestUpdate == peerUpdateRequestsRemoteRejectReason { continue } existing := t.requestingPeer(req) if existing != nil && existing != p { - if p.needRequestUpdate == "Peer.cancel" { + // don't steal on cancel - because this is triggered by t.cancelRequest below + // which means that the cancelled can immediately try to steal back a request + // it has lost which can lead to circular cancel/add processing + if p.needRequestUpdate == peerUpdateRequestsPeerCancelReason { continue } From 87b0b5d7fea68bdd9bb35a302c7648bff7e442f2 Mon Sep 17 00:00:00 2001 From: Mark Holt <135143369+mh0lt@users.noreply.github.com> Date: Tue, 30 Apr 2024 16:54:07 +0100 Subject: [PATCH 5/5] add comments to continue clauses --- requesting.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/requesting.go b/requesting.go index 7ecb4e2221..9799f96eff 100644 --- a/requesting.go +++ b/requesting.go @@ -312,12 +312,19 @@ func (p *Peer) applyRequestState(next desiredRequestState) { panic("changed") } + // don't add requests on receipt of a reject - because this causes request back + // to potentially permanently unresponsive peers - which just adds network noise. If + // the peer can handle more requests it will send an "unchoked" message - which + // will cause it to get added back to the request queue if p.needRequestUpdate == "Peer.remoteRejectedRequest" { continue } existing := t.requestingPeer(req) if existing != nil && existing != p { + // don't steal on cancel - because this is triggered by t.cancelRequest below + // which means that the cancelled can immediately try to steal back a request + // it has lost which can lead to circular cancel/add processing if p.needRequestUpdate == "Peer.cancel" { continue }