diff --git a/v2/internal/io/offset_write_seeker.go b/v2/internal/io/offset_write_seeker.go
index 7e0f6ba5..afee7f40 100644
--- a/v2/internal/io/offset_write_seeker.go
+++ b/v2/internal/io/offset_write_seeker.go
@@ -1,6 +1,11 @@
 package io
 
-import "io"
+import (
+	"errors"
+	"io"
+)
+
+var ErrUnsupported = errors.New("unsupported seek operation")
 
 var (
 	_ io.Writer = (*OffsetWriteSeeker)(nil)
@@ -30,7 +35,7 @@ func (ow *OffsetWriteSeeker) Seek(offset int64, whence int) (int64, error) {
 	case io.SeekCurrent:
 		ow.offset += offset
 	case io.SeekEnd:
-		panic("unsupported whence: SeekEnd")
+		return 0, ErrUnsupported
 	}
 	return ow.Position(), nil
 }
diff --git a/v2/internal/io/skip_writer_read_seeker.go b/v2/internal/io/skip_writer_read_seeker.go
new file mode 100644
index 00000000..eb8ed2c2
--- /dev/null
+++ b/v2/internal/io/skip_writer_read_seeker.go
@@ -0,0 +1,115 @@
+package io
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+)
+
+// SkipWriterReaderSeeker wraps a ReWriter function, which writes from an offset, to provide an io.ReadSeeker.
+// Note that Read and Seek are not thread-safe; they must not be called
+// concurrently.
+type SkipWriterReaderSeeker struct {
+	parentCtx context.Context
+	offset    uint64
+	size      uint64
+
+	cons          ReWriter
+	reader        *io.PipeReader
+	writeCancel   context.CancelFunc
+	writeComplete chan struct{}
+}
+
+// ReWriter is a function writing to an io.Writer from an offset.
+type ReWriter func(ctx context.Context, skip uint64, w io.Writer) (uint64, error)
+
+var _ io.ReadSeeker = (*SkipWriterReaderSeeker)(nil)
+var _ io.Closer = (*SkipWriterReaderSeeker)(nil)
+
+// NewSkipWriterReaderSeeker creates an io.ReadSeeker around a ReWriter.
+func NewSkipWriterReaderSeeker(ctx context.Context, size uint64, cons ReWriter) *SkipWriterReaderSeeker {
+	return &SkipWriterReaderSeeker{
+		parentCtx:     ctx,
+		size:          size,
+		cons:          cons,
+		writeComplete: make(chan struct{}, 1),
+	}
+}
+
+// Note: not thread-safe
+func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) {
+	// Check if there's already a write in progress
+	if c.reader == nil {
+		// No write in progress, start a new write from the current offset
+		// in a goroutine and feed it back to the caller via a pipe
+		writeCtx, writeCancel := context.WithCancel(c.parentCtx)
+		c.writeCancel = writeCancel
+		pr, pw := io.Pipe()
+		c.reader = pr
+
+		go func() {
+			amnt, err := c.cons(writeCtx, c.offset, pw)
+			c.offset += amnt
+			if err != nil && !errors.Is(err, context.Canceled) {
+				pw.CloseWithError(err)
+			} else {
+				pw.Close()
+			}
+			c.writeComplete <- struct{}{}
+		}()
+	}
+
+	return c.reader.Read(p)
+}
+
+// Note: not thread-safe
+func (c *SkipWriterReaderSeeker) Seek(offset int64, whence int) (int64, error) {
+	// Update the offset
+	switch whence {
+	case io.SeekStart:
+		if offset < 0 {
+			return 0, fmt.Errorf("invalid offset %d from start: must be positive", offset)
+		}
+		c.offset = uint64(offset)
+	case io.SeekCurrent:
+		if int64(c.offset)+offset < 0 {
+			return 0, fmt.Errorf("invalid offset %d from current %d: resulting offset is negative", offset, c.offset)
+		}
+		c.offset = uint64((int64(c.offset) + offset))
+	case io.SeekEnd:
+		if c.size == 0 {
+			return 0, ErrUnsupported
+
+		}
+		if int64(c.size)+offset < 0 {
+			return 0, fmt.Errorf("invalid offset %d from end: larger than total size %d", offset, c.size)
+		}
+		c.offset = uint64(int64(c.size) + offset)
+	}
+
+	// Cancel any ongoing write and wait for it to complete
+	// TODO: if we're fast-forwarding with 'SeekCurrent', we may be able to read from the current reader instead.
+	if c.reader != nil {
+		err := c.Close()
+		c.reader = nil
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return int64(c.offset), nil
+}
+
+func (c *SkipWriterReaderSeeker) Close() error {
+	c.writeCancel()
+	// Seek and Read should not be called concurrently so this is safe
+	c.reader.Close()
+
+	select {
+	case <-c.parentCtx.Done():
+		return c.parentCtx.Err()
+	case <-c.writeComplete:
+	}
+	return nil
+}
diff --git a/v2/internal/loader/counting_loader.go b/v2/internal/loader/counting_loader.go
index e428993e..9adaee0d 100644
--- a/v2/internal/loader/counting_loader.go
+++ b/v2/internal/loader/counting_loader.go
@@ -9,13 +9,13 @@ import (
 	"github.com/multiformats/go-varint"
 )
 
-// counter tracks how much data has been read.
-type counter struct {
-	totalRead uint64
+// Counter tracks how much data has been read.
+type Counter struct {
+	TotalRead uint64
 }
 
-func (c *counter) Size() uint64 {
-	return c.totalRead
+func (c *Counter) Size() uint64 {
+	return c.TotalRead
 }
 
 // ReadCounter provides an externally consumable interface to the
@@ -26,12 +26,12 @@ type ReadCounter interface {
 
 type countingReader struct {
 	r io.Reader
-	c *counter
+	c *Counter
 }
 
 func (c *countingReader) Read(p []byte) (int, error) {
 	n, err := c.r.Read(p)
-	c.c.totalRead += uint64(n)
+	c.c.TotalRead += uint64(n)
 	return n, err
 }
 
@@ -41,7 +41,7 @@ func (c *countingReader) Read(p []byte) (int, error) {
 // appear in a CAR file is added to the counter (included the size of the
 // CID and the varint length for the block data).
 func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
-	c := counter{}
+	c := Counter{}
 	clc := ls
 	clc.StorageReadOpener = func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
 		r, err := ls.StorageReadOpener(lc, l)
@@ -54,7 +54,7 @@ func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
 			return nil, err
 		}
 		size := varint.ToUvarint(uint64(n) + uint64(len(l.Binary())))
-		c.totalRead += uint64(len(size)) + uint64(len(l.Binary()))
+		c.TotalRead += uint64(len(size)) + uint64(len(l.Binary()))
 		return &countingReader{buf, &c}, nil
 	}
 	return clc, &c
diff --git a/v2/internal/loader/writing_loader.go b/v2/internal/loader/writing_loader.go
index b4d0e6ef..c3a93083 100644
--- a/v2/internal/loader/writing_loader.go
+++ b/v2/internal/loader/writing_loader.go
@@ -12,22 +12,25 @@ import (
 	"github.com/multiformats/go-varint"
 )
 
-type writerOutput struct {
-	w     io.Writer
-	size  uint64
-	code  multicodec.Code
-	rcrds map[cid.Cid]index.Record
+// indexingWriter wraps an io.Writer, tracking the size of and index records for the CAR data written to it.
+type indexingWriter struct {
+	w      io.Writer
+	size   uint64
+	toSkip uint64
+	code   multicodec.Code
+	rcrds  map[cid.Cid]index.Record
 }
 
-func (w *writerOutput) Size() uint64 {
+func (w *indexingWriter) Size() uint64 {
 	return w.size
 }
 
-func (w *writerOutput) Index() (index.Index, error) {
+func (w *indexingWriter) Index() (index.Index, error) {
 	idx, err := index.New(w.code)
 	if err != nil {
 		return nil, err
 	}
+	// todo: maybe keep both a map and a list proactively for efficiency here.
 	rcrds := make([]index.Record, 0, len(w.rcrds))
 	for _, r := range w.rcrds {
 		rcrds = append(rcrds, r)
@@ -46,38 +49,86 @@ type IndexTracker interface {
 	Index() (index.Index, error)
 }
 
+var _ IndexTracker = (*indexingWriter)(nil)
+
+// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener; we use it
+// to intercept block reads and construct CAR section output for that block, passing that section data to
+// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which
+// we expect to be a traversal operation).
+// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we
+// construct the CAR section data and decide when and how much to write out to the indexingWriter.
+// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's
+// written out via the indexingWriter.
 type writingReader struct {
 	r   io.Reader
-	len int64
+	buf []byte
 	cid string
-	wo  *writerOutput
+	iw  *indexingWriter
 }
 
 func (w *writingReader) Read(p []byte) (int, error) {
-	if w.wo != nil {
+	if w.iw != nil {
+		// build the buffer of size:cid:block if we don't have it yet.
+		buf := bytes.NewBuffer(nil)
+		// allocate space for size
+		_, err := buf.Write(make([]byte, varint.MaxLenUvarint63))
+		if err != nil {
+			return 0, err
+		}
 		// write the cid
-		size := varint.ToUvarint(uint64(w.len) + uint64(len(w.cid)))
-		if _, err := w.wo.w.Write(size); err != nil {
+		if _, err := buf.Write([]byte(w.cid)); err != nil {
 			return 0, err
 		}
-		if _, err := w.wo.w.Write([]byte(w.cid)); err != nil {
+		// write the block
+		n, err := io.Copy(buf, w.r)
+		if err != nil {
 			return 0, err
 		}
-		cpy := bytes.NewBuffer(w.r.(*bytes.Buffer).Bytes())
-		if _, err := cpy.WriteTo(w.wo.w); err != nil {
+		// write the varint size prefix and trim the unneeded prefix padding we allocated
+		sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid)))
+		writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):]
+		w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):]
+		_ = copy(writeBuf[:], sizeBytes)
+
+		size := len(writeBuf)
+		// indexingWriter manages state for a skip operation, but we have to mutate it here:
+		// if there are still bytes to skip, then we either need to skip over this whole block, or pass
+		// part of it on, and then update the toSkip state
+		if w.iw.toSkip > 0 {
+			if w.iw.toSkip >= uint64(len(writeBuf)) {
+				w.iw.toSkip -= uint64(len(writeBuf))
+				// will cause the WriteTo() below to be a no-op; we need to skip this entire block
+				writeBuf = []byte{}
+			} else {
+				writeBuf = writeBuf[w.iw.toSkip:]
+				w.iw.toSkip = 0
+			}
+		}
+
+		if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.iw.w); err != nil {
 			return 0, err
 		}
 		_, c, err := cid.CidFromBytes([]byte(w.cid))
 		if err != nil {
 			return 0, err
 		}
-		w.wo.rcrds[c] = index.Record{
+		w.iw.rcrds[c] = index.Record{
 			Cid:    c,
-			Offset: w.wo.size,
+			Offset: w.iw.size,
 		}
-		w.wo.size += uint64(w.len) + uint64(len(size)+len(w.cid))
+		w.iw.size += uint64(size)
+		w.iw = nil
+	}
 
-		w.wo = nil
+	if w.buf != nil {
+		// we've already read the block from the parent reader for writing the CAR block section (above),
+		// so we need to pass those bytes on in whatever chunk size the caller wants
+		n, err := bytes.NewBuffer(w.buf).Read(p)
+		if err != nil {
+			return n, err
+		}
+		w.buf = w.buf[n:]
+		return n, err
 	}
 
 	return w.r.Read(p)
@@ -89,12 +140,13 @@ func (w *writingReader) Read(p []byte) (int, error) {
 // The `initialOffset` is used to calculate the offsets recorded for the index, and will be
 // included in the `.Size()` of the IndexTracker.
 // An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets.
-func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) {
-	wo := writerOutput{
-		w:     w,
-		size:  initialOffset,
-		code:  indexCodec,
-		rcrds: make(map[cid.Cid]index.Record),
+func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) {
+	iw := indexingWriter{
+		w:      w,
+		size:   initialOffset,
+		toSkip: skip,
+		code:   indexCodec,
+		rcrds:  make(map[cid.Cid]index.Record),
 	}
 
 	tls := ls
@@ -105,7 +157,7 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ind
 		}
 
 		// if we've already read this cid in this session, don't re-write it.
-		if _, ok := wo.rcrds[c]; ok {
+		if _, ok := iw.rcrds[c]; ok {
 			return ls.StorageReadOpener(lc, l)
 		}
 
@@ -113,12 +165,8 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ind
 		if err != nil {
 			return nil, err
 		}
-		buf := bytes.NewBuffer(nil)
-		n, err := buf.ReadFrom(r)
-		if err != nil {
-			return nil, err
-		}
-		return &writingReader{buf, n, l.Binary(), &wo}, nil
+
+		return &writingReader{r, nil, l.Binary(), &iw}, nil
 	}
-	return tls, &wo
+	return tls, &iw
 }
diff --git a/v2/options.go b/v2/options.go
index d2e526c4..4ccd289d 100644
--- a/v2/options.go
+++ b/v2/options.go
@@ -60,6 +60,8 @@ type Options struct {
 	MaxTraversalLinks         uint64
 	WriteAsCarV1              bool
 	TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser
+	DataPayloadSize           uint64
+	SkipOffset                uint64
 
 	MaxAllowedHeaderSize  uint64
 	MaxAllowedSectionSize uint64
@@ -97,6 +99,14 @@ func ZeroLengthSectionAsEOF(enable bool) Option {
 	}
 }
 
+// WithSkipOffset sets the start offset we should seek to in the traversal
+// when writing out a CAR. This option only applies to the selective and traversal writers.
+func WithSkipOffset(skip uint64) Option {
+	return func(o *Options) {
+		o.SkipOffset = skip
+	}
+}
+
 // UseDataPadding sets the padding to be added between CARv2 header and its data payload on Finalize.
 func UseDataPadding(p uint64) Option {
 	return func(o *Options) {
diff --git a/v2/selective.go b/v2/selective.go
index 39bb5f91..f1d0932f 100644
--- a/v2/selective.go
+++ b/v2/selective.go
@@ -1,6 +1,7 @@
 package car
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"io"
@@ -11,7 +12,9 @@ import (
 	"github.com/ipfs/go-cid"
 	"github.com/ipld/go-car/v2/index"
 	"github.com/ipld/go-car/v2/internal/carv1"
+	ioint "github.com/ipld/go-car/v2/internal/io"
 	"github.com/ipld/go-car/v2/internal/loader"
+	resumetraversal "github.com/ipld/go-car/v2/traversal"
 	ipld "github.com/ipld/go-ipld-prime"
 	"github.com/ipld/go-ipld-prime/datamodel"
 	"github.com/ipld/go-ipld-prime/linking"
@@ -40,40 +43,67 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option {
 	}
 }
 
+// WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance.
+// This is required if NewSelectiveV1Reader() is used and a Seek() operation needs to seek back from
+// SeekEnd (i.e. if we don't know where the end is, we can't figure out how far that is from the start).
+// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In
+// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't
+// match the expected size set with this option.
+func WithDataPayloadSize(size uint64) Option {
+	return func(sco *Options) {
+		sco.DataPayloadSize = size
+	}
+}
+
 // NewSelectiveWriter walks through the proposed dag traversal to learn its total size in order to be able to
 // stream out a car to a writer in the expected traversal order in one go.
 func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) {
-	cls, cntr := loader.CountingLinkSystem(*ls)
-
-	c1h := carv1.CarHeader{Roots: []cid.Cid{root}, Version: 1}
-	headSize, err := carv1.HeaderSize(&c1h)
-	if err != nil {
-		return nil, err
-	}
-	if err := traverse(ctx, &cls, root, selector, ApplyOptions(opts...)); err != nil {
-		return nil, err
+	conf := ApplyOptions(opts...)
+	if conf.DataPayloadSize != 0 {
+		return &traversalCar{
+			size:     conf.DataPayloadSize,
+			ctx:      ctx,
+			root:     root,
+			selector: selector,
+			ls:       ls,
+			opts:     ApplyOptions(opts...),
+		}, nil
 	}
 
 	tc := traversalCar{
-		size:     headSize + cntr.Size(),
+		//size:     headSize + cntr.Size(),
 		ctx:      ctx,
 		root:     root,
 		selector: selector,
 		ls:       ls,
 		opts:     ApplyOptions(opts...),
 	}
+	if err := tc.setup(ctx, ls, ApplyOptions(opts...)); err != nil {
+		return nil, err
+	}
+
+	c1h := carv1.CarHeader{Roots: []cid.Cid{root}, Version: 1}
+	headSize, err := carv1.HeaderSize(&c1h)
+	if err != nil {
+		return nil, err
+	}
+	if err := tc.traverse(root, selector); err != nil {
+		return nil, err
+	}
+	tc.size = headSize + tc.resumer.Position()
 	return &tc, nil
 }
 
 // TraverseToFile writes a car file matching a given root and selector to the
 // path at `destination` using one read of each block.
 func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, destination string, opts ...Option) error {
+	conf := ApplyOptions(opts...)
 	tc := traversalCar{
-		size:     0,
+		size:     conf.DataPayloadSize,
 		ctx:      ctx,
 		root:     root,
 		selector: selector,
 		ls:       ls,
-		opts:     ApplyOptions(opts...),
+		opts:     conf,
 	}
 
 	fp, err := os.Create(destination)
@@ -103,19 +133,49 @@ func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele
 // TraverseV1 walks through the proposed dag traversal and writes a carv1 to the provided io.Writer
 func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, writer io.Writer, opts ...Option) (uint64, error) {
 	opts = append(opts, WithoutIndex())
+	conf := ApplyOptions(opts...)
 	tc := traversalCar{
-		size:     0,
+		size:     conf.DataPayloadSize,
 		ctx:      ctx,
 		root:     root,
 		selector: selector,
 		ls:       ls,
-		opts:     ApplyOptions(opts...),
+		opts:     conf,
 	}
 
-	len, _, err := tc.WriteV1(writer)
+	len, _, err := tc.WriteV1(tc.ctx, conf.SkipOffset, writer)
 	return len, err
 }
 
+// NewSelectiveV1Reader creates an io.ReadSeeker that can be used to stream a
+// CARv1 given a LinkSystem, root CID and a selector. If Seek() is used, the
+// output will only be given from that point in the resulting CAR. Where the
+// size of the CAR is known ahead of time and provided via the
+// WithDataPayloadSize option, seeking from the end of the CAR is permissible.
+func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
+	opts = append(opts, WithoutIndex())
+	conf := ApplyOptions(opts...)
+	tc := traversalCar{
+		size:     conf.DataPayloadSize,
+		ctx:      ctx,
+		root:     root,
+		selector: selector,
+		ls:       ls,
+		opts:     conf,
+	}
+	rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) {
+		// it's only at this point that we have the `offset` to start writing at, since the user of the
+		// ReadSeeker may have called Seek() and we've worked out where in the CAR
+		// that lands as a byte offset. Now we can start writing the CARv1 data, which will be passed
+		// on to the ReadSeeker.
+		// Note that we're inside a goroutine here.
+		s, _, err := tc.WriteV1(ctx, offset, writer)
+		return s, err
+	}
+	rw := ioint.NewSkipWriterReaderSeeker(ctx, conf.DataPayloadSize, rwf)
+	return rw, nil
+}
+
 // Writer is an interface allowing writing a car prepared by PrepareTraversal
 type Writer interface {
 	io.WriterTo
@@ -130,6 +190,8 @@ type traversalCar struct {
 	selector ipld.Node
 	ls       *ipld.LinkSystem
 	opts     Options
+	progress *traversal.Progress
+	resumer  resumetraversal.TraverseResumer
 }
 
 func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) {
@@ -137,7 +199,7 @@ func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) {
 	if err != nil {
 		return n, err
 	}
-	v1s, idx, err := tc.WriteV1(w)
+	v1s, idx, err := tc.WriteV1(tc.ctx, 0, w)
 	n += int64(v1s)
 
 	if err != nil {
@@ -202,21 +264,38 @@ func (tc *traversalCar) WriteV2Header(w io.Writer) (int64, error) {
 	return hn, nil
 }
 
-func (tc *traversalCar) WriteV1(w io.Writer) (uint64, index.Index, error) {
+// WriteV1 writes a v1 car to the writer, w, except for the first `skip` bytes.
+// Returns bytes written, an index of what was written, or an error if one occurred.
+func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (uint64, index.Index, error) {
+	written := uint64(0)
+	// write the v1 header
 	c1h := carv1.CarHeader{Roots: []cid.Cid{tc.root}, Version: 1}
-	if err := carv1.WriteHeader(&c1h, w); err != nil {
-		return 0, nil, err
-	}
 	v1Size, err := carv1.HeaderSize(&c1h)
 	if err != nil {
-		return v1Size, nil, err
+		return 0, nil, err
+	}
+	if skip < v1Size {
+		buf := bytes.NewBuffer(nil)
+		if err := carv1.WriteHeader(&c1h, buf); err != nil {
+			return 0, nil, err
+		}
+		if _, err := w.Write(buf.Bytes()[skip:]); err != nil {
+			return 0, nil, err
+		}
+		written = v1Size - skip
+		skip = 0
+	} else {
+		skip -= v1Size
 	}
 
-	// write the block.
-	wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, tc.opts.IndexCodec)
-	err = traverse(tc.ctx, &wls, tc.root, tc.selector, tc.opts)
-	v1Size = writer.Size()
+	// write the blocks
+	wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec)
+	if err = tc.setup(ctx, &wls, tc.opts); err != nil {
+		return v1Size, nil, err
+	}
+	err = tc.traverse(tc.root, tc.selector)
+	v1Size = writer.Size() - v1Size + written
 	if err != nil {
 		return v1Size, nil, err
 	}
@@ -232,12 +311,7 @@ func (tc *traversalCar) WriteV1(w io.Writer) (uint64, index.Index, error) {
 	return v1Size, idx, err
 }
 
-func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Node, opts Options) error {
-	sel, err := selector.CompileSelector(s)
-	if err != nil {
-		return err
-	}
-
+func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Options) error {
 	chooser := func(_ ipld.Link, _ linking.LinkContext) (ipld.NodePrototype, error) {
 		return basicnode.Prototype.Any, nil
 	}
@@ -260,17 +334,31 @@ func traverse(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, s ipld.Nod
 		}
 	}
 
-	lnk := cidlink.Link{Cid: root}
 	ls.TrustedStorage = true
-	rp, err := chooser(lnk, ipld.LinkContext{})
+	resumer, err := resumetraversal.WithTraversingLinksystem(&progress)
+	if err != nil {
+		return err
+	}
+	tc.progress = &progress
+	tc.resumer = resumer
+	return nil
+}
+
+func (tc *traversalCar) traverse(root cid.Cid, s ipld.Node) error {
+	sel, err := selector.CompileSelector(s)
+	if err != nil {
+		return err
+	}
+	lnk := cidlink.Link{Cid: root}
+	rp, err := tc.progress.Cfg.LinkTargetNodePrototypeChooser(lnk, ipld.LinkContext{})
 	if err != nil {
 		return err
 	}
-	rootNode, err := ls.Load(ipld.LinkContext{}, lnk, rp)
+	rootNode, err := tc.progress.Cfg.LinkSystem.Load(ipld.LinkContext{}, lnk, rp)
 	if err != nil {
 		return fmt.Errorf("root blk load failed: %s", err)
 	}
-	err = progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error {
+	err = tc.progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error {
 		if lbn, ok := node.(datamodel.LargeBytesNode); ok {
 			s, err := lbn.AsLargeBytes()
 			if err != nil {
diff --git a/v2/selective_test.go b/v2/selective_test.go
index 924351d4..a387e890 100644
--- a/v2/selective_test.go
+++ b/v2/selective_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"io"
+	"math/rand"
 	"os"
 	"path"
 	"testing"
@@ -135,3 +136,42 @@ func TestPartialTraversal(t *testing.T) {
 	}
 	require.Equal(t, 2, len(fnd))
 }
+
+func TestOffsetWrites(t *testing.T) {
+	store := cidlink.Memory{Bag: make(map[string][]byte)}
+	ls := cidlink.DefaultLinkSystem()
+	ls.StorageReadOpener = store.OpenRead
+	ls.StorageWriteOpener = store.OpenWrite
+	unixfsnode.AddUnixFSReificationToLinkSystem(&ls)
+
+	// write a unixfs file.
+	initBuf := bytes.Buffer{}
+	data := make([]byte, 1000000)
+	_, _ = rand.Read(data)
+	_, _ = initBuf.Write(data)
+	rootCid, _, err := builder.BuildUnixFSFile(&initBuf, "", &ls)
+	require.NoError(t, err)
+	_, rts, err := cid.CidFromBytes([]byte(rootCid.Binary()))
+	require.NoError(t, err)
+
+	// get the full car buffer.
+	fullBuf := bytes.Buffer{}
+	_, err = car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &fullBuf)
+	require.NoError(t, err)
+
+	for i := uint64(1); i < 1000; i += 1 {
+		buf := bytes.Buffer{}
+
+		_, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i))
+		require.NoError(t, err)
+		require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes())
+	}
+
+	for i := uint64(1000); i < 1000000; i += 1000 {
+		buf := bytes.Buffer{}
+
+		_, err := car.TraverseV1(context.Background(), &ls, rts, selectorparse.CommonSelector_ExploreAllRecursively, &buf, car.WithSkipOffset(i))
+		require.NoError(t, err)
+		require.Equal(t, fullBuf.Bytes()[i:], buf.Bytes())
+	}
+}
diff --git a/v2/traversal/resumption.go b/v2/traversal/resumption.go
new file mode 100644
index 00000000..d88eead8
--- /dev/null
+++ b/v2/traversal/resumption.go
@@ -0,0 +1,257 @@
+package traversal
+
+// Resumption is an extension to an ipld traversal Progress struct that tracks the tree of the dag as it is discovered.
+// For each link, it tracks the offset at which that node would appear from the beginning of the traversal, if the traversal
+// were to be serialized in a car format (e.g. [size || cid || block]*; no car header offset is included).
+// It can then resume the traversal based on either a path within the traversal, or a car offset.
+
+import (
+	"fmt"
+	"io"
+	"math"
+
+	"github.com/ipld/go-car/v2/internal/loader"
+	"github.com/ipld/go-ipld-prime"
+	"github.com/ipld/go-ipld-prime/datamodel"
+	"github.com/ipld/go-ipld-prime/linking"
+	"github.com/ipld/go-ipld-prime/traversal"
+)
+
+type pathNode struct {
+	link     datamodel.Link
+	offset   uint64
+	children map[datamodel.PathSegment]*pathNode
+}
+
+func newPath(link datamodel.Link, at uint64) *pathNode {
+	return &pathNode{
+		link:     link,
+		offset:   at,
+		children: make(map[datamodel.PathSegment]*pathNode),
+	}
+}
+
+func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) {
+	if len(p) == 0 {
+		return
+	}
+	if _, ok := pn.children[p[0]]; !ok {
+		child := newPath(link, at)
+		pn.children[p[0]] = child
+	}
+	pn.children[p[0]].addPath(p[1:], link, at)
+}
+
+func (pn pathNode) allLinks() []datamodel.Link {
+	if len(pn.children) == 0 {
+		return []datamodel.Link{pn.link}
+	}
+	links := make([]datamodel.Link, 0)
+	if pn.link != nil {
+		links = append(links, pn.link)
+	}
+	for _, v := range pn.children {
+		links = append(links, v.allLinks()...)
+	}
+	return links
+}
+
+// getLinks returns the links in the subtree rooted at 'root'.
+func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link {
+	segs := root.Segments()
+	switch len(segs) {
+	case 0:
+		if pn.link != nil {
+			return []datamodel.Link{pn.link}
+		}
+		return []datamodel.Link{}
+	case 1:
+		// base case 1: get all links below this child.
+		next := segs[0]
+		if child, ok := pn.children[next]; ok {
+			return child.allLinks()
+		}
+		return []datamodel.Link{}
+	default:
+	}
+
+	next := segs[0]
+	if _, ok := pn.children[next]; !ok {
+		// base case 2: not a registered sub-path.
+		return []datamodel.Link{}
+	}
+	return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:]))
+}
+
+var errInvalid = fmt.Errorf("invalid path")
+
+func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
+	// we look for the offset of the next sibling.
+	// if there is no next sibling, recurse up the path segments until we find one.
+	segs := root.Segments()
+	if len(segs) == 0 {
+		return 0, errInvalid
+	}
+	// see if this path is a child.
+	chld, ok := pn.children[segs[0]]
+	if !ok {
+		return 0, errInvalid
+	}
+	closest := chld.offset
+	// try recursive path
+	if len(segs) > 1 {
+		co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:]))
+		if err == nil {
+			return co, err
+		}
+	}
+	// find our next sibling
+	var next uint64 = math.MaxUint64
+	var nc *pathNode
+	for _, v := range pn.children {
+		if v.offset > closest && v.offset < next {
+			next = v.offset
+			nc = v
+		}
+	}
+	if nc != nil {
+		return nc.offset, nil
+	}
+
+	return 0, errInvalid
+}
+
+// TraverseResumer allows resuming a progress from a previously encountered path in the selector.
+type TraverseResumer interface {
+	RewindToPath(from datamodel.Path) error
+	RewindToOffset(offset uint64) error
+	Position() uint64
+}
+
+type traversalState struct {
+	wrappedLinksystem  *linking.LinkSystem
+	lsCounter          *loader.Counter
+	blockNumber        int
+	pathOrder          map[int]datamodel.Path
+	pathTree           *pathNode
+	rewindPathTarget   *datamodel.Path
+	rewindOffsetTarget uint64
+	pendingBlockStart  uint64 // on rewinds, we store where the counter was in order to know the length of the last read block.
+	progress           *traversal.Progress
+}
+
+func (ts *traversalState) RewindToPath(from datamodel.Path) error {
+	if ts.progress == nil {
+		return nil
+	}
+	// reset progress and traverse until target.
+	ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
+	ts.blockNumber = 0
+	ts.pendingBlockStart = ts.lsCounter.Size()
+	ts.lsCounter.TotalRead = 0
+	ts.rewindPathTarget = &from
+	return nil
+}
+
+func (ts *traversalState) RewindToOffset(offset uint64) error {
+	if ts.progress == nil {
+		return nil
+	}
+	// no-op
+	if ts.lsCounter.Size() == offset {
+		return nil
+	}
+	// reset progress and traverse until target.
+	ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
+	ts.blockNumber = 0
+	ts.pendingBlockStart = ts.lsCounter.Size()
+	ts.lsCounter.TotalRead = 0
+	ts.rewindOffsetTarget = offset
+	return nil
+}
+
+func (ts *traversalState) Position() uint64 {
+	return ts.lsCounter.Size()
+}
+
+func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
+	// when not in replay mode, we track metadata
+	if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 {
+		ts.pathOrder[ts.blockNumber] = lc.LinkPath
+		ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size())
+		ts.blockNumber++
+		return ts.wrappedLinksystem.StorageReadOpener(lc, l)
+	}
+
+	// if we reach the target, we exit replay mode (by removing target)
+	if ts.rewindPathTarget != nil && lc.LinkPath.String() == ts.rewindPathTarget.String() {
+		ts.rewindPathTarget = nil
+		return ts.wrappedLinksystem.StorageReadOpener(lc, l)
+	}
+
+	// if we're at the rewind offset target, we exit replay mode
+	if ts.rewindOffsetTarget != 0 && ts.lsCounter.Size() >= ts.rewindOffsetTarget {
+		ts.rewindOffsetTarget = 0
+		return ts.wrappedLinksystem.StorageReadOpener(lc, l)
+	}
+
+	// when replaying a path, we skip links that are not ancestors of the target path,
+	// and add all links under them as 'seen'
+	if ts.rewindPathTarget != nil {
+		targetSegments := ts.rewindPathTarget.Segments()
+		seg := lc.LinkPath.Segments()
+		for i, s := range seg {
+			if i >= len(targetSegments) {
+				break
+			}
+			if targetSegments[i].String() != s.String() {
+				links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1]))
+				for _, l := range links {
+					ts.progress.SeenLinks[l] = struct{}{}
+				}
+				var err error
+				ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1]))
+				if err == errInvalid {
+					ts.lsCounter.TotalRead = ts.pendingBlockStart
+				} else if err != nil {
+					// total read is now invalid, sadly
+					return nil, err
+				}
+				return nil, traversal.SkipMe{}
+			}
+		}
+	}
+	if ts.rewindOffsetTarget != 0 {
+		links := ts.pathTree.getLinks(lc.LinkPath)
+		for _, l := range links {
+			ts.progress.SeenLinks[l] = struct{}{}
+		}
+		var err error
+		ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath)
+		if err == errInvalid {
+			ts.lsCounter.TotalRead = ts.pendingBlockStart
+		} else if err != nil {
+			return nil, err
+		}
+		return nil, traversal.SkipMe{}
+	}
+
+	// descend.
+	return ts.wrappedLinksystem.StorageReadOpener(lc, l)
+}
+
+// WithTraversingLinksystem extends a traversal Progress such that it can
+// subsequently resume and perform subsets of the walk efficiently from
+// an arbitrary position within the selector traversal.
+func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) {
+	wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem)
+	ts := &traversalState{
+		wrappedLinksystem: &wls,
+		lsCounter:         ctr.(*loader.Counter),
+		pathOrder:         make(map[int]datamodel.Path),
+		pathTree:          newPath(nil, 0),
+		progress:          p,
+	}
+	p.Cfg.LinkSystem.StorageReadOpener = ts.traverse
+	return ts, nil
+}
diff --git a/v2/traversal/resumption_test.go b/v2/traversal/resumption_test.go
new file mode 100644
index 00000000..58350b62
--- /dev/null
+++ b/v2/traversal/resumption_test.go
@@ -0,0 +1,241 @@
+package traversal_test
+
+import (
+	"errors"
+	"testing"
+
+	cartraversal "github.com/ipld/go-car/v2/traversal"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipld/go-ipld-prime/datamodel"
+	"github.com/ipld/go-ipld-prime/fluent"
+	"github.com/ipld/go-ipld-prime/linking"
+	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
+	"github.com/ipld/go-ipld-prime/node/basicnode"
+	"github.com/ipld/go-ipld-prime/storage/memstore"
+	"github.com/ipld/go-ipld-prime/traversal"
+	"github.com/ipld/go-ipld-prime/traversal/selector"
+	selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
+)
+
+var store = memstore.Store{}
+var (
+	// baguqeeyexkjwnfy
+	_, leafAlphaLnk = encode(basicnode.NewString("alpha"))
+	// baguqeeyeqvc7t3a
+	_, leafBetaLnk = encode(basicnode.NewString("beta"))
+	// baguqeeyezhlahvq
+	_, middleMapNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 3, func(na fluent.MapAssembler) {
+		na.AssembleEntry("foo").AssignBool(true)
+		na.AssembleEntry("bar").AssignBool(false)
+		na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) {
+			na.AssembleEntry("alink").AssignLink(leafAlphaLnk)
+			na.AssembleEntry("nonlink").AssignString("zoo")
+		})
+	}))
+	// baguqeeyehfkkfwa
+	_, middleListNodeLnk = encode(fluent.MustBuildList(basicnode.Prototype.List, 4, func(na fluent.ListAssembler) {
+		na.AssembleValue().AssignLink(leafAlphaLnk)
+		na.AssembleValue().AssignLink(leafAlphaLnk)
+		na.AssembleValue().AssignLink(leafBetaLnk)
+		na.AssembleValue().AssignLink(leafAlphaLnk)
+	}))
+	// note that using `rootNode` directly will have a different field ordering than
+	// the encoded form if you were to load `rootNodeLnk` due to dag-json field
+	// reordering on encode, beware the difference for traversal order between
+	// created, in-memory nodes and those that have passed through a codec with
+	// field ordering rules
+	// baguqeeyeie4ajfy
+	rootNode, _ = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) {
+		na.AssembleEntry("plain").AssignString("olde string")
+		na.AssembleEntry("linkedString").AssignLink(leafAlphaLnk)
+		na.AssembleEntry("linkedMap").AssignLink(middleMapNodeLnk)
+		na.AssembleEntry("linkedList").AssignLink(middleListNodeLnk)
+	}))
+)
+
+// encode hardcodes some encoding choices for ease of use in fixture generation;
+// just gimme a link and stuff the bytes in a map.
+// (also return the node again for convenient assignment.)
+func encode(n datamodel.Node) (datamodel.Node, datamodel.Link) {
+	lp := cidlink.LinkPrototype{Prefix: cid.Prefix{
+		Version:  1,
+		Codec:    0x0129,
+		MhType:   0x13,
+		MhLength: 4,
+	}}
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetWriteStorage(&store)
+
+	lnk, err := lsys.Store(linking.LinkContext{}, lp, n)
+	if err != nil {
+		panic(err)
+	}
+	return n, lnk
+}
+
+func TestWalkResumeByPath(t *testing.T) {
+	seen := 0
+	count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error {
+		seen++
+		return nil
+	}
+
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetReadStorage(&store)
+	p := traversal.Progress{
+		Cfg: &traversal.Config{
+			LinkSystem:                     lsys,
+			LinkTargetNodePrototypeChooser: basicnode.Chooser,
+		},
+	}
+	resumer, err := cartraversal.WithTraversingLinksystem(&p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sd := selectorparse.CommonSelector_ExploreAllRecursively
+	s, _ := selector.CompileSelector(sd)
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected total traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from beginning.
+	resumer.RewindToPath(datamodel.NewPath(nil))
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected resumed traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")}))
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	// one less: will not visit 'linkedString' before linked map.
+	if seen != 13 {
+		t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedList")}))
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	// will not visit 'linkedString' or 'linkedMap' before linked list.
+	if seen != 7 {
+		t.Fatalf("expected resumed traversal to visit 7 nodes, got %d", seen)
+	}
+}
+
+func TestWalkResumeByPathPartialWalk(t *testing.T) {
+	seen := 0
+	limit := 0
+	countUntil := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error {
+		seen++
+		if seen >= limit {
+			return traversal.SkipMe{}
+		}
+		return nil
+	}
+
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetReadStorage(&store)
+	p := traversal.Progress{
+		Cfg: &traversal.Config{
+			LinkSystem:                     lsys,
+			LinkTargetNodePrototypeChooser: basicnode.Chooser,
+		},
+	}
+	resumer, err := cartraversal.WithTraversingLinksystem(&p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sd := selectorparse.CommonSelector_ExploreAllRecursively
+	s, _ := selector.CompileSelector(sd)
+	limit = 9
+	if err := p.WalkAdv(rootNode, s, countUntil); !errors.Is(err, traversal.SkipMe{}) {
+		t.Fatal(err)
+	}
+	if seen != limit {
+		t.Fatalf("expected partial traversal, got %d", seen)
+	}
+
+	// resume.
+	resumer.RewindToPath(datamodel.NewPath([]datamodel.PathSegment{datamodel.PathSegmentOfString("linkedMap")}))
+	seen = 0
+	limit = 14
+	if err := p.WalkAdv(rootNode, s, countUntil); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 13 {
+		t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen)
+	}
+}
+
+func TestWalkResumeByOffset(t *testing.T) {
+	seen := 0
+	count := func(p traversal.Progress, n datamodel.Node, _ traversal.VisitReason) error {
+		seen++
+		return nil
+	}
+
+	lsys := cidlink.DefaultLinkSystem()
+	lsys.SetReadStorage(&store)
+	p := traversal.Progress{
+		Cfg: &traversal.Config{
+			LinkSystem:                     lsys,
+			LinkTargetNodePrototypeChooser: basicnode.Chooser,
+		},
+	}
+	resumer, err := cartraversal.WithTraversingLinksystem(&p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	sd := selectorparse.CommonSelector_ExploreAllRecursively
+	s, _ := selector.CompileSelector(sd)
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected total traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from beginning.
+	resumer.RewindToOffset(0)
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 14 {
+		t.Fatalf("expected resumed traversal to visit 14 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	resumer.RewindToOffset(10)
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	if seen != 13 {
+		t.Fatalf("expected resumed traversal to visit 13 nodes, got %d", seen)
+	}
+
+	// resume from middle.
+	resumer.RewindToOffset(50)
+	seen = 0
+	if err := p.WalkAdv(rootNode, s, count); err != nil {
+		t.Fatal(err)
+	}
+	// will not visit 'linkedString' or 'linkedMap' before linked list.
+	if seen != 7 {
+		t.Fatalf("expected resumed traversal to visit 7 nodes, got %d", seen)
+	}
+}
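
Usage sketch (illustrative, not part of the patch): assuming the API lands as written above, the new NewSelectiveV1Reader plus WithDataPayloadSize could be used to serve an arbitrary byte range of a selective CARv1 without buffering the whole payload. The function name serveRange and its parameters are hypothetical; the `size` value is assumed to come from an earlier full write or an external record.

package example

import (
	"context"
	"io"

	"github.com/ipfs/go-cid"
	car "github.com/ipld/go-car/v2"
	ipld "github.com/ipld/go-ipld-prime"
	selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
)

// serveRange streams `length` bytes of the CARv1 for root into w, starting at byte `offset`.
// `size` is the known CARv1 payload size; passing it via WithDataPayloadSize is what would
// make io.SeekEnd usable on the returned ReadSeeker.
func serveRange(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, size, offset, length uint64, w io.Writer) error {
	rs, err := car.NewSelectiveV1Reader(ctx, ls, root,
		selectorparse.CommonSelector_ExploreAllRecursively,
		car.WithDataPayloadSize(size))
	if err != nil {
		return err
	}
	// Seek records the skip offset; the underlying traversal only starts on the first Read.
	if _, err := rs.Seek(int64(offset), io.SeekStart); err != nil {
		return err
	}
	_, err = io.CopyN(w, rs, int64(length))
	return err
}

For a one-shot skip with no seeking, TraverseV1 combined with WithSkipOffset produces the same suffix of the CAR directly, as exercised by TestOffsetWrites above.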
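A second sketch, this time of the resumption API in v2/traversal, mirroring the pattern in resumption_test.go. walkTwice, resumeAt and visit are illustrative names; the link system and root node are assumed to be supplied by the caller.

package example

import (
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/ipld/go-ipld-prime/node/basicnode"
	"github.com/ipld/go-ipld-prime/traversal"
	"github.com/ipld/go-ipld-prime/traversal/selector"
	selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"

	cartraversal "github.com/ipld/go-car/v2/traversal"
)

// walkTwice walks the DAG under rootNode once in full, then rewinds to a CAR payload
// byte offset and walks again from approximately that point; visit is called for each node.
func walkTwice(lsys ipld.LinkSystem, rootNode datamodel.Node, resumeAt uint64, visit traversal.AdvVisitFn) error {
	p := traversal.Progress{
		Cfg: &traversal.Config{
			LinkSystem:                     lsys,
			LinkTargetNodePrototypeChooser: basicnode.Chooser,
		},
	}
	resumer, err := cartraversal.WithTraversingLinksystem(&p)
	if err != nil {
		return err
	}
	s, err := selector.CompileSelector(selectorparse.CommonSelector_ExploreAllRecursively)
	if err != nil {
		return err
	}
	// first pass records the path/offset tree as blocks are loaded
	if err := p.WalkAdv(rootNode, s, visit); err != nil {
		return err
	}
	// rewind to a byte offset within the (headerless) CAR payload and replay;
	// blocks before the offset are marked seen and skipped
	if err := resumer.RewindToOffset(resumeAt); err != nil {
		return err
	}
	return p.WalkAdv(rootNode, s, visit)
}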