Skip to content

Commit 0039e60

Browse files
authored
Merge pull request #946 from Winter-Soren/feat/945-persistent-storage-for-peerstore
feat/945-persistent-storage-for-peerstore
2 parents 71d3d50 + 8e27505 commit 0039e60

36 files changed

+7278
-16
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Example usage of persistent peerstore with safe serialization.
4+
5+
This example demonstrates how to use the new persistent peerstore implementation
6+
with Protocol Buffer serialization instead of unsafe pickle.
7+
"""
8+
9+
import asyncio
10+
from pathlib import Path
11+
import tempfile
12+
13+
from multiaddr import Multiaddr
14+
15+
from libp2p.peer.id import ID
16+
from libp2p.peer.persistent import (
17+
create_async_peerstore,
18+
create_sync_peerstore,
19+
)
20+
21+
22+
async def async_example():
23+
"""Demonstrate async persistent peerstore usage."""
24+
print("=== Async Persistent Peerstore Example ===")
25+
26+
with tempfile.TemporaryDirectory() as temp_dir:
27+
db_path = Path(temp_dir) / "async_peers.db"
28+
29+
# Create peerstore with safe serialization and configurable sync
30+
async with create_async_peerstore(
31+
db_path=db_path,
32+
backend="sqlite",
33+
sync_interval=0.5, # Sync every 0.5 seconds
34+
auto_sync=True,
35+
) as peerstore:
36+
# Create a test peer
37+
peer_id = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
38+
39+
# Add some addresses
40+
addr1 = Multiaddr("/ip4/127.0.0.1/tcp/4001")
41+
addr2 = Multiaddr("/ip6/::1/tcp/4001")
42+
43+
await peerstore.add_addrs_async(peer_id, [addr1, addr2], 3600)
44+
45+
# Add protocols
46+
await peerstore.add_protocols_async(
47+
peer_id, ["/ipfs/ping/1.0.0", "/ipfs/id/1.0.0"]
48+
)
49+
50+
# Add metadata
51+
await peerstore.put_async(peer_id, "agent", "py-libp2p-example")
52+
53+
print(f"Added peer {peer_id}")
54+
print(f"Addresses: {await peerstore.addrs_async(peer_id)}")
55+
print(f"Protocols: {await peerstore.get_protocols_async(peer_id)}")
56+
print(f"Metadata: {await peerstore.get_async(peer_id, 'agent')}")
57+
58+
print("Peerstore closed safely using context manager")
59+
60+
61+
def sync_example():
62+
"""Demonstrate sync persistent peerstore usage."""
63+
print("\n=== Sync Persistent Peerstore Example ===")
64+
65+
with tempfile.TemporaryDirectory() as temp_dir:
66+
db_path = Path(temp_dir) / "sync_peers.db"
67+
68+
# Create peerstore with safe serialization and configurable sync
69+
with create_sync_peerstore(
70+
db_path=db_path,
71+
backend="sqlite",
72+
sync_interval=1.0, # Sync every 1 second
73+
auto_sync=False, # Manual sync control
74+
) as peerstore:
75+
# Create a test peer
76+
peer_id = ID.from_base58("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
77+
78+
# Add some addresses
79+
addr1 = Multiaddr("/ip4/192.168.1.100/tcp/4001")
80+
addr2 = Multiaddr("/ip4/10.0.0.1/tcp/4001")
81+
82+
peerstore.add_addrs(peer_id, [addr1, addr2], 7200)
83+
84+
# Add protocols
85+
peerstore.add_protocols(peer_id, ["/libp2p/circuit/relay/0.1.0"])
86+
87+
# Add metadata
88+
peerstore.put(peer_id, "version", "1.0.0")
89+
90+
# Manual sync since auto_sync=False
91+
peerstore.datastore.sync(b"")
92+
93+
print(f"Added peer {peer_id}")
94+
print(f"Addresses: {peerstore.addrs(peer_id)}")
95+
print(f"Protocols: {peerstore.get_protocols(peer_id)}")
96+
print(f"Metadata: {peerstore.get(peer_id, 'version')}")
97+
98+
print("Peerstore closed safely using context manager")
99+
100+
101+
def security_example():
102+
"""Demonstrate security improvements."""
103+
print("\n=== Security Improvements ===")
104+
print("✅ Replaced unsafe pickle with Protocol Buffers")
105+
print("✅ Added context manager support for proper resource cleanup")
106+
print("✅ Improved SQLite thread safety with WAL mode")
107+
print("✅ Added configurable sync for better performance")
108+
print("✅ Enhanced error handling with specific exceptions")
109+
print("✅ Added proper file permissions (0600) for database files")
110+
111+
112+
if __name__ == "__main__":
113+
# Run async example
114+
asyncio.run(async_example())
115+
116+
# Run sync example
117+
sync_example()
118+
119+
# Show security improvements
120+
security_example()
121+
122+
print("\n🎉 All examples completed successfully!")
123+
print("The persistent peerstore now uses safe Protocol Buffer serialization")
124+
print("and provides better resource management and performance control.")
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Example demonstrating the usage of PersistentPeerStore.
4+
5+
This example shows how to use the PersistentPeerStore with different datastore backends
6+
to maintain peer information across application restarts.
7+
"""
8+
9+
from pathlib import Path
10+
import tempfile
11+
12+
from multiaddr import Multiaddr
13+
import trio
14+
15+
from libp2p.peer.id import ID
16+
from libp2p.peer.persistent import (
17+
AsyncPersistentPeerStore,
18+
create_async_leveldb_peerstore,
19+
create_async_memory_peerstore,
20+
create_async_rocksdb_peerstore,
21+
create_async_sqlite_peerstore,
22+
)
23+
24+
25+
async def demonstrate_peerstore_operations(
26+
peerstore: AsyncPersistentPeerStore, name: str
27+
):
28+
"""Demonstrate basic peerstore operations."""
29+
print(f"\n=== {name} PeerStore Demo ===")
30+
31+
# Create some test peer IDs
32+
peer_id_1 = ID.from_base58("QmPeer1")
33+
peer_id_2 = ID.from_base58("QmPeer2")
34+
35+
# Add addresses for peers
36+
addr1 = Multiaddr("/ip4/127.0.0.1/tcp/4001")
37+
addr2 = Multiaddr("/ip4/192.168.1.1/tcp/4002")
38+
39+
print(f"Adding addresses for {peer_id_1}")
40+
await peerstore.add_addrs_async(peer_id_1, [addr1], 3600) # 1 hour TTL
41+
42+
print(f"Adding addresses for {peer_id_2}")
43+
await peerstore.add_addrs_async(peer_id_2, [addr2], 7200) # 2 hours TTL
44+
45+
# Add protocols
46+
print(f"Adding protocols for {peer_id_1}")
47+
await peerstore.add_protocols_async(
48+
peer_id_1, ["/ipfs/ping/1.0.0", "/ipfs/id/1.0.0"]
49+
)
50+
51+
print(f"Adding protocols for {peer_id_2}")
52+
await peerstore.add_protocols_async(
53+
peer_id_2, ["/ipfs/ping/1.0.0", "/ipfs/kad/1.0.0"]
54+
)
55+
56+
# Add metadata
57+
print(f"Adding metadata for {peer_id_1}")
58+
await peerstore.put_async(peer_id_1, "agent", "go-libp2p/0.1.0")
59+
await peerstore.put_async(peer_id_1, "version", "1.0.0")
60+
61+
print(f"Adding metadata for {peer_id_2}")
62+
await peerstore.put_async(peer_id_2, "agent", "js-libp2p/0.1.0")
63+
await peerstore.put_async(peer_id_2, "version", "2.0.0")
64+
65+
# Record latency metrics
66+
print(f"Recording latency for {peer_id_1}")
67+
await peerstore.record_latency_async(peer_id_1, 0.05) # 50ms
68+
69+
print(f"Recording latency for {peer_id_2}")
70+
await peerstore.record_latency_async(peer_id_2, 0.1) # 100ms
71+
72+
# Retrieve and display information
73+
print(f"\nRetrieved peer info for {peer_id_1}:")
74+
try:
75+
peer_info = await peerstore.peer_info_async(peer_id_1)
76+
print(f" Addresses: {[str(addr) for addr in peer_info.addrs]}")
77+
except Exception as e:
78+
print(f" Error: {e}")
79+
80+
print(f"\nRetrieved protocols for {peer_id_1}:")
81+
try:
82+
protocols = await peerstore.get_protocols_async(peer_id_1)
83+
print(f" Protocols: {protocols}")
84+
except Exception as e:
85+
print(f" Error: {e}")
86+
87+
print(f"\nRetrieved metadata for {peer_id_1}:")
88+
try:
89+
agent = await peerstore.get_async(peer_id_1, "agent")
90+
version = await peerstore.get_async(peer_id_1, "version")
91+
print(f" Agent: {agent}")
92+
print(f" Version: {version}")
93+
except Exception as e:
94+
print(f" Error: {e}")
95+
96+
print(f"\nRetrieved latency for {peer_id_1}:")
97+
try:
98+
latency = await peerstore.latency_EWMA_async(peer_id_1)
99+
print(f" Latency EWMA: {latency:.3f}s")
100+
except Exception as e:
101+
print(f" Error: {e}")
102+
103+
# List all peers
104+
peer_ids = await peerstore.peer_ids_async()
105+
valid_peer_ids = await peerstore.valid_peer_ids_async()
106+
peers_with_addrs = await peerstore.peers_with_addrs_async()
107+
print(f"\nAll peer IDs: {[str(pid) for pid in peer_ids]}")
108+
print(f"Valid peer IDs: {[str(pid) for pid in valid_peer_ids]}")
109+
print(f"Peers with addresses: {[str(pid) for pid in peers_with_addrs]}")
110+
111+
112+
async def demonstrate_persistence():
113+
"""Demonstrate persistence across restarts."""
114+
print("\n=== Persistence Demo ===")
115+
116+
# Create a temporary directory for SQLite database
117+
with tempfile.TemporaryDirectory() as temp_dir:
118+
db_path = Path(temp_dir) / "peerstore.db"
119+
120+
# First session - add some data
121+
print("First session: Adding peer data...")
122+
peerstore1 = create_async_sqlite_peerstore(str(db_path))
123+
124+
peer_id = ID.from_base58("QmPersistentPeer")
125+
addr = Multiaddr("/ip4/10.0.0.1/tcp/4001")
126+
127+
await peerstore1.add_addrs_async(peer_id, [addr], 3600)
128+
await peerstore1.add_protocols_async(peer_id, ["/ipfs/ping/1.0.0"])
129+
await peerstore1.put_async(peer_id, "session", "first")
130+
131+
print(f"Added peer {peer_id} with address {addr}")
132+
protocols = await peerstore1.get_protocols_async(peer_id)
133+
metadata = await peerstore1.get_async(peer_id, "session")
134+
print(f"Peer protocols: {protocols}")
135+
print(f"Peer metadata: {metadata}")
136+
137+
# Close the first peerstore
138+
await peerstore1.close_async()
139+
140+
# Second session - data should persist
141+
print("\nSecond session: Reopening peerstore...")
142+
peerstore2 = create_async_sqlite_peerstore(str(db_path))
143+
144+
# Check if data persisted
145+
try:
146+
peer_info = await peerstore2.peer_info_async(peer_id)
147+
protocols = await peerstore2.get_protocols_async(peer_id)
148+
metadata = await peerstore2.get_async(peer_id, "session")
149+
print(f"Retrieved peer info: {[str(addr) for addr in peer_info.addrs]}")
150+
print(f"Retrieved protocols: {protocols}")
151+
print(f"Retrieved metadata: {metadata}")
152+
print("✅ Data persisted successfully!")
153+
except Exception as e:
154+
print(f"❌ Data did not persist: {e}")
155+
156+
# Update data in second session
157+
await peerstore2.put_async(peer_id, "session", "second")
158+
updated_metadata = await peerstore2.get_async(peer_id, "session")
159+
print(f"Updated metadata: {updated_metadata}")
160+
161+
await peerstore2.close_async()
162+
163+
164+
async def demonstrate_different_backends():
165+
"""Demonstrate different datastore backends."""
166+
print("\n=== Different Backend Demo ===")
167+
168+
# Memory backend (not persistent)
169+
print("\n1. Memory Backend (not persistent):")
170+
memory_peerstore = create_async_memory_peerstore()
171+
await demonstrate_peerstore_operations(memory_peerstore, "Memory")
172+
await memory_peerstore.close_async()
173+
174+
# SQLite backend
175+
print("\n2. SQLite Backend:")
176+
with tempfile.TemporaryDirectory() as temp_dir:
177+
sqlite_peerstore = create_async_sqlite_peerstore(Path(temp_dir) / "sqlite.db")
178+
await demonstrate_peerstore_operations(sqlite_peerstore, "SQLite")
179+
await sqlite_peerstore.close_async()
180+
181+
# LevelDB backend (if available)
182+
print("\n3. LevelDB Backend:")
183+
try:
184+
with tempfile.TemporaryDirectory() as temp_dir:
185+
leveldb_peerstore = create_async_leveldb_peerstore(
186+
Path(temp_dir) / "leveldb"
187+
)
188+
await demonstrate_peerstore_operations(leveldb_peerstore, "LevelDB")
189+
await leveldb_peerstore.close_async()
190+
except ImportError:
191+
print("LevelDB backend not available (plyvel not installed)")
192+
193+
# RocksDB backend (if available)
194+
print("\n4. RocksDB Backend:")
195+
try:
196+
with tempfile.TemporaryDirectory() as temp_dir:
197+
rocksdb_peerstore = create_async_rocksdb_peerstore(
198+
Path(temp_dir) / "rocksdb"
199+
)
200+
await demonstrate_peerstore_operations(rocksdb_peerstore, "RocksDB")
201+
await rocksdb_peerstore.close_async()
202+
except ImportError:
203+
print("RocksDB backend not available (pyrocksdb not installed)")
204+
205+
206+
async def demonstrate_async_operations():
207+
"""Demonstrate async operations and cleanup."""
208+
print("\n=== Async Operations Demo ===")
209+
210+
with tempfile.TemporaryDirectory() as temp_dir:
211+
peerstore = create_async_sqlite_peerstore(Path(temp_dir) / "async.db")
212+
213+
# Start cleanup task
214+
print("Starting cleanup task...")
215+
async with trio.open_nursery() as nursery:
216+
nursery.start_soon(peerstore.start_cleanup_task, 1) # 1 second interval
217+
218+
# Add some peers
219+
peer_id = ID.from_base58("QmAsyncPeer")
220+
addr = Multiaddr("/ip4/127.0.0.1/tcp/4001")
221+
await peerstore.add_addrs_async(peer_id, [addr], 1) # 1 second TTL
222+
223+
print("Added peer with 1-second TTL")
224+
addrs = await peerstore.addrs_async(peer_id)
225+
print(f"Peer addresses: {[str(addr) for addr in addrs]}")
226+
227+
# Wait for expiration
228+
print("Waiting for peer to expire...")
229+
await trio.sleep(2)
230+
231+
# Check if peer expired
232+
try:
233+
addrs = await peerstore.addrs_async(peer_id)
234+
print(f"Peer still has addresses: {[str(addr) for addr in addrs]}")
235+
except Exception as e:
236+
print(f"Peer expired: {e}")
237+
238+
# Stop the cleanup task
239+
nursery.cancel_scope.cancel()
240+
241+
await peerstore.close_async()
242+
243+
244+
async def main():
245+
"""Main demonstration function."""
246+
print("PersistentPeerStore Usage Examples")
247+
print("=" * 50)
248+
249+
# Demonstrate basic operations
250+
basic_peerstore = create_async_memory_peerstore()
251+
await demonstrate_peerstore_operations(basic_peerstore, "Basic")
252+
await basic_peerstore.close_async()
253+
254+
# Demonstrate persistence
255+
await demonstrate_persistence()
256+
257+
# Demonstrate different backends
258+
await demonstrate_different_backends()
259+
260+
# Demonstrate async operations
261+
await demonstrate_async_operations()
262+
263+
print("\n" + "=" * 50)
264+
print("All examples completed!")
265+
266+
267+
if __name__ == "__main__":
268+
trio.run(main)

0 commit comments

Comments
 (0)