Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PBM-1417: allow multi-database selective backup #1038

Draft
wants to merge 7 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/pbm/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ func runBackup(
if err != nil {
return nil, errors.Wrap(err, "parse --ns option")
}
if len(nss) > 1 {
return nil, errors.New("parse --ns option: multiple namespaces are not supported")
}
if len(nss) != 0 && b.typ != string(defs.LogicalBackup) {
return nil, errors.New("--ns flag is only allowed for logical backup")
}
Expand Down Expand Up @@ -348,6 +345,7 @@ type bcpReplDesc struct {
LastWriteTime string `json:"last_write_time" yaml:"last_write_time"`
LastTransitionTime string `json:"last_transition_time" yaml:"last_transition_time"`
IsConfigSvr *bool `json:"configsvr,omitempty" yaml:"configsvr,omitempty"`
IsConfigShard *bool `json:"configshard,omitempty" yaml:"configshard,omitempty"`
SecurityOpts *topo.MongodOptsSec `json:"security,omitempty" yaml:"security,omitempty"`
Error *string `json:"error,omitempty" yaml:"error,omitempty"`
Collections []string `json:"collections,omitempty" yaml:"collections,omitempty"`
Expand Down Expand Up @@ -442,6 +440,7 @@ func describeBackup(
Name: r.Name,
Node: r.Node,
IsConfigSvr: r.IsConfigSvr,
IsConfigShard: r.IsConfigShard,
Status: r.Status,
LastWriteTS: int64(r.LastWriteTS.T),
LastTransitionTS: r.LastTransitionTS,
Expand Down
174 changes: 6 additions & 168 deletions pbm/archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,50 +38,19 @@ const MaxBSONSize = db.MaxBSONSize
var terminatorBytes = []byte{0xFF, 0xFF, 0xFF, 0xFF}

type (
// NOTE(review): NSFilterFn appears twice in this view; this is presumably the
// before/after pair of a diff hunk — confirm only one declaration exists in
// the real file.
NSFilterFn func(ns string) bool
// NSFilterFn checks whether a namespace is selected for backup.
// Useful when only some namespaces are selected for backup.
// It returns true when the namespace must be processed.
NSFilterFn func(ns string) bool

// DocFilterFn checks whether a document is selected for backup.
// Useful when only some documents are selected for backup.
// It receives the document's namespace and its raw BSON, and returns true
// when the document must be kept.
DocFilterFn func(ns string, d bson.Raw) bool
)

// DefaultNSFilter is the accept-everything namespace filter: it reports true
// for any namespace it is given.
func DefaultNSFilter(_ string) bool {
	return true
}

// DefaultDocFilter is the accept-everything document filter: it reports true
// for any namespace/document pair it is given.
func DefaultDocFilter(_ string, _ bson.Raw) bool {
	return true
}

// Decompose reads an archive stream from r and splits it into per-namespace
// outputs obtained from newWriter.
//
// Only namespaces accepted by nsFilter and documents accepted by docFilter are
// forwarded; a nil filter is replaced by the matching accept-everything
// default. Once all blocks are consumed, the metadata of the selected
// namespaces, updated with the CRC and size gathered while consuming, is
// written through writeMetadata.
func Decompose(r io.Reader, newWriter NewWriter, nsFilter NSFilterFn, docFilter DocFilterFn) error {
	// Fall back to the pass-through filters when the caller gives none.
	if nsFilter == nil {
		nsFilter = DefaultNSFilter
	}
	if docFilter == nil {
		docFilter = DefaultDocFilter
	}

	meta, err := readPrelude(r)
	if err != nil {
		return errors.Wrap(err, "prelude")
	}

	sink := newConsumer(newWriter, nsFilter, docFilter)
	parser := &archive.Parser{In: r}
	if err := parser.ReadAllBlocks(sink); err != nil {
		return errors.Wrap(err, "archive parser")
	}

	// save metadata for selected namespaces only
	selected := make([]*Namespace, 0, len(meta.Namespaces))
	for _, nsMeta := range meta.Namespaces {
		key := NSify(nsMeta.Database, nsMeta.Collection)
		if nsFilter(key) {
			nsMeta.CRC = sink.crc[key]
			nsMeta.Size = sink.size[key]
			selected = append(selected, nsMeta)
		}
	}
	meta.Namespaces = selected

	return errors.Wrap(writeMetadata(meta, newWriter), "metadata")
}

func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency int) error {
meta, err := readMetadata(newReader)
if err != nil {
Expand Down Expand Up @@ -113,22 +82,6 @@ func Compose(w io.Writer, newReader NewReader, nsFilter NSFilterFn, concurrency
return errors.Wrap(err, "write namespaces")
}

// readPrelude reads the archive prelude from r and converts it into an
// archiveMeta: the prelude header is carried over and every namespace
// metadata entry is wrapped into a Namespace.
func readPrelude(r io.Reader) (*archiveMeta, error) {
	var prelude archive.Prelude
	if err := prelude.Read(r); err != nil {
		return nil, errors.Wrap(err, "read")
	}

	namespaces := make([]*Namespace, 0, len(prelude.NamespaceMetadatas))
	for _, cm := range prelude.NamespaceMetadatas {
		namespaces = append(namespaces, &Namespace{CollectionMetadata: cm})
	}

	return &archiveMeta{
		Header:     prelude.Header,
		Namespaces: namespaces,
	}, nil
}

func writePrelude(w io.Writer, m *archiveMeta) error {
prelude := archive.Prelude{Header: m.Header}
prelude.NamespaceMetadatas = make([]*archive.CollectionMetadata, len(m.Namespaces))
Expand Down Expand Up @@ -290,21 +243,6 @@ func closeChunk(w io.Writer, ns *Namespace) error {
return errors.Wrap(err, "terminator")
}

// writeMetadata serializes meta as indented extended JSON and writes it to the
// writer that newWriter opens for MetaFile.
//
// The document is marshaled before the writer is opened, so a marshaling
// failure does not open (and then close) an output with nothing in it. The
// error returned by closing the writer is reported when no earlier error
// occurred, because a failed Close may mean the data was not stored.
func writeMetadata(meta *archiveMeta, newWriter NewWriter) (err error) {
	data, err := bson.MarshalExtJSONIndent(meta, true, true, "", "\t")
	if err != nil {
		return errors.Wrap(err, "marshal")
	}

	w, err := newWriter(MetaFile)
	if err != nil {
		return errors.Wrap(err, "new writer")
	}
	defer func() {
		// Keep the first error: a write failure takes precedence over the
		// close failure that usually follows it.
		if cerr := w.Close(); cerr != nil && err == nil {
			err = errors.Wrap(cerr, "close")
		}
	}()

	return SecureWrite(w, data)
}

func readMetadata(newReader NewReader) (*archiveMeta, error) {
r, err := newReader(MetaFile)
if err != nil {
Expand All @@ -326,106 +264,6 @@ func ReadMetadata(r io.Reader) (*archiveMeta, error) {
return meta, errors.Wrap(err, "unmarshal")
}

// consumer handles archive header and body blocks and routes the documents of
// selected namespaces to per-namespace writers.
type consumer struct {
// open creates the writer for a namespace when its first accepted document
// arrives.
open NewWriter
// nsFilter reports whether a namespace is selected.
nsFilter NSFilterFn
// docFilter reports whether a document of a selected namespace is kept.
docFilter DocFilterFn
// nss holds the currently open writer of each namespace.
nss map[string]io.WriteCloser
// crc holds the CRC taken from the EOF header of each selected namespace.
crc map[string]int64
// size accumulates the byte length of the body documents accepted per
// namespace.
size map[string]int64
// curr is the namespace that incoming body documents belong to; it is empty
// while the current namespace is not selected.
curr string
}

// newConsumer builds a consumer that opens outputs with newWriter and applies
// the given namespace and document filters. Its per-namespace maps start
// empty.
func newConsumer(newWriter NewWriter, nsFilter NSFilterFn, docFilter DocFilterFn) *consumer {
	c := new(consumer)

	c.open = newWriter
	c.nsFilter = nsFilter
	c.docFilter = docFilter

	c.nss = map[string]io.WriteCloser{}
	c.crc = map[string]int64{}
	c.size = map[string]int64{}

	return c
}

// HeaderBSON handles a namespace header block.
//
// A header of a non-selected namespace clears the current namespace, so the
// bodies that follow are skipped. A non-EOF header of a selected namespace
// makes it the current one. An EOF header records the namespace CRC and closes
// the namespace writer if one was opened.
func (c *consumer) HeaderBSON(data []byte) error {
	var hdr archive.NamespaceHeader
	if err := bson.Unmarshal(data, &hdr); err != nil {
		return errors.Wrap(err, "unmarshal")
	}

	ns := NSify(hdr.Database, hdr.Collection)
	switch {
	case !c.nsFilter(ns):
		// non-selected namespace. ignore
		c.curr = ""
		return nil
	case !hdr.EOF:
		c.curr = ns
		return nil
	}

	// EOF header of a selected namespace.
	c.crc[ns] = hdr.CRC

	out := c.nss[ns]
	if out == nil {
		// No document was written for this namespace; nothing to close.
		return nil
	}

	delete(c.nss, ns)
	return errors.Wrap(out.Close(), "close")
}

// BodyBSON handles one document of the current namespace.
//
// The document is dropped when the current namespace is not selected or when
// docFilter rejects it. Otherwise it is written to the namespace writer, which
// is opened on first use, and its length is added to the namespace size.
func (c *consumer) BodyBSON(data []byte) error {
	ns := c.curr
	// Skip documents of ignored namespaces and documents the filter rejects.
	// The filter is not consulted when there is no current namespace.
	if ns == "" || !c.docFilter(ns, bson.Raw(data)) {
		return nil
	}

	out := c.nss[ns]
	if out == nil {
		opened, err := c.open(ns)
		if err != nil {
			return errors.Wrapf(err, "open: %q", ns)
		}

		out = opened
		c.nss[ns] = opened
	}

	c.size[ns] += int64(len(data))
	return errors.Wrapf(SecureWrite(out, data), "%q", ns)
}

// End closes every writer that is still open and waits for all closes to
// finish.
//
// Writers are closed concurrently, one goroutine per namespace. Close errors
// are collected under a mutex and returned through errors.Join.
func (c *consumer) End() error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)

	wg.Add(len(c.nss))
	for ns, w := range c.nss {
		// Hand the loop variables over as arguments so that every goroutine
		// works on its own namespace/writer pair regardless of the
		// loop-variable semantics of the Go version the module targets.
		go func(ns string, w io.WriteCloser) {
			defer wg.Done()

			if err := w.Close(); err != nil {
				mu.Lock()
				errs = append(errs, errors.Wrapf(err, "close: %q", ns))
				mu.Unlock()
			}
		}(ns, w)
	}

	wg.Wait()
	return errors.Join(errs...)
}

func SecureWrite(w io.Writer, data []byte) error {
n, err := w.Write(data)
if err != nil {
Expand Down
Loading
Loading