Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add requeststate comments #939

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,8 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
return cn.peerImpl._request(ppReq), nil
}

var peerUpdateRequestsPeerCancelReason = "Peer.cancel"

func (me *Peer) cancel(r RequestIndex) {
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
Expand All @@ -480,7 +482,7 @@ func (me *Peer) cancel(r RequestIndex) {
}
me.decPeakRequests()
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
me.updateRequests(peerUpdateRequestsPeerCancelReason)
}
}

Expand Down Expand Up @@ -566,6 +568,8 @@ func runSafeExtraneous(f func()) {
}
}

var peerUpdateRequestsRemoteRejectReason = "Peer.remoteRejectedRequest"

// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
if c.deleteRequest(r) {
Expand All @@ -574,7 +578,7 @@ func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
return false
}
if c.isLowOnRequests() {
c.updateRequests("Peer.remoteRejectedRequest")
c.updateRequests(peerUpdateRequestsRemoteRejectReason)
}
c.decExpectedChunkReceive(r)
return true
Expand Down
7 changes: 7 additions & 0 deletions requesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,19 @@ func (p *Peer) applyRequestState(next desiredRequestState) {
panic("changed")
}

// don't add requests on receipt of a reject - because this causes request back
// to potentially permanently unresponsive peers - which just adds network noise. If
// the peer can handle more requests it will send an "unchoked" message - which
// will cause it to get added back to the request queue
if p.needRequestUpdate == "Peer.remoteRejectedRequest" {
continue
}

existing := t.requestingPeer(req)
if existing != nil && existing != p {
// don't steal on cancel - because this is triggered by t.cancelRequest below
// which means that the cancelled can immediately try to steal back a request
// it has lost which can lead to circular cancel/add processing
if p.needRequestUpdate == "Peer.cancel" {
continue
}
Expand Down
41 changes: 36 additions & 5 deletions webseed-peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type webseedPeer struct {
client webseed.Client
activeRequests map[Request]webseed.Request
requesterCond sync.Cond
updateRequestor *time.Timer
lastUnhandledErr time.Time
}

Expand Down Expand Up @@ -72,7 +73,6 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
}

func (ws *webseedPeer) _request(r Request) bool {
ws.requesterCond.Signal()
return true
}

Expand All @@ -91,15 +91,17 @@ func (ws *webseedPeer) doRequest(r Request) error {
func (ws *webseedPeer) requester(i int) {
ws.requesterCond.L.Lock()
defer ws.requesterCond.L.Unlock()
start:

for !ws.peer.closed.IsSet() {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false

ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
}

err := ws.doRequest(r)
ws.requesterCond.L.Unlock()
if err != nil && !errors.Is(err, context.Canceled) {
Expand All @@ -117,10 +119,38 @@ start:
ws.requesterCond.L.Lock()
return false
})
if restart {
goto start

if !restart {
if !ws.peer.t.dataDownloadDisallowed.Bool() && ws.peer.isLowOnRequests() && len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if ws.updateRequestor == nil {
ws.updateRequestor = time.AfterFunc(updateRequestsTimerDuration, func() { requestUpdate(ws) })
}
}

ws.requesterCond.Wait()

if ws.updateRequestor != nil {
ws.updateRequestor.Stop()
ws.updateRequestor = nil
}
}
}
}

func requestUpdate(ws *webseedPeer) {
if ws != nil {
if !ws.peer.closed.IsSet() {
if len(ws.peer.getDesiredRequestState().Requests.requestIndexes) > 0 {
if ws.peer.isLowOnRequests() {
if time.Since(ws.peer.lastRequestUpdate) > updateRequestsTimerDuration {
ws.peer.updateRequests(peerUpdateRequestsTimerReason)
return
}
}

ws.requesterCond.Signal()
}
}
ws.requesterCond.Wait()
}
}

Expand All @@ -142,6 +172,7 @@ func (ws *webseedPeer) handleUpdateRequests() {
ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock()
ws.peer.maybeUpdateActualRequestState()
ws.requesterCond.Signal()
}()
}

Expand Down
Loading