
Commit 1ca3182

xet: Update chunk indexes in the chunkCache after deduplication (#1780)
Update the chunk indexes in the chunkCache after deduplication to match the new xorb chunks. The problem happens when a previous chunk in the xorb is removed during backtrack deduplication: the chunk indexes for the following chunks in the xorb were updated in the passed `chunkMetadata` array, but not in the chunkCache. So if those chunks were requested again (e.g. when uploading another file with shared content in the same `uploadShards` call), the chunk indexes provided would be incorrect.

cc @assafvayner @mishig25 for viz

Follow up to #1771 and #1779

---

Also improved the debug/replay script to support uploading multiple files, and added a test.
1 parent c242011 commit 1ca3182
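For context, a minimal sketch of the failure mode in plain TypeScript, using a `Map` in place of the real `ChunkCache` (the names and the negative-index convention for remote xorbs follow the new test; this is an annotation, not code from the commit):

type CacheEntry = { xorbIndex: number; chunkIndex: number };

// A local xorb holds [chunkA, chunkB]; the cache records where each chunk lives.
const cache = new Map<string, CacheEntry>([
	["chunkA", { xorbIndex: 0, chunkIndex: 0 }],
	["chunkB", { xorbIndex: 0, chunkIndex: 1 }],
]);

// Backtrack dedup finds chunkA in a remote xorb (negative index) and erases it
// from the local xorb, so chunkB shifts from index 1 to index 0.
cache.set("chunkA", { xorbIndex: -1, chunkIndex: 1 });

// Before this fix, only the chunkMetadata array was updated; the cache kept
// { xorbIndex: 0, chunkIndex: 1 } for chunkB, so a second file sharing chunkB in
// the same uploadShards call was given a chunk index that no longer exists.
// The fix mirrors the metadata update into the cache:
cache.set("chunkB", { xorbIndex: 0, chunkIndex: 0 });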

File tree

4 files changed: +191 -47 lines changed

packages/hub/scripts/debug-xet.ts

Lines changed: 82 additions & 43 deletions
@@ -10,17 +10,22 @@ import { existsSync } from "node:fs";
 
 /**
  * This script debugs xet uploads by capturing all network data locally
- * It takes a local file, repo, and token, then uploads while saving:
+ * It takes one or more local files, repo, and token, then uploads while saving:
  * - Dedup shards as dedup_[chunk_hash]_shard.bin
  * - Uploaded xorbs as uploaded_xorb_1.bin, uploaded_xorb_2.bin, etc.
  * - Uploaded shards as uploaded_shard_1.bin, uploaded_shard_2.bin, etc.
  *
- * Normal mode: Captures all upload data to upload_[filename]/ directory
+ * Normal mode: Captures all upload data to upload_[filename]/ directory (single file) or multiple-files/ directory (multiple files)
  * Replay mode: Validates upload data matches previously captured local files
  *
  * Usage:
+ * Single file:
  * pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo>
  * pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo> --replay
+ *
+ * Multiple files (comma-separated):
+ * pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo>
+ * pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo> --replay
 */
@@ -182,32 +187,34 @@ function createDebugFetch(args: { debugDir: string; replay?: boolean }): {
 	};
 }
 
-async function* createFileSource(filepath: string): AsyncGenerator<{
+async function* createMultiFileSource(filepaths: string[]): AsyncGenerator<{
 	content: Blob;
 	path: string;
 	sha256: string;
 }> {
-	const filename = basename(filepath);
-	console.log(`Processing ${filename}...`);
-
-	const blob: Blob = await FileBlob.create(filepath);
-
-	// Calculate sha256
-	console.log(`Calculating SHA256 for ${filename}...`);
-	const sha256Iterator = sha256(blob, { useWebWorker: false });
-	let res: IteratorResult<number, string>;
-	do {
-		res = await sha256Iterator.next();
-	} while (!res.done);
-	const sha256Hash = res.value;
-
-	console.log(`SHA256 for ${filename}: ${sha256Hash}`);
-
-	yield {
-		content: blob,
-		path: filename,
-		sha256: sha256Hash,
-	};
+	for (const filepath of filepaths) {
+		const filename = basename(filepath);
+		console.log(`Processing ${filename}...`);
+
+		const blob: Blob = await FileBlob.create(filepath);
+
+		// Calculate sha256
+		console.log(`Calculating SHA256 for ${filename}...`);
+		const sha256Iterator = sha256(blob, { useWebWorker: false });
+		let res: IteratorResult<number, string>;
+		do {
+			res = await sha256Iterator.next();
+		} while (!res.done);
+		const sha256Hash = res.value;
+
+		console.log(`SHA256 for ${filename}: ${sha256Hash}`);
+
+		yield {
+			content: blob,
+			path: filename,
+			sha256: sha256Hash,
+		};
+	}
 }
 
 async function main() {
@@ -233,20 +240,27 @@ async function main() {
 	});
 
 	if (!args.token || !args.repo || !args.file) {
-		console.error("Usage: pnpm --filter hub debug-xet -f <local_file> -t <write_token> -r <xet_repo>");
+		console.error("Usage: pnpm --filter hub debug-xet -f <file1,file2,file3> -t <write_token> -r <xet_repo>");
 		console.error("Example: pnpm --filter hub debug-xet -f ./model.bin -t hf_... -r myuser/myrepo");
+		console.error("Example: pnpm --filter hub debug-xet -f ./model1.bin,./model2.bin -t hf_... -r myuser/myrepo");
 		console.error("Options:");
 		console.error("  --replay    Use local dedup info instead of remote");
 		process.exit(1);
 	}
 
-	if (!existsSync(args.file)) {
-		console.error(`❌ File ${args.file} does not exist`);
-		process.exit(1);
+	// Parse comma-separated file paths
+	const filePaths = args.file.split(",").map((f) => f.trim());
+
+	// Validate all files exist
+	for (const filePath of filePaths) {
+		if (!existsSync(filePath)) {
+			console.error(`❌ File ${filePath} does not exist`);
+			process.exit(1);
+		}
 	}
 
-	const filename = basename(args.file);
-	const debugDir = `upload_${filename}`;
+	// Determine debug directory name
+	const debugDir = filePaths.length > 1 ? "multiple-files" : `upload_${basename(filePaths[0])}`;
 
 	// Handle debug directory based on mode
 	if (args.replay) {
@@ -288,20 +302,30 @@ async function main() {
 		rev: "main",
 	};
 
-	console.log(`\n=== Starting debug upload for ${filename} ===`);
+	console.log(
+		`\n=== Starting debug upload for ${filePaths.length > 1 ? `${filePaths.length} files` : basename(filePaths[0])} ===`
+	);
 	if (args.replay) {
 		console.log("🔄 Replay mode: Using local dedup info when available");
 	}
 
-	// Get file stats
-	const fileStats = await stat(args.file);
-	console.log(`📄 File size: ${(fileStats.size / 1024 / 1024).toFixed(2)} MB`);
+	// Get total file stats
+	let totalSize = 0;
+	for (const filePath of filePaths) {
+		const fileStats = await stat(filePath);
+		totalSize += fileStats.size;
+		console.log(`📄 ${basename(filePath)}: ${(fileStats.size / 1_000_000).toFixed(2)} MB`);
+	}
+	console.log(`📊 Total size: ${(totalSize / 1_000_000).toFixed(2)} MB`);
 
-	// Process file through uploadShards
-	const fileSource = createFileSource(args.file);
+	// Process files through uploadShards
+	const fileSource = createMultiFileSource(filePaths);
 
-	let dedupRatio = 0;
-	let fileSha256 = "";
+	const processedFiles: Array<{
+		path: string;
+		sha256: string;
+		dedupRatio: number;
+	}> = [];
 
 	for await (const event of uploadShards(fileSource, uploadParams)) {
 		switch (event.event) {
@@ -310,8 +334,11 @@ async function main() {
 				console.log(`  SHA256: ${event.sha256}`);
 				console.log(`  Dedup ratio: ${(event.dedupRatio * 100).toFixed(2)}%`);
 
-				dedupRatio = event.dedupRatio;
-				fileSha256 = event.sha256;
+				processedFiles.push({
+					path: event.path,
+					sha256: event.sha256,
+					dedupRatio: event.dedupRatio,
+				});
 				break;
 			}
 
@@ -327,9 +354,21 @@ async function main() {
 
 	console.log("\n=== DEBUG UPLOAD RESULTS ===");
 	console.log(`📁 Debug directory: ${debugDir}`);
-	console.log(`📄 Original file: ${filename} (${(fileStats.size / 1024 / 1024).toFixed(2)} MB)`);
-	console.log(`🔒 SHA256: ${fileSha256}`);
-	console.log(`📊 Deduplication: ${(dedupRatio * 100).toFixed(2)}%`);
+	console.log(`📄 Processed files: ${processedFiles.length}`);
+	console.log(`📊 Total size: ${(totalSize / 1024 / 1024).toFixed(2)} MB`);
+
+	// Show details for each file
+	for (const file of processedFiles) {
+		console.log(`\n🔒 ${file.path}:`);
+		console.log(`  SHA256: ${file.sha256}`);
+		console.log(`  Deduplication: ${(file.dedupRatio * 100).toFixed(2)}%`);
+	}
+
+	// Calculate average dedup ratio
+	const avgDedupRatio =
+		processedFiles.length > 0 ? processedFiles.reduce((sum, f) => sum + f.dedupRatio, 0) / processedFiles.length : 0;
+
+	console.log(`\n📊 Average deduplication: ${(avgDedupRatio * 100).toFixed(2)}%`);
 	console.log(`📤 Network calls:`);
 	console.log(`  - ${stats.xorbCount} xorb uploads`);
 	console.log(`  - ${stats.shardCount} shard uploads`);

packages/hub/src/utils/ChunkCache.ts

Lines changed: 8 additions & 0 deletions
@@ -85,6 +85,14 @@ export class ChunkCache {
 		};
 	}
 
+	updateChunkIndex(hash: string, chunkIndex: number): void {
+		const index = this.map.get(hash);
+		if (index === undefined) {
+			throw new Error(`Chunk not found in cache: ${hash}`);
+		}
+		this.chunkIndices[index] = chunkIndex;
+	}
+
 	removeChunkFromCache(hash: string): void {
 		this.map.delete(hash);
 	}
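A hedged usage sketch for the new `updateChunkIndex` method; the `addChunkToCache` signature is the one exercised by the new test file below, the rest is illustrative:

const cache = new ChunkCache();
// (hash, xorbIndex, chunkIndex, hmacKey) is the signature as used in the new test
cache.addChunkToCache("chunk2", 0, 1, "hmac-key");

// After an earlier chunk is erased from xorb 0, chunk2 shifts from index 1 to index 0:
cache.updateChunkIndex("chunk2", 0);

// Unknown hashes throw rather than silently extending the cache:
// cache.updateChunkIndex("missing-hash", 3); // Error: Chunk not found in cache: missing-hash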
Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
+import { describe, expect, it } from "vitest";
+import { backtrackDedup, CurrentXorbInfo } from "./createXorbs";
+import type { ShardData } from "./shardParser";
+import { ChunkCache } from "./ChunkCache";
+
+describe("createXorb", () => {
+	describe("backtrackDedup", () => {
+		it("should update cache info for chunks that go back due to previous chunks being erased", () => {
+			const xorb = new CurrentXorbInfo();
+
+			const chunkMetadata = [
+				{
+					xorbId: xorb.id,
+					chunkIndex: 0,
+					length: 101,
+				},
+				{
+					xorbId: xorb.id,
+					chunkIndex: 1,
+					length: 101,
+				},
+			];
+			xorb.chunks = [
+				{
+					hash: "chunk1",
+					length: 101,
+					offset: 0,
+				},
+				{
+					hash: "chunk2",
+					length: 101,
+					offset: 101,
+				},
+			];
+			const shardData: ShardData = {
+				hmacKey: "shard1",
+				xorbs: [
+					{
+						hash: "remoteXorb1",
+						chunks: [
+							{
+								hash: "chunk0:shard1",
+								startOffset: 0,
+								unpackedLength: 100,
+							},
+							{
+								hash: "chunk1:shard1",
+								startOffset: 100,
+								unpackedLength: 101,
+							},
+						],
+					},
+				],
+			};
+
+			const computeHmac = (hash: string, key: string) => {
+				return hash + ":" + key;
+			};
+
+			const chunkCache = new ChunkCache();
+			let chunkIndex = 0;
+			for (const chunk of xorb.chunks) {
+				chunkCache.addChunkToCache(chunk.hash, xorb.id, chunkIndex++, shardData.hmacKey);
+			}
+			let xorbIndex = 0;
+			for (const xorb of shardData.xorbs) {
+				xorbIndex--;
+				for (let i = 0; i < xorb.chunks.length; i++) {
+					chunkCache.addChunkToCache(xorb.chunks[i].hash, xorbIndex, i, shardData.hmacKey);
+				}
+			}
+			const dedup = backtrackDedup(xorb, computeHmac, shardData, chunkCache, chunkMetadata, 0);
+			expect(dedup).toBe(101);
+			expect(xorb.chunks).toEqual([{ hash: "chunk2", length: 101, offset: 0 }]);
+			expect(chunkMetadata).toEqual([
+				{
+					xorbId: -1,
+					chunkIndex: 1,
+					length: 101,
+				},
+				{
+					xorbId: 0,
+					chunkIndex: 0,
+					length: 101,
+				},
+			]);
+			// chunk1 should use remote hash now
+			expect(chunkCache.getChunk("chunk1", computeHmac)).toEqual({ xorbIndex: -1, chunkIndex: 1 });
+			// The xorb index for chunk2 should be 0 now that the previous chunk was erased from the xorb
+			expect(chunkCache.getChunk("chunk2", computeHmac)).toEqual({ xorbIndex: 0, chunkIndex: 0 });
+		});
+	});
+});

packages/hub/src/utils/createXorbs.ts

Lines changed: 8 additions & 4 deletions
@@ -33,7 +33,7 @@ interface XorbEvent {
 	}>;
 }
 
-class CurrentXorbInfo {
+export class CurrentXorbInfo {
 	id: number;
 	offset: number;
 	chunks: Array<{ hash: string; length: number; offset: number }>;
@@ -192,7 +192,6 @@ export async function* createXorbs(
 				}
 				let chunkIndex = xorb.chunks.length;
 				let chunkXorbId = xorbId;
-				fileChunks.push({ hash: chunk.hash, length: chunk.length });
 
 				// Remove chunks from source data
 				const chunkToCopy = removeChunkFromSourceData(sourceChunks, chunk.length);
@@ -361,14 +360,14 @@ export async function* createXorbs(
 	}
 }
 
-function backtrackDedup(
+export function backtrackDedup(
 	xorb: CurrentXorbInfo,
 	computeHmac: (hash: string, key: string) => string,
 	shardData: ShardData,
 	chunkCache: ChunkCache,
 	chunkMetadata: { xorbId: number | string; chunkIndex: number; length: number }[],
 	dedupedBytes: number
-) {
+): number {
 	const chunkIndexesToBacktrackFor = new Map<number, { xorbId: number; chunkIndex: number }>();
 	for (
 		let chunkToRecheckIndex = xorb.immutableData?.chunkIndex ?? 0;
@@ -453,10 +452,15 @@ function backtrackDedup(
 	}
 	xorb.chunks = newXorbChunks;
 	xorb.offset = currentOffset;
+	// Update chunkMetadata and chunkCache with new chunk indexes for the current xorb chunks
 	for (const chunk of chunkMetadata) {
 		if (chunk.xorbId === xorb.id) {
 			const newIndex = oldIndexToNewIndex.get(chunk.chunkIndex);
 			if (newIndex !== undefined) {
+				const cached = chunkCache.getChunk(xorb.chunks[newIndex].hash, null);
+				if (cached !== undefined && cached.xorbIndex === chunk.xorbId && cached.chunkIndex === chunk.chunkIndex) {
+					chunkCache.updateChunkIndex(xorb.chunks[newIndex].hash, newIndex);
+				}
 				chunk.chunkIndex = newIndex;
 			}
 		}
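The guard around the new `updateChunkIndex` call deserves a note: a cache entry for the same hash may already have been re-pointed at a remote xorb during dedup, and in that case it must be left alone. A self-contained sketch of that check with a hypothetical mini-cache (my annotation, not code from the commit):

type Entry = { xorbIndex: number; chunkIndex: number };
const cache = new Map<string, Entry>();
cache.set("chunk1", { xorbIndex: -1, chunkIndex: 1 }); // already deduplicated against a remote xorb
cache.set("chunk2", { xorbIndex: 0, chunkIndex: 1 }); // still points at its old local slot

// Only rewrite entries that still reference the old local position.
function maybeUpdate(hash: string, oldXorbIndex: number, oldChunkIndex: number, newChunkIndex: number): void {
	const cached = cache.get(hash);
	if (cached !== undefined && cached.xorbIndex === oldXorbIndex && cached.chunkIndex === oldChunkIndex) {
		cache.set(hash, { xorbIndex: oldXorbIndex, chunkIndex: newChunkIndex });
	}
}

maybeUpdate("chunk1", 0, 0, 0); // no-op: the entry now points at remote xorb -1
maybeUpdate("chunk2", 0, 1, 0); // updated: chunk2 moves to index 0

console.log(cache.get("chunk1")); // { xorbIndex: -1, chunkIndex: 1 }
console.log(cache.get("chunk2")); // { xorbIndex: 0, chunkIndex: 0 }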
