Skip to content

Commit

Permalink
Drop support for FindBatch API
Browse files Browse the repository at this point in the history
The go-libipni library is no longer supporting a FindBatch API, called by `HTTP POST /multihash`. That was previously only supported for non-encrypted non-streaming lookups directly with an indexer and not with dhstore. So, support for this limited case is being discontinued.

Other changes:
- Cleanup status returns for unsupported method
  • Loading branch information
gammazero committed Sep 19, 2023
1 parent b6da07e commit 578a6bd
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 111 deletions.
29 changes: 6 additions & 23 deletions delegated_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import (
"context"
"encoding/json"
"hash/crc32"
"io"
"net/http"
"net/url"
"path"

"github.com/ipfs/go-cid"
"github.com/ipni/go-libipni/find/model"
"github.com/ipni/go-libipni/metadata"
"github.com/ipni/indexstar/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
Expand All @@ -26,7 +23,7 @@ const (
unknownSchema = unknownProtocol
)

type findFunc func(ctx context.Context, method, source string, req *url.URL, body []byte, mh multihash.Multihash, encrypted bool) (int, []byte)
type findFunc func(ctx context.Context, method, source string, req *url.URL, body []byte, encrypted bool) (int, []byte)

func NewDelegatedTranslator(backend findFunc) (http.Handler, error) {
finder := delegatedTranslator{backend}
Expand All @@ -47,7 +44,6 @@ func (dt *delegatedTranslator) provide(w http.ResponseWriter, r *http.Request) {
stats.WithTags(tag.Insert(metrics.Method, r.Method)),
stats.WithMeasurements(metrics.HttpDelegatedRoutingMethod.M(1)))

discardBody(r)
h := w.Header()
h.Add("Access-Control-Allow-Origin", "*")
h.Add("Access-Control-Allow-Methods", "GET, PUT, OPTIONS")
Expand All @@ -66,39 +62,26 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr
stats.WithTags(tag.Insert(metrics.Method, r.Method)),
stats.WithMeasurements(metrics.HttpDelegatedRoutingMethod.M(1)))

// read out / close the request body.
_, err := io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
log.Warnw("failed to read original request body", "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}

h := w.Header()
h.Add("Access-Control-Allow-Origin", "*")
h.Add("Access-Control-Allow-Methods", "GET, OPTIONS")
switch r.Method {
case http.MethodGet:
case http.MethodOptions:
w.WriteHeader(http.StatusOK)
return
case http.MethodGet:
default:
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}

// Get the CID resource from the last element in the URL path.
cidUrlParam := path.Base(r.URL.Path)
c, err := cid.Decode(cidUrlParam)
if err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}

// Translate URL by mapping `/providers/{CID}` to `/cid/{CID}`.
uri := r.URL.JoinPath("../cid", cidUrlParam)
rcode, resp := dt.be(r.Context(), http.MethodGet, findMethodDelegated, uri, []byte{}, c.Hash(), encrypted)
rcode, resp := dt.be(r.Context(), http.MethodGet, findMethodDelegated, uri, []byte{}, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
return
Expand All @@ -117,7 +100,7 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr
// serverr
log.Warnw("failed to parse backend response", "number_multihash", len(parsed.MultihashResults))
http.Error(w, "", http.StatusInternalServerError)

return
}

res := parsed.MultihashResults[0]
Expand Down
89 changes: 19 additions & 70 deletions find.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,81 +30,64 @@ const (
func (s *server) findCid(w http.ResponseWriter, r *http.Request, encrypted bool) {
switch r.Method {
case http.MethodOptions:
discardBody(r)
handleIPNIOptions(w, false)
case http.MethodGet:
sc := strings.TrimPrefix(path.Base(r.URL.Path), "cid/")
c, err := cid.Decode(sc)
if err != nil {
discardBody(r)
http.Error(w, "invalid cid: "+err.Error(), http.StatusBadRequest)
}
s.find(w, r, c.Hash(), encrypted)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
}
}

func (s *server) findMultihash(w http.ResponseWriter, r *http.Request, encrypted bool) {
switch r.Method {
case http.MethodOptions:
discardBody(r)
handleIPNIOptions(w, true)
case http.MethodPost:
s.find(w, r, nil, encrypted)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
}
}

func (s *server) findMultihashSubtree(w http.ResponseWriter, r *http.Request, encrypted bool) {
switch r.Method {
case http.MethodOptions:
discardBody(r)
handleIPNIOptions(w, false)
case http.MethodGet:
smh := path.Base(r.URL.Path)
mh, err := multihash.FromB58String(smh)
if err != nil {
discardBody(r)
http.Error(w, "invalid multihash: "+err.Error(), http.StatusBadRequest)
}
s.find(w, r, mh, encrypted)
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
}
}

func (s *server) findMetadataSubtree(w http.ResponseWriter, r *http.Request) {
discardBody(r)
if r.Method != http.MethodGet {
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}

ctx, cancel := context.WithCancel(r.Context())
defer cancel()
method := r.Method
req := r.URL
reqURL := r.URL

sg := &scatterGather[Backend, []byte]{
backends: s.backends,
maxWait: config.Server.ResultMaxWait,
}

// TODO: wait for the first successful response instead
if err := sg.scatter(ctx, func(cctx context.Context, b Backend) (*[]byte, error) {
err := sg.scatter(ctx, func(cctx context.Context, b Backend) (*[]byte, error) {
// send metadata requests only to dh backends
if _, isDhBackend := b.(dhBackend); !isDhBackend {
return nil, nil
}

// Copy the URL from original request and override host/schema to point
// to the server.
endpoint := *req
endpoint := *reqURL
endpoint.Host = b.URL().Host
endpoint.Scheme = b.URL().Scheme
log := log.With("backend", endpoint.Host)
Expand Down Expand Up @@ -146,7 +129,8 @@ func (s *server) findMetadataSubtree(w http.ResponseWriter, r *http.Request) {
}
return nil, err
}
}); err != nil {
})
if err != nil {
log.Errorw("Failed to scatter HTTP find metadata request", "err", err)
http.Error(w, "", http.StatusInternalServerError)
return
Expand All @@ -168,56 +152,23 @@ func (s *server) findMetadataSubtree(w http.ResponseWriter, r *http.Request) {
func (s *server) find(w http.ResponseWriter, r *http.Request, mh multihash.Multihash, encrypted bool) {
acc, err := getAccepts(r)
if err != nil {
discardBody(r)
http.Error(w, "invalid Accept header", http.StatusBadRequest)
return
}
var rb []byte
switch r.Method {
case http.MethodGet:
discardBody(r)
case http.MethodPost:
if !acc.json && !acc.any && acc.acceptHeaderFound {
// Only non-streaming JSON is supported for POST requests.
http.Error(w, "unsupported media type", http.StatusBadRequest)
return
}
// Copy the original request body in case it is a POST batch find request.
var err error
rb, err = io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
log.Warnw("Failed to read original request body", "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}
rcode, resp := s.doFind(r.Context(), r.Method, findMethodOrig, r.URL, rb, mh, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
return
}
writeJsonResponse(w, http.StatusOK, resp)
return
default:
discardBody(r)
http.Error(w, "", http.StatusNotFound)
return
}
ctx := r.Context()

// Use NDJSON response only when the request explicitly accepts it. Otherwise, fallback on
// JSON unless only unsupported media types are specified.
switch {
case acc.ndjson:
s.doFindNDJson(ctx, w, findMethodOrig, r.URL, false, mh, encrypted)
s.doFindNDJson(r.Context(), w, findMethodOrig, r.URL, false, mh, encrypted)
case acc.json || acc.any || !acc.acceptHeaderFound:
if s.translateNonStreaming {
s.doFindNDJson(ctx, w, findMethodOrig, r.URL, true, mh, encrypted)
s.doFindNDJson(r.Context(), w, findMethodOrig, r.URL, true, mh, encrypted)
return
}
// In a case where the request has no `Accept` header at all, be forgiving and respond with
// JSON.
rcode, resp := s.doFind(ctx, r.Method, findMethodOrig, r.URL, rb, mh, encrypted)
rcode, resp := s.doFind(r.Context(), r.Method, findMethodOrig, r.URL, nil, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
return
Expand All @@ -229,7 +180,7 @@ func (s *server) find(w http.ResponseWriter, r *http.Request, mh multihash.Multi
}
}

func (s *server) doFind(ctx context.Context, method, source string, req *url.URL, body []byte, mh multihash.Multihash, encrypted bool) (int, []byte) {
func (s *server) doFind(ctx context.Context, method, source string, req *url.URL, body []byte, encrypted bool) (int, []byte) {
start := time.Now()
latencyTags := []tag.Mutator{tag.Insert(metrics.Method, method)}
loadTags := []tag.Mutator{tag.Insert(metrics.Method, source)}
Expand Down Expand Up @@ -272,7 +223,10 @@ func (s *server) doFind(ctx context.Context, method, source string, req *url.URL
endpoint.Scheme = b.URL().Scheme
log := log.With("backend", endpoint.Host)

bodyReader := bytes.NewReader(body)
var bodyReader *bytes.Reader
if len(body) != 0 {
bodyReader = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(cctx, method, endpoint.String(), bodyReader)
if err != nil {
log.Warnw("Failed to construct backend query", "err", err)
Expand Down Expand Up @@ -426,8 +380,3 @@ func handleIPNIOptions(w http.ResponseWriter, post bool) {
}
w.WriteHeader(http.StatusAccepted)
}

func discardBody(r *http.Request) {
_, _ = io.Copy(io.Discard, r.Body)
_ = r.Body.Close()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipni/go-libipni v0.5.0
github.com/ipni/go-libipni v0.5.2
github.com/libp2p/go-libp2p v0.31.0
github.com/mercari/go-circuitbreaker v0.0.2
github.com/mitchellh/go-homedir v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E=
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipni/go-libipni v0.5.0 h1:UTVq4U9cReNz/rbeVeUvl+JTLexdfb7yjmBgouSteJ8=
github.com/ipni/go-libipni v0.5.0/go.mod h1:UnrhEqjVI2Z2HXlaieOBONJmtW557nZkYpB4IIsMD+s=
github.com/ipni/go-libipni v0.5.2 h1:9vaYOnR4dskd8p88NOboqI6yVqBwYPNCQ/zOaRSr59I=
github.com/ipni/go-libipni v0.5.2/go.mod h1:UnrhEqjVI2Z2HXlaieOBONJmtW557nZkYpB4IIsMD+s=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down
12 changes: 2 additions & 10 deletions providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"io"
"net/http"
"path"

Expand All @@ -12,8 +11,8 @@ import (

func (s *server) providers(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
discardBody(r)
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}

Expand All @@ -34,13 +33,6 @@ func (s *server) providers(w http.ResponseWriter, r *http.Request) {

// provider returns most recent state of a single provider.
func (s *server) provider(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
log.Warnw("failed to read original request body", "err", err)
return
}

pid, err := peer.Decode(path.Base(r.URL.Path))
if err != nil {
log.Warnw("bad provider ID", "err", err)
Expand Down
7 changes: 2 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func NewServer(c *cli.Context) (*server, error) {
}

func loadBackends(servers, cascadeServers, dhServers, providersServers []string) ([]Backend, error) {

newBackendFunc := func(s string) (Backend, error) {
return NewBackend(s, circuitbreaker.New(
circuitbreaker.WithFailOnContextCancel(false),
Expand Down Expand Up @@ -239,8 +238,6 @@ func (s *server) Serve() chan error {
mux := http.NewServeMux()
mux.HandleFunc("/cid/", func(w http.ResponseWriter, r *http.Request) { s.findCid(w, r, false) })
mux.HandleFunc("/encrypted/cid/", func(w http.ResponseWriter, r *http.Request) { s.findCid(w, r, true) })
mux.HandleFunc("/multihash", func(w http.ResponseWriter, r *http.Request) { s.findMultihash(w, r, false) })
mux.HandleFunc("/encrypted/multihash", func(w http.ResponseWriter, r *http.Request) { s.findMultihash(w, r, true) })
mux.HandleFunc("/multihash/", func(w http.ResponseWriter, r *http.Request) { s.findMultihashSubtree(w, r, false) })
mux.HandleFunc("/encrypted/multihash/", func(w http.ResponseWriter, r *http.Request) { s.findMultihashSubtree(w, r, true) })
mux.HandleFunc("/metadata/", s.findMetadataSubtree)
Expand Down Expand Up @@ -313,9 +310,9 @@ func (s *server) Serve() chan error {
}

func (s *server) health(w http.ResponseWriter, r *http.Request) {
discardBody(r)
if r.Method != http.MethodGet {
http.Error(w, "", http.StatusNotFound)
w.Header().Set("Allow", http.MethodGet)
http.Error(w, "", http.StatusMethodNotAllowed)
return
}
writeJsonResponse(w, http.StatusOK, []byte("ready"))
Expand Down

0 comments on commit 578a6bd

Please sign in to comment.