Skip to content

Commit ca9b7d4

Browse files
EvanHahngmaclennan
andauthored
feat: only send/receive core keys with permission (#783)
Previously, we would add cores without checking anything. Now, we only add them once (1) their role is OK (2) they have a good core ownership record. Partly addresses [#268]. [#268]: #268 Co-Authored-By: Gregor MacLennan <[email protected]>
1 parent 4182e9f commit ca9b7d4

File tree

14 files changed

+212
-611
lines changed

14 files changed

+212
-611
lines changed

proto/extensions.proto

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
syntax = "proto3";
22

33
message ProjectExtension {
4-
repeated bytes wantCoreKeys = 1;
5-
6-
repeated bytes authCoreKeys = 2;
7-
repeated bytes configCoreKeys = 3;
8-
repeated bytes dataCoreKeys = 4;
9-
repeated bytes blobIndexCoreKeys = 5;
10-
repeated bytes blobCoreKeys = 6;
4+
repeated bytes authCoreKeys = 1;
115
}
126

137
message HaveExtension {

src/core-manager/index.js

Lines changed: 26 additions & 173 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ import Corestore from 'corestore'
33
import { debounce } from 'throttle-debounce'
44
import assert from 'node:assert/strict'
55
import { sql, eq } from 'drizzle-orm'
6-
import { discoveryKey } from 'hypercore-crypto'
7-
import Hypercore from 'hypercore'
86

97
import { HaveExtension, ProjectExtension } from '../generated/extensions.js'
108
import { Logger } from '../logger.js'
119
import { NAMESPACES } from '../constants.js'
12-
import { keyToId, noop } from '../utils.js'
10+
import { noop } from '../utils.js'
1311
import { coresTable } from '../schema/project.js'
1412
import * as rle from './bitfield-rle.js'
1513
import { CoreIndex } from './core-index.js'
1614

15+
/** @import Hypercore from 'hypercore' */
1716
/** @import { HypercorePeer, Namespace } from '../types.js' */
1817

1918
const WRITER_CORE_PREHAVES_DEBOUNCE_DELAY = 1000
@@ -46,12 +45,6 @@ export class CoreManager extends TypedEmitter {
4645
#haveExtension
4746
#deviceId
4847
#l
49-
/**
50-
* We use this to reduce network traffic caused by requesting the same key
51-
* from multiple clients.
52-
* TODO: Remove items from this set after a max age
53-
*/
54-
#keyRequests = new TrackedKeyRequests()
5548
#autoDownload
5649

5750
static get namespaces() {
@@ -152,8 +145,8 @@ export class CoreManager extends TypedEmitter {
152145
'mapeo/project',
153146
{
154147
encoding: ProjectExtensionCodec,
155-
onmessage: (msg, peer) => {
156-
this.#handleProjectMessage(msg, peer)
148+
onmessage: (msg) => {
149+
this.#handleProjectMessage(msg)
157150
},
158151
}
159152
)
@@ -169,12 +162,7 @@ export class CoreManager extends TypedEmitter {
169162
this.#sendHaves(peer, this.#coreIndex).catch(() => {
170163
this.#l.log('Failed to send pre-haves to newly-connected peer')
171164
})
172-
})
173-
this.#creatorCore.on('peer-remove', (peer) => {
174-
// When a peer is removed we clean up any unanswered key requests, so that
175-
// we will request from a different peer, and to avoid the tracking of key
176-
// requests growing without bounds.
177-
this.#keyRequests.deleteByPeerKey(peer.remotePublicKey)
165+
this.#sendAuthCoreKeys(peer)
178166
})
179167

180168
this.#ready = Promise.all(
@@ -251,7 +239,6 @@ export class CoreManager extends TypedEmitter {
251239
*/
252240
async close() {
253241
this.#state = 'closing'
254-
this.#keyRequests.clear()
255242
const promises = []
256243
for (const { core } of this.#coreIndex) {
257244
promises.push(core.close())
@@ -268,6 +255,7 @@ export class CoreManager extends TypedEmitter {
268255
* @returns {CoreRecord}
269256
*/
270257
addCore(key, namespace) {
258+
this.#l.log('Adding remote core %k to %s', key, namespace)
271259
return this.#addCore({ publicKey: key }, namespace, true)
272260
}
273261

@@ -359,69 +347,31 @@ export class CoreManager extends TypedEmitter {
359347
}
360348

361349
/**
362-
* Send an extension message over the project creator core replication stream
363-
* requesting a core key for the given discovery key.
350+
* We only add auth cores from the project extension messages. Cores in other
351+
* namespaces are added by the sync API from the core ownership docs
364352
*
365-
* @param {Buffer} peerKey
366-
* @param {Buffer} discoveryKey
353+
* @param {ProjectExtension} msg
367354
*/
368-
requestCoreKey(peerKey, discoveryKey) {
369-
// No-op if we already have this core
370-
if (this.getCoreByDiscoveryKey(discoveryKey)) return
371-
const peer = this.#creatorCore.peers.find((peer) => {
372-
return peer.remotePublicKey.equals(peerKey)
373-
})
374-
if (!peer) {
375-
// This should not happen because this is only called from SyncApi, which
376-
// checks the peer exists before calling this method.
377-
this.#l.log(
378-
'Attempted to request core key for %h, but no connected peer %h',
379-
discoveryKey,
380-
peerKey
381-
)
382-
return
355+
#handleProjectMessage({ authCoreKeys }) {
356+
for (const authCoreKey of authCoreKeys) {
357+
// Use public method - these must be persisted (private method defaults to persisted=false)
358+
this.addCore(authCoreKey, 'auth')
383359
}
384-
// Only request a key once, e.g. from the peer we first receive it from (we
385-
// can assume that a peer must have the key if we see the discovery key in
386-
// the protomux). This is necessary to reduce network traffic for many newly
387-
// connected peers - otherwise duplicate requests will be sent to every peer
388-
if (this.#keyRequests.has(discoveryKey)) return
389-
this.#keyRequests.set(discoveryKey, peerKey)
390-
391-
this.#l.log(
392-
'Requesting core key for discovery key %h from peer %h',
393-
discoveryKey,
394-
peerKey
395-
)
396-
const message = ProjectExtension.fromPartial({
397-
wantCoreKeys: [discoveryKey],
398-
})
399-
this.#projectExtension.send(message, peer)
400360
}
401361

402362
/**
403-
* @param {ProjectExtension} msg
363+
* Sends auth core keys to the given peer, skipping any keys that we know the
364+
* peer has already (depends on the peer having already replicated the auth
365+
* cores it has)
366+
*
404367
* @param {HypercorePeer} peer
405368
*/
406-
#handleProjectMessage({ wantCoreKeys, ...coreKeys }, peer) {
369+
#sendAuthCoreKeys(peer) {
407370
const message = ProjectExtension.create()
408-
let hasKeys = false
409-
for (const discoveryKey of wantCoreKeys) {
410-
const coreRecord = this.getCoreByDiscoveryKey(discoveryKey)
411-
if (!coreRecord) continue
412-
message[`${coreRecord.namespace}CoreKeys`].push(coreRecord.key)
413-
hasKeys = true
414-
}
415-
if (hasKeys) {
416-
this.#projectExtension.send(message, peer)
417-
}
418-
for (const namespace of NAMESPACES) {
419-
for (const coreKey of coreKeys[`${namespace}CoreKeys`]) {
420-
// Use public method - these must be persisted (private method defaults to persisted=false)
421-
this.addCore(coreKey, namespace)
422-
this.#keyRequests.deleteByDiscoveryKey(discoveryKey(coreKey))
423-
}
371+
for (const { key } of this.getCores('auth')) {
372+
message.authCoreKeys.push(key)
424373
}
374+
this.#projectExtension.send(message, peer)
425375
}
426376

427377
/**
@@ -478,21 +428,14 @@ export class CoreManager extends TypedEmitter {
478428
* ONLY FOR TESTING
479429
* Replicate all cores in core manager
480430
*
431+
* NB: Remote peers need to be manually added when unit testing core manager
432+
* without the Sync API
433+
*
481434
* @param {Parameters<Corestore['replicate']>[0]} stream
482435
*/
483436
[kCoreManagerReplicate](stream) {
484-
const protocolStream = Hypercore.createProtocolStream(stream, {
485-
ondiscoverykey: async (discoveryKey) => {
486-
const peer = await findPeer(
487-
this.creatorCore,
488-
// @ts-ignore
489-
protocolStream.noiseStream.remotePublicKey
490-
)
491-
if (!peer) return
492-
this.requestCoreKey(peer.remotePublicKey, discoveryKey)
493-
},
494-
})
495-
return this.#corestore.replicate(stream)
437+
const protocolStream = this.#corestore.replicate(stream)
438+
return protocolStream
496439
}
497440

498441
/**
@@ -559,93 +502,3 @@ const HaveExtensionCodec = {
559502
}
560503
},
561504
}
562-
563-
class TrackedKeyRequests {
564-
/** @type {Map<string, string>} */
565-
#byDiscoveryId = new Map()
566-
/** @type {Map<string, Set<string>>} */
567-
#byPeerId = new Map()
568-
569-
/**
570-
* @param {Buffer} discoveryKey
571-
* @param {Buffer} peerKey
572-
*/
573-
set(discoveryKey, peerKey) {
574-
const discoveryId = keyToId(discoveryKey)
575-
const peerId = keyToId(peerKey)
576-
const existingForPeer = this.#byPeerId.get(peerId) || new Set()
577-
this.#byDiscoveryId.set(discoveryId, peerId)
578-
existingForPeer.add(discoveryId)
579-
this.#byPeerId.set(peerId, existingForPeer)
580-
return this
581-
}
582-
/**
583-
* @param {Buffer} discoveryKey
584-
*/
585-
has(discoveryKey) {
586-
const discoveryId = keyToId(discoveryKey)
587-
return this.#byDiscoveryId.has(discoveryId)
588-
}
589-
/**
590-
* @param {Buffer} discoveryKey
591-
*/
592-
deleteByDiscoveryKey(discoveryKey) {
593-
const discoveryId = keyToId(discoveryKey)
594-
const peerId = this.#byDiscoveryId.get(discoveryId)
595-
if (!peerId) return false
596-
this.#byDiscoveryId.delete(discoveryId)
597-
const existingForPeer = this.#byPeerId.get(peerId)
598-
if (existingForPeer) {
599-
existingForPeer.delete(discoveryId)
600-
}
601-
return true
602-
}
603-
/**
604-
* @param {Buffer} peerKey
605-
*/
606-
deleteByPeerKey(peerKey) {
607-
const peerId = keyToId(peerKey)
608-
const existingForPeer = this.#byPeerId.get(peerId)
609-
if (!existingForPeer) return
610-
for (const discoveryId of existingForPeer) {
611-
this.#byDiscoveryId.delete(discoveryId)
612-
}
613-
this.#byPeerId.delete(peerId)
614-
}
615-
clear() {
616-
this.#byDiscoveryId.clear()
617-
this.#byPeerId.clear()
618-
}
619-
}
620-
621-
/**
622-
* @param {Hypercore<"binary", Buffer>} core
623-
* @param {Buffer} publicKey
624-
* @param {{ timeout?: number }} [opts]
625-
*/
626-
function findPeer(core, publicKey, { timeout = 200 } = {}) {
627-
const peer = core.peers.find((peer) => {
628-
return peer.remotePublicKey.equals(publicKey)
629-
})
630-
if (peer) return peer
631-
// This is called from the from the handleDiscoveryId event, which can
632-
// happen before the peer connection is fully established, so we wait for
633-
// the `peer-add` event, with a timeout in case the peer never gets added
634-
return new Promise(function (res) {
635-
const timeoutId = setTimeout(function () {
636-
core.off('peer-add', onPeer)
637-
res(null)
638-
}, timeout)
639-
640-
core.on('peer-add', onPeer)
641-
642-
/** @param {HypercorePeer} peer */
643-
function onPeer(peer) {
644-
if (peer.remotePublicKey.equals(publicKey)) {
645-
clearTimeout(timeoutId)
646-
core.off('peer-add', onPeer)
647-
res(peer)
648-
}
649-
}
650-
})
651-
}

src/core-ownership.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ export class CoreOwnership extends TypedEmitter {
116116
return this.#dataType.getByDocId(deviceId)
117117
}
118118

119+
async getAll() {
120+
await this.#ownershipWriteDone
121+
return this.#dataType.getMany()
122+
}
123+
119124
/**
120125
*
121126
* @param {KeyPair} identityKeypair

src/generated/extensions.d.ts

Lines changed: 15 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
1-
/// <reference types="node" />
21
import _m0 from "protobufjs/minimal.js";
32
export interface ProjectExtension {
4-
wantCoreKeys: Buffer[];
53
authCoreKeys: Buffer[];
6-
configCoreKeys: Buffer[];
7-
dataCoreKeys: Buffer[];
8-
blobIndexCoreKeys: Buffer[];
9-
blobCoreKeys: Buffer[];
104
}
115
export interface HaveExtension {
126
discoveryKey: Buffer;
@@ -28,60 +22,23 @@ export declare function haveExtension_NamespaceToNumber(object: HaveExtension_Na
2822
export declare const ProjectExtension: {
2923
encode(message: ProjectExtension, writer?: _m0.Writer): _m0.Writer;
3024
decode(input: _m0.Reader | Uint8Array, length?: number): ProjectExtension;
31-
create<I extends {
32-
wantCoreKeys?: Buffer[];
33-
authCoreKeys?: Buffer[];
34-
configCoreKeys?: Buffer[];
35-
dataCoreKeys?: Buffer[];
36-
blobIndexCoreKeys?: Buffer[];
37-
blobCoreKeys?: Buffer[];
38-
} & {
39-
wantCoreKeys?: Buffer[] & Buffer[] & { [K in Exclude<keyof I["wantCoreKeys"], keyof Buffer[]>]: never; };
40-
authCoreKeys?: Buffer[] & Buffer[] & { [K_1 in Exclude<keyof I["authCoreKeys"], keyof Buffer[]>]: never; };
41-
configCoreKeys?: Buffer[] & Buffer[] & { [K_2 in Exclude<keyof I["configCoreKeys"], keyof Buffer[]>]: never; };
42-
dataCoreKeys?: Buffer[] & Buffer[] & { [K_3 in Exclude<keyof I["dataCoreKeys"], keyof Buffer[]>]: never; };
43-
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_4 in Exclude<keyof I["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
44-
blobCoreKeys?: Buffer[] & Buffer[] & { [K_5 in Exclude<keyof I["blobCoreKeys"], keyof Buffer[]>]: never; };
45-
} & { [K_6 in Exclude<keyof I, keyof ProjectExtension>]: never; }>(base?: I): ProjectExtension;
46-
fromPartial<I_1 extends {
47-
wantCoreKeys?: Buffer[];
48-
authCoreKeys?: Buffer[];
49-
configCoreKeys?: Buffer[];
50-
dataCoreKeys?: Buffer[];
51-
blobIndexCoreKeys?: Buffer[];
52-
blobCoreKeys?: Buffer[];
53-
} & {
54-
wantCoreKeys?: Buffer[] & Buffer[] & { [K_7 in Exclude<keyof I_1["wantCoreKeys"], keyof Buffer[]>]: never; };
55-
authCoreKeys?: Buffer[] & Buffer[] & { [K_8 in Exclude<keyof I_1["authCoreKeys"], keyof Buffer[]>]: never; };
56-
configCoreKeys?: Buffer[] & Buffer[] & { [K_9 in Exclude<keyof I_1["configCoreKeys"], keyof Buffer[]>]: never; };
57-
dataCoreKeys?: Buffer[] & Buffer[] & { [K_10 in Exclude<keyof I_1["dataCoreKeys"], keyof Buffer[]>]: never; };
58-
blobIndexCoreKeys?: Buffer[] & Buffer[] & { [K_11 in Exclude<keyof I_1["blobIndexCoreKeys"], keyof Buffer[]>]: never; };
59-
blobCoreKeys?: Buffer[] & Buffer[] & { [K_12 in Exclude<keyof I_1["blobCoreKeys"], keyof Buffer[]>]: never; };
60-
} & { [K_13 in Exclude<keyof I_1, keyof ProjectExtension>]: never; }>(object: I_1): ProjectExtension;
25+
create<I extends Exact<DeepPartial<ProjectExtension>, I>>(base?: I): ProjectExtension;
26+
fromPartial<I extends Exact<DeepPartial<ProjectExtension>, I>>(object: I): ProjectExtension;
6127
};
6228
export declare const HaveExtension: {
6329
encode(message: HaveExtension, writer?: _m0.Writer): _m0.Writer;
6430
decode(input: _m0.Reader | Uint8Array, length?: number): HaveExtension;
65-
create<I extends {
66-
discoveryKey?: Buffer;
67-
start?: number;
68-
encodedBitfield?: Buffer;
69-
namespace?: HaveExtension_Namespace;
70-
} & {
71-
discoveryKey?: Buffer;
72-
start?: number;
73-
encodedBitfield?: Buffer;
74-
namespace?: HaveExtension_Namespace;
75-
} & { [K in Exclude<keyof I, keyof HaveExtension>]: never; }>(base?: I): HaveExtension;
76-
fromPartial<I_1 extends {
77-
discoveryKey?: Buffer;
78-
start?: number;
79-
encodedBitfield?: Buffer;
80-
namespace?: HaveExtension_Namespace;
81-
} & {
82-
discoveryKey?: Buffer;
83-
start?: number;
84-
encodedBitfield?: Buffer;
85-
namespace?: HaveExtension_Namespace;
86-
} & { [K_1 in Exclude<keyof I_1, keyof HaveExtension>]: never; }>(object: I_1): HaveExtension;
31+
create<I extends Exact<DeepPartial<HaveExtension>, I>>(base?: I): HaveExtension;
32+
fromPartial<I extends Exact<DeepPartial<HaveExtension>, I>>(object: I): HaveExtension;
8733
};
34+
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
35+
type DeepPartial<T> = T extends Builtin ? T : T extends Array<infer U> ? Array<DeepPartial<U>> : T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>> : T extends {} ? {
36+
[K in keyof T]?: DeepPartial<T[K]>;
37+
} : Partial<T>;
38+
type KeysOfUnion<T> = T extends T ? keyof T : never;
39+
type Exact<P, I extends P> = P extends Builtin ? P : P & {
40+
[K in keyof P]: Exact<P[K], I[K]>;
41+
} & {
42+
[K in Exclude<keyof I, KeysOfUnion<P>>]: never;
43+
};
44+
export {};

0 commit comments

Comments
 (0)