Skip to content

Commit 9c6c1cc

Browse files
committed
feat: CAR offset writer
1 parent 497b9e1 commit 9c6c1cc

File tree

6 files changed

+689
-4
lines changed

6 files changed

+689
-4
lines changed

v2/car_offset_writer.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package car
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
9+
"github.com/ipfs/go-blockservice"
10+
"github.com/ipfs/go-cid"
11+
blockstore "github.com/ipfs/go-ipfs-blockstore"
12+
offline "github.com/ipfs/go-ipfs-exchange-offline"
13+
format "github.com/ipfs/go-ipld-format"
14+
"github.com/ipfs/go-merkledag"
15+
"github.com/ipld/go-car/v2/internal/carv1"
16+
"github.com/ipld/go-car/v2/internal/carv1/util"
17+
)
18+
19+
// blockInfo records where a block's section starts within the CAR stream
// and how large that section is, plus the block's child links, so that a
// later Write with a larger offset can skip past the block without
// re-reading it from the blockstore.
type blockInfo struct {
	// offset is the byte position of this block's section within the CAR
	// stream (counting from the start of the stream, header included).
	offset uint64
	// Note: size is the size of the block and metadata, i.e. the full
	// length-delimited section (varint length prefix + CID bytes + data).
	size uint64
	// links are the block's child links, cached so a repeat traversal can
	// descend into the DAG without loading the node again.
	links []*format.Link
}
25+
26+
// CarOffsetWriter turns a blockstore and a root CID into a CAR file stream,
// starting from an offset.
//
// NOTE(review): blockInfos is read and written without synchronization, so a
// single CarOffsetWriter does not look safe for concurrent Write calls —
// confirm callers serialize access.
type CarOffsetWriter struct {
	// payloadCid is the root of the DAG being exported (the CAR's only root).
	payloadCid cid.Cid
	// nodeGetter loads DAG nodes from the underlying blockstore.
	nodeGetter format.NodeGetter
	// blockInfos caches per-block stream offsets, sizes and links across
	// Write calls so later writes can skip already-visited blocks.
	blockInfos map[cid.Cid]*blockInfo
	// header is the CARv1 header written at the start of the stream.
	header carv1.CarHeader
}
34+
35+
func NewCarOffsetWriter(payloadCid cid.Cid, bstore blockstore.Blockstore) *CarOffsetWriter {
36+
ng := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
37+
return &CarOffsetWriter{
38+
payloadCid: payloadCid,
39+
nodeGetter: ng,
40+
blockInfos: make(map[cid.Cid]*blockInfo),
41+
header: carHeader(payloadCid),
42+
}
43+
}
44+
45+
func carHeader(payloadCid cid.Cid) carv1.CarHeader {
46+
return carv1.CarHeader{
47+
Roots: []cid.Cid{payloadCid},
48+
Version: 1,
49+
}
50+
}
51+
52+
func (s *CarOffsetWriter) Write(ctx context.Context, w io.Writer, offset uint64) error {
53+
headerSize, err := s.writeHeader(w, offset)
54+
if err != nil {
55+
return err
56+
}
57+
58+
return s.writeBlocks(ctx, w, headerSize, offset)
59+
}
60+
61+
func (s *CarOffsetWriter) writeHeader(w io.Writer, offset uint64) (uint64, error) {
62+
headerSize, err := carv1.HeaderSize(&s.header)
63+
if err != nil {
64+
return 0, fmt.Errorf("failed to size car header: %w", err)
65+
}
66+
67+
// Check if the offset from which to start writing is after the header
68+
if offset >= headerSize {
69+
return headerSize, nil
70+
}
71+
72+
// Write out the header, starting at the offset
73+
_, err = skipWrite(w, offset, func(sw io.Writer) (int, error) {
74+
return 0, carv1.WriteHeader(&s.header, sw)
75+
})
76+
if err != nil {
77+
return 0, fmt.Errorf("failed to write car header: %w", err)
78+
}
79+
80+
return headerSize, nil
81+
}
82+
83+
// writeBlocks walks the DAG rooted at s.payloadCid and writes each block's
// length-delimited section to w, skipping all bytes of the stream before
// writeOffset. headerSize is the full CAR header size, i.e. the stream
// offset at which the first block section starts. Visited blocks are
// memoized in s.blockInfos so a later call can skip them without reloading
// them from the blockstore.
//
// NOTE(review): the running offset assumes merkledag.Walk visits blocks in
// the same order on every call (the cached offsets are only reused when the
// write offset is past the whole cached block) — confirm Walk's traversal
// order is deterministic.
func (s *CarOffsetWriter) writeBlocks(ctx context.Context, w io.Writer, headerSize uint64, writeOffset uint64) error {
	// The first block's offset is the size of the header
	offset := headerSize

	// This function gets called for each CID during the merkle DAG walk
	nextCid := func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
		// There will be an item in the cache if writeBlocks has already been
		// called before, and the DAG traversal reached this CID
		cached, ok := s.blockInfos[c]
		if ok {
			// Check if the offset from which to start writing is after this
			// block
			nextBlockOffset := cached.offset + cached.size
			if writeOffset >= nextBlockOffset {
				// The offset from which to start writing is after this block
				// so don't write anything, just skip over this block and
				// continue the walk from its cached links
				offset = nextBlockOffset
				return cached.links, nil
			}
		}

		// Get the block from the blockstore
		nd, err := s.nodeGetter.Get(ctx, c)
		if err != nil {
			return nil, fmt.Errorf("getting block %s: %w", c, err)
		}

		// Get the size of the block and metadata (the full length-delimited
		// section: varint prefix + CID + block data)
		ldsize := util.LdSize(nd.Cid().Bytes(), nd.RawData())

		// Check if the offset from which to start writing is in or before this
		// block
		nextBlockOffset := offset + ldsize
		if writeOffset < nextBlockOffset {
			// Check if the offset from which to start writing is in this block
			// (if it is before the block, blockWriteOffset stays 0 and the
			// whole section is written)
			var blockWriteOffset uint64
			if writeOffset >= offset {
				blockWriteOffset = writeOffset - offset
			}

			// Write the block data to the writer, starting at the block offset
			_, err = skipWrite(w, blockWriteOffset, func(sw io.Writer) (int, error) {
				return 0, util.LdWrite(sw, nd.Cid().Bytes(), nd.RawData())
			})
			if err != nil {
				return nil, fmt.Errorf("writing CAR block %s: %w", c, err)
			}
		}

		// Add the block to the cache so a later Write can skip it
		s.blockInfos[nd.Cid()] = &blockInfo{
			offset: offset,
			size:   ldsize,
			links:  nd.Links(),
		}

		// Advance the running stream offset past this block's section
		offset = nextBlockOffset

		// Return any links from this block to other DAG blocks
		return nd.Links(), nil
	}

	// Walk the DAG, visiting each CID exactly once
	seen := cid.NewSet()
	return merkledag.Walk(ctx, nextCid, s.payloadCid, seen.Visit)
}
148+
149+
// Write data to the writer, skipping the first skip bytes
150+
func skipWrite(w io.Writer, skip uint64, write func(sw io.Writer) (int, error)) (int, error) {
151+
// If there's nothing to skip, just do a normal write
152+
if skip == 0 {
153+
return write(w)
154+
}
155+
156+
// Write to a buffer
157+
var buff bytes.Buffer
158+
if count, err := write(&buff); err != nil {
159+
return count, err
160+
}
161+
162+
// Write the buffer to the writer, skipping the first skip bytes
163+
bz := buff.Bytes()
164+
if skip >= uint64(len(bz)) {
165+
return 0, nil
166+
}
167+
return w.Write(bz[skip:])
168+
}

v2/car_offset_writer_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package car
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"io"
7+
"math/rand"
8+
"testing"
9+
10+
"github.com/ipfs/go-blockservice"
11+
"github.com/ipfs/go-cidutil"
12+
"github.com/ipfs/go-datastore"
13+
dss "github.com/ipfs/go-datastore/sync"
14+
bstore "github.com/ipfs/go-ipfs-blockstore"
15+
chunk "github.com/ipfs/go-ipfs-chunker"
16+
format "github.com/ipfs/go-ipld-format"
17+
"github.com/ipfs/go-merkledag"
18+
"github.com/ipfs/go-unixfs/importer/balanced"
19+
"github.com/ipfs/go-unixfs/importer/helpers"
20+
"github.com/ipld/go-car/v2/internal/carv1"
21+
mh "github.com/multiformats/go-multihash"
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestCarOffsetWriter(t *testing.T) {
26+
ds := dss.MutexWrap(datastore.NewMapDatastore())
27+
bs := bstore.NewBlockstore(ds)
28+
bserv := blockservice.New(bs, nil)
29+
dserv := merkledag.NewDAGService(bserv)
30+
31+
rseed := 5
32+
size := 2 * 1024 * 1024
33+
source := io.LimitReader(rand.New(rand.NewSource(int64(rseed))), int64(size))
34+
nd, err := DAGImport(dserv, source)
35+
require.NoError(t, err)
36+
37+
// Write the CAR to a buffer from offset 0 so the buffer can be used for
38+
// comparison
39+
payloadCid := nd.Cid()
40+
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
41+
var fullBuff bytes.Buffer
42+
err = fullCarCow.Write(context.Background(), &fullBuff, 0)
43+
require.NoError(t, err)
44+
45+
fullCar := fullBuff.Bytes()
46+
header := carHeader(nd.Cid())
47+
headerSize, err := carv1.HeaderSize(&header)
48+
49+
testCases := []struct {
50+
name string
51+
offset uint64
52+
}{{
53+
name: "1 byte offset",
54+
offset: 1,
55+
}, {
56+
name: "offset < header size",
57+
offset: headerSize - 1,
58+
}, {
59+
name: "offset == header size",
60+
offset: headerSize,
61+
}, {
62+
name: "offset > header size",
63+
offset: headerSize + 1,
64+
}, {
65+
name: "offset > header + one block size",
66+
offset: headerSize + 1024*1024 + 512*1024,
67+
}}
68+
69+
runTestCases := func(name string, runTCWithCow func() *CarOffsetWriter) {
70+
for _, tc := range testCases {
71+
t.Run(name+" - "+tc.name, func(t *testing.T) {
72+
cow := runTCWithCow()
73+
var buff bytes.Buffer
74+
err = cow.Write(context.Background(), &buff, tc.offset)
75+
require.NoError(t, err)
76+
require.Equal(t, len(fullCar)-int(tc.offset), len(buff.Bytes()))
77+
require.Equal(t, fullCar[tc.offset:], buff.Bytes())
78+
})
79+
}
80+
}
81+
82+
// Run tests with a new CarOffsetWriter
83+
runTestCases("new car offset writer", func() *CarOffsetWriter {
84+
return NewCarOffsetWriter(payloadCid, bs)
85+
})
86+
87+
// Run tests with a CarOffsetWriter that has already been used to write
88+
// a CAR starting at offset 0
89+
runTestCases("fully written car offset writer", func() *CarOffsetWriter {
90+
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
91+
var buff bytes.Buffer
92+
err = fullCarCow.Write(context.Background(), &buff, 0)
93+
require.NoError(t, err)
94+
return fullCarCow
95+
})
96+
97+
// Run tests with a CarOffsetWriter that has already been used to write
98+
// a CAR starting at offset 1
99+
runTestCases("car offset writer written from offset 1", func() *CarOffsetWriter {
100+
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
101+
var buff bytes.Buffer
102+
err = fullCarCow.Write(context.Background(), &buff, 1)
103+
require.NoError(t, err)
104+
return fullCarCow
105+
})
106+
107+
// Run tests with a CarOffsetWriter that has already been used to write
108+
// a CAR starting part way through the second block
109+
runTestCases("car offset writer written from offset 1.5 blocks", func() *CarOffsetWriter {
110+
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
111+
var buff bytes.Buffer
112+
err = fullCarCow.Write(context.Background(), &buff, 1024*1024+512*1024)
113+
require.NoError(t, err)
114+
return fullCarCow
115+
})
116+
117+
// Run tests with a CarOffsetWriter that has already been used to write
118+
// a CAR repeatedly
119+
runTestCases("car offset writer written from offset repeatedly", func() *CarOffsetWriter {
120+
fullCarCow := NewCarOffsetWriter(payloadCid, bs)
121+
var buff bytes.Buffer
122+
err = fullCarCow.Write(context.Background(), &buff, 1024)
123+
require.NoError(t, err)
124+
fullCarCow = NewCarOffsetWriter(payloadCid, bs)
125+
var buff2 bytes.Buffer
126+
err = fullCarCow.Write(context.Background(), &buff2, 10)
127+
require.NoError(t, err)
128+
fullCarCow = NewCarOffsetWriter(payloadCid, bs)
129+
var buff3 bytes.Buffer
130+
err = fullCarCow.Write(context.Background(), &buff3, 1024*1024+512*1024)
131+
require.NoError(t, err)
132+
return fullCarCow
133+
})
134+
}
135+
136+
func TestSkipWriter(t *testing.T) {
137+
testCases := []struct {
138+
name string
139+
size int
140+
skip int
141+
expected int
142+
}{{
143+
name: "no skip",
144+
size: 1024,
145+
skip: 0,
146+
expected: 1024,
147+
}, {
148+
name: "skip 1",
149+
size: 1024,
150+
skip: 1,
151+
expected: 1023,
152+
}, {
153+
name: "skip all",
154+
size: 1024,
155+
skip: 1024,
156+
expected: 0,
157+
}, {
158+
name: "skip overflow",
159+
size: 1024,
160+
skip: 1025,
161+
expected: 0,
162+
}}
163+
164+
for _, tc := range testCases {
165+
t.Run(tc.name, func(t *testing.T) {
166+
var buff bytes.Buffer
167+
write := func(sw io.Writer) (int, error) {
168+
bz := make([]byte, tc.size)
169+
return sw.Write(bz)
170+
}
171+
count, err := skipWrite(&buff, uint64(tc.skip), write)
172+
require.NoError(t, err)
173+
require.Equal(t, tc.expected, count)
174+
require.Equal(t, tc.expected, len(buff.Bytes()))
175+
})
176+
}
177+
}
178+
179+
var DefaultHashFunction = uint64(mh.SHA2_256)
180+
181+
func DAGImport(dserv format.DAGService, fi io.Reader) (format.Node, error) {
182+
prefix, err := merkledag.PrefixForCidVersion(1)
183+
if err != nil {
184+
return nil, err
185+
}
186+
prefix.MhType = DefaultHashFunction
187+
188+
spl := chunk.NewSizeSplitter(fi, 1024*1024)
189+
dbp := helpers.DagBuilderParams{
190+
Maxlinks: 1024,
191+
RawLeaves: true,
192+
193+
CidBuilder: cidutil.InlineBuilder{
194+
Builder: prefix,
195+
Limit: 32,
196+
},
197+
198+
Dagserv: dserv,
199+
}
200+
201+
db, err := dbp.New(spl)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
return balanced.Layout(db)
207+
}

0 commit comments

Comments
 (0)