Skip to content

Commit d22821c

Browse files
authored
Merge pull request #48 from ipfs-shipyard/feat/pin-api-kubo
Pin API alignment: option and response records, streaming/progress options
2 parents c9b7aa4 + a5b3604 commit d22821c

13 files changed

+586
-67
lines changed

src/CoreApi/DagApi.cs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,23 +154,32 @@ public async Task<DagStatSummary> StatAsync(string cid, IProgress<DagStatSummary
154154

155155
public Task<Stream> ExportAsync(string path, CancellationToken cancellationToken = default)
156156
{
157-
return ipfs.DownloadAsync("dag/export", cancellationToken, path);
157+
// Kubo expects POST for dag/export
158+
return ipfs.PostDownloadAsync("dag/export", cancellationToken, path);
158159
}
159160

160161
public async Task<CarImportOutput> ImportAsync(Stream stream, bool? pinRoots = null, bool stats = false, CancellationToken cancellationToken = default)
161162
{
162-
string[] options = [
163-
$"pin-roots={pinRoots.ToString().ToLowerInvariant()}",
164-
$"stats={stats.ToString().ToLowerInvariant()}"
165-
];
163+
// Respect Kubo default (pin roots = true) by omitting the flag when null.
164+
var optionsList = new System.Collections.Generic.List<string>();
165+
if (pinRoots.HasValue)
166+
optionsList.Add($"pin-roots={pinRoots.Value.ToString().ToLowerInvariant()}");
167+
168+
optionsList.Add($"stats={stats.ToString().ToLowerInvariant()}");
169+
var options = optionsList.ToArray();
166170

167171
using var resultStream = await ipfs.Upload2Async("dag/import", cancellationToken, stream, null, options);
168172

169173
// Read line-by-line
170174
using var reader = new StreamReader(resultStream);
171175

172-
// First output is always of type CarImportOutput
176+
// First output line may be absent on older Kubo when pin-roots=false
173177
var json = await reader.ReadLineAsync();
178+
if (string.IsNullOrEmpty(json))
179+
{
180+
return new CarImportOutput();
181+
}
182+
174183
var res = JsonConvert.DeserializeObject<CarImportOutput>(json);
175184
if (res is null)
176185
throw new InvalidDataException($"The response did not deserialize to {nameof(CarImportOutput)}.");
@@ -179,11 +188,14 @@ public async Task<CarImportOutput> ImportAsync(Stream stream, bool? pinRoots = n
179188
if (stats)
180189
{
181190
json = await reader.ReadLineAsync();
182-
var importStats = JsonConvert.DeserializeObject<CarImportStats>(json);
183-
if (importStats is null)
184-
throw new InvalidDataException($"The response did not deserialize a {nameof(CarImportStats)}.");
191+
if (!string.IsNullOrEmpty(json))
192+
{
193+
var importStats = JsonConvert.DeserializeObject<CarImportStats>(json);
194+
if (importStats is null)
195+
throw new InvalidDataException($"The response did not deserialize a {nameof(CarImportStats)}.");
185196

186-
res.Stats = importStats;
197+
res.Stats = importStats;
198+
}
187199
}
188200

189201
return res;

src/CoreApi/FileSystemApi.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,10 @@ private string[] ToApiOptions(AddFileOptions? options)
270270
opts.Add($"nocopy={options.NoCopy.ToString().ToLowerInvariant()}");
271271

272272
if (options.Pin is not null)
273-
opts.Add("pin=false");
273+
opts.Add($"pin={options.Pin.ToString().ToLowerInvariant()}");
274+
275+
if (!string.IsNullOrEmpty(options.PinName))
276+
opts.Add($"pin-name={options.PinName}");
274277

275278
if (options.Wrap is not null)
276279
opts.Add($"wrap-with-directory={options.Wrap.ToString().ToLowerInvariant()}");
@@ -291,10 +294,10 @@ private string[] ToApiOptions(AddFileOptions? options)
291294
opts.Add("progress=true");
292295

293296
if (options.Hash is not null)
294-
opts.Add($"hash=${options.Hash}");
297+
opts.Add($"hash={options.Hash}");
295298

296299
if (options.FsCache is not null)
297-
opts.Add($"fscache={options.Wrap.ToString().ToLowerInvariant()}");
300+
opts.Add($"fscache={options.FsCache.ToString().ToLowerInvariant()}");
298301

299302
if (options.ToFiles is not null)
300303
opts.Add($"to-files={options.ToFiles}");

src/CoreApi/PinApi.cs

Lines changed: 160 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
using Google.Protobuf;
2-
using Ipfs.CoreApi;
3-
using Newtonsoft.Json.Linq;
1+
using Ipfs.CoreApi;
42
using System.Collections.Generic;
53
using System.Linq;
64
using System.Threading;
75
using System.Threading.Tasks;
6+
using System;
7+
using System.IO;
8+
using Newtonsoft.Json;
89

10+
#nullable enable
911
namespace Ipfs.Http
1012
{
1113
class PinApi : IPinApi
@@ -17,41 +19,174 @@ internal PinApi(IpfsClient ipfs)
1719
this.ipfs = ipfs;
1820
}
1921

20-
public async Task<IEnumerable<Cid>> AddAsync(string path, bool recursive = true, CancellationToken cancel = default(CancellationToken))
22+
public async Task<IEnumerable<Cid>> AddAsync(string path, PinAddOptions options, CancellationToken cancel = default)
2123
{
22-
var opts = "recursive=" + recursive.ToString().ToLowerInvariant();
23-
var json = await ipfs.DoCommandAsync("pin/add", cancel, path, opts);
24-
return ((JArray)JObject.Parse(json)["Pins"])
25-
.Select(p => (Cid)(string)p);
24+
options ??= new PinAddOptions();
25+
var optList = new List<string>
26+
{
27+
"recursive=" + options.Recursive.ToString().ToLowerInvariant()
28+
};
29+
if (!string.IsNullOrEmpty(options.Name))
30+
{
31+
optList.Add("name=" + options.Name);
32+
}
33+
var json = await ipfs.DoCommandAsync("pin/add", cancel, path, optList.ToArray());
34+
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(json);
35+
var pins = dto?.Pins ?? new List<string>();
36+
return pins.Select(p => (Cid)p);
2637
}
2738

28-
public async Task<IEnumerable<Cid>> ListAsync(CancellationToken cancel = default(CancellationToken))
39+
public async Task<IEnumerable<Cid>> AddAsync(string path, PinAddOptions options, IProgress<BlocksPinnedProgress> progress, CancellationToken cancel = default)
2940
{
30-
var json = await ipfs.DoCommandAsync("pin/ls", cancel);
31-
var keys = (JObject)(JObject.Parse(json)["Keys"]);
32-
return keys
33-
.Properties()
34-
.Select(p => (Cid)p.Name);
41+
options ??= new PinAddOptions();
42+
var optList = new List<string>
43+
{
44+
"recursive=" + options.Recursive.ToString().ToLowerInvariant(),
45+
"progress=true"
46+
};
47+
if (!string.IsNullOrEmpty(options.Name))
48+
{
49+
optList.Add("name=" + options.Name);
50+
}
51+
var pinned = new List<Cid>();
52+
var stream = await ipfs.PostDownloadAsync("pin/add", cancel, path, optList.ToArray());
53+
using var sr = new StreamReader(stream);
54+
while (!sr.EndOfStream && !cancel.IsCancellationRequested)
55+
{
56+
var line = await sr.ReadLineAsync();
57+
if (string.IsNullOrWhiteSpace(line))
58+
continue;
59+
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(line);
60+
if (dto is null)
61+
continue;
62+
if (dto.Progress.HasValue)
63+
{
64+
progress?.Report(new BlocksPinnedProgress { BlocksPinned = dto.Progress.Value });
65+
}
66+
if (dto.Pins != null)
67+
{
68+
foreach (var p in dto.Pins)
69+
{
70+
pinned.Add((Cid)p);
71+
}
72+
}
73+
}
74+
return pinned;
3575
}
3676

37-
public async Task<IEnumerable<Cid>> ListAsync(PinType type, CancellationToken cancel = default(CancellationToken))
77+
public async IAsyncEnumerable<PinListItem> ListAsync([System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
3878
{
39-
var typeOpt = type.ToString().ToLowerInvariant();
40-
var json = await ipfs.DoCommandAsync("pin/ls", cancel,
41-
null,
42-
$"type={typeOpt}");
43-
var keys = (JObject)(JObject.Parse(json)["Keys"]);
44-
return keys
45-
.Properties()
46-
.Select(p => (Cid)p.Name);
79+
// Default non-streaming, no names
80+
foreach (var item in await ListItemsOnceAsync(null, new List<string>(), cancel))
81+
{
82+
yield return item;
83+
}
84+
}
85+
86+
public async IAsyncEnumerable<PinListItem> ListAsync(PinType type, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
87+
{
88+
var opts = new List<string> { $"type={type.ToString().ToLowerInvariant()}" };
89+
foreach (var item in await ListItemsOnceAsync(null, opts, cancel))
90+
{
91+
yield return item;
92+
}
93+
}
94+
95+
public async IAsyncEnumerable<PinListItem> ListAsync(PinListOptions options, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel = default)
96+
{
97+
options ??= new PinListOptions();
98+
var opts = new List<string>();
99+
if (options.Type != PinType.All)
100+
opts.Add($"type={options.Type.ToString().ToLowerInvariant()}");
101+
if (!string.IsNullOrEmpty(options.Name))
102+
{
103+
opts.Add($"name={options.Name}");
104+
opts.Add("names=true");
105+
}
106+
else if (options.Names)
107+
{
108+
opts.Add("names=true");
109+
}
110+
111+
if (options.Stream)
112+
{
113+
await foreach (var item in ListItemsStreamAsync(null, opts, options.Names, cancel))
114+
{
115+
yield return item;
116+
}
117+
}
118+
else
119+
{
120+
foreach (var item in await ListItemsOnceAsync(null, opts, cancel))
121+
{
122+
yield return item;
123+
}
124+
}
47125
}
48126

49127
public async Task<IEnumerable<Cid>> RemoveAsync(Cid id, bool recursive = true, CancellationToken cancel = default(CancellationToken))
50128
{
51129
var opts = "recursive=" + recursive.ToString().ToLowerInvariant();
52130
var json = await ipfs.DoCommandAsync("pin/rm", cancel, id, opts);
53-
return ((JArray)JObject.Parse(json)["Pins"])
54-
.Select(p => (Cid)(string)p);
131+
var dto = JsonConvert.DeserializeObject<PinChangeResponseDto>(json);
132+
var pins = dto?.Pins ?? new List<string>();
133+
return pins.Select(p => (Cid)p);
134+
}
135+
136+
// Internal helper used by ListAsync overloads
137+
138+
async IAsyncEnumerable<PinListItem> ListItemsStreamAsync(string? path, List<string> opts, bool includeNames, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancel)
139+
{
140+
opts = new List<string>(opts) { "stream=true" };
141+
var stream = await ipfs.PostDownloadAsync("pin/ls", cancel, path, opts.ToArray());
142+
using var sr = new StreamReader(stream);
143+
while (!sr.EndOfStream && !cancel.IsCancellationRequested)
144+
{
145+
var line = await sr.ReadLineAsync();
146+
if (string.IsNullOrWhiteSpace(line))
147+
continue;
148+
var dto = JsonConvert.DeserializeObject<PinLsObjectDto>(line);
149+
if (dto is null || string.IsNullOrEmpty(dto.Cid))
150+
continue;
151+
yield return new PinListItem
152+
{
153+
Cid = (Cid)dto.Cid!,
154+
Type = ParseType(dto.Type),
155+
Name = dto.Name
156+
};
157+
}
158+
}
159+
160+
async Task<IEnumerable<PinListItem>> ListItemsOnceAsync(string? path, List<string> opts, CancellationToken cancel)
161+
{
162+
var json = await ipfs.DoCommandAsync("pin/ls", cancel, path, opts.ToArray());
163+
var root = JsonConvert.DeserializeObject<PinListResponseDto>(json);
164+
var list = new List<PinListItem>();
165+
if (root?.Keys != null)
166+
{
167+
foreach (var kv in root.Keys)
168+
{
169+
list.Add(new PinListItem
170+
{
171+
Cid = (Cid)kv.Key!,
172+
Type = ParseType(kv.Value?.Type),
173+
Name = string.IsNullOrEmpty(kv.Value?.Name) ? null : kv.Value!.Name
174+
});
175+
}
176+
}
177+
return list;
178+
}
179+
180+
static PinType ParseType(string? t)
181+
{
182+
return t?.ToLowerInvariant() switch
183+
{
184+
"direct" => PinType.Direct,
185+
"indirect" => PinType.Indirect,
186+
"recursive" => PinType.Recursive,
187+
"all" => PinType.All,
188+
_ => PinType.All
189+
};
55190
}
56191

57192
}

src/CoreApi/PinDto.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using System.Collections.Generic;
2+
3+
#nullable enable
4+
namespace Ipfs.Http
5+
{
6+
/// <summary>
7+
/// Non-streaming response DTO for /api/v0/pin/ls.
8+
/// </summary>
9+
internal record PinListResponseDto
10+
{
11+
public Dictionary<string, PinInfoDto>? Keys { get; init; }
12+
}
13+
14+
/// <summary>
15+
/// DTO for entry value in PinListResponseDto.Keys.
16+
/// </summary>
17+
internal record PinInfoDto
18+
{
19+
public string? Name { get; init; }
20+
public string? Type { get; init; }
21+
}
22+
23+
/// <summary>
24+
/// Streaming response DTO for /api/v0/pin/ls?stream=true.
25+
/// </summary>
26+
internal record PinLsObjectDto
27+
{
28+
public string? Cid { get; init; }
29+
public string? Name { get; init; }
30+
public string? Type { get; init; }
31+
}
32+
33+
/// <summary>
34+
/// Response DTO for /api/v0/pin/add and /api/v0/pin/rm which both return a Pins array.
35+
/// </summary>
36+
internal record PinChangeResponseDto
37+
{
38+
public int? Progress { get; init; }
39+
public List<string>? Pins { get; init; }
40+
}
41+
}

src/IpfsHttpClient.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ Added missing IFileSystemApi.ListAsync. Doesn't fully replace the removed IFileS
102102
</ItemGroup>
103103

104104
<ItemGroup>
105-
<PackageReference Include="IpfsShipyard.Ipfs.Core" Version="0.7.0" />
105+
<PackageReference Include="IpfsShipyard.Ipfs.Core" Version="0.8.0" />
106106
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
107107
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
108108
<PackageReference Include="Multiformats.Base" Version="2.0.2" />

test/AsyncEnumerableTestHelpers.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
5+
namespace Ipfs.Http
6+
{
7+
internal static class AsyncEnumerableTestHelpers
8+
{
9+
public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source)
10+
{
11+
return source.ToArrayAsync().GetAwaiter().GetResult();
12+
}
13+
14+
public static async Task<T[]> ToArrayAsync<T>(this IAsyncEnumerable<T> source)
15+
{
16+
var list = new List<T>();
17+
await foreach (var item in source)
18+
{
19+
list.Add(item);
20+
}
21+
return list.ToArray();
22+
}
23+
}
24+
}

0 commit comments

Comments
 (0)