Skip to content

Commit 111851e

Browse files
committed
all: Use async wasmtime
In particular, that makes the host function stubs in runtime/wasm/src/module/context.rs async which in turn will make it possible to use async implementations in lower levels of the stack, for example, in the store.
1 parent bb7d281 commit 111851e

File tree

24 files changed

+1468
-1048
lines changed

24 files changed

+1468
-1048
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ tonic = { version = "0.12.3", features = ["tls-roots", "gzip"] }
9595
tonic-build = { version = "0.12.3", features = ["prost"] }
9696
tower-http = { version = "0.6.6", features = ["cors"] }
9797
wasmparser = "0.118.1"
98-
wasmtime = { version = "33.0.2" }
98+
wasmtime = { version = "33.0.2", featuers = ["async"] }
9999
substreams = "=0.6.0"
100100
substreams-entity-change = "2"
101101
substreams-near-core = "=0.10.2"

chain/ethereum/src/runtime/abi.rs

Lines changed: 207 additions & 204 deletions
Large diffs are not rendered by default.

chain/ethereum/src/runtime/runtime_adapter.rs

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use graph::data::store::scalar::BigInt;
1414
use graph::data::subgraph::{API_VERSION_0_0_4, API_VERSION_0_0_9};
1515
use graph::data_source;
1616
use graph::data_source::common::{ContractCall, MappingABI};
17+
use graph::futures03::FutureExt as _;
1718
use graph::prelude::web3::types::H160;
1819
use graph::runtime::gas::Gas;
1920
use graph::runtime::{AscIndexId, IndexForAscTypeId};
@@ -95,20 +96,27 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
9596
let call_cache = call_cache.clone();
9697
let abis = abis.clone();
9798
move |ctx, wasm_ptr| {
98-
let eth_adapter =
99-
eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
100-
archive,
101-
traces: false,
102-
}))?;
103-
ethereum_call(
104-
&eth_adapter,
105-
call_cache.clone(),
106-
ctx,
107-
wasm_ptr,
108-
&abis,
109-
eth_call_gas,
110-
)
111-
.map(|ptr| ptr.wasm_ptr())
99+
let eth_adapters = eth_adapters.cheap_clone();
100+
let call_cache = call_cache.cheap_clone();
101+
let abis = abis.cheap_clone();
102+
async move {
103+
let eth_adapter =
104+
eth_adapters.call_or_cheapest(Some(&NodeCapabilities {
105+
archive,
106+
traces: false,
107+
}))?;
108+
ethereum_call(
109+
&eth_adapter,
110+
call_cache.clone(),
111+
ctx,
112+
wasm_ptr,
113+
&abis,
114+
eth_call_gas,
115+
)
116+
.await
117+
.map(|ptr| ptr.wasm_ptr())
118+
}
119+
.boxed()
112120
}
113121
}),
114122
},
@@ -117,26 +125,37 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
117125
func: Arc::new({
118126
let eth_adapters = eth_adapters.clone();
119127
move |ctx, wasm_ptr| {
120-
let eth_adapter =
121-
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
122-
archive,
123-
traces: false,
124-
})?;
125-
eth_get_balance(&eth_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr())
128+
let eth_adapters = eth_adapters.cheap_clone();
129+
async move {
130+
let eth_adapter =
131+
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
132+
archive,
133+
traces: false,
134+
})?;
135+
eth_get_balance(&eth_adapter, ctx, wasm_ptr)
136+
.await
137+
.map(|ptr| ptr.wasm_ptr())
138+
}
139+
.boxed()
126140
}
127141
}),
128142
},
129143
HostFn {
130144
name: "ethereum.hasCode",
131145
func: Arc::new({
132-
let eth_adapters = eth_adapters.clone();
133146
move |ctx, wasm_ptr| {
134-
let eth_adapter =
135-
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
136-
archive,
137-
traces: false,
138-
})?;
139-
eth_has_code(&eth_adapter, ctx, wasm_ptr).map(|ptr| ptr.wasm_ptr())
147+
let eth_adapters = eth_adapters.cheap_clone();
148+
async move {
149+
let eth_adapter =
150+
eth_adapters.unverified_cheapest_with(&NodeCapabilities {
151+
archive,
152+
traces: false,
153+
})?;
154+
eth_has_code(&eth_adapter, ctx, wasm_ptr)
155+
.await
156+
.map(|ptr| ptr.wasm_ptr())
157+
}
158+
.boxed()
140159
}
141160
}),
142161
},
@@ -170,10 +189,10 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
170189
}
171190

172191
/// function ethereum.call(call: SmartContractCall): Array<Token> | null
173-
fn ethereum_call(
192+
async fn ethereum_call(
174193
eth_adapter: &EthereumAdapter,
175194
call_cache: Arc<dyn EthereumCallCache>,
176-
ctx: HostFnCtx,
195+
ctx: HostFnCtx<'_>,
177196
wasm_ptr: u32,
178197
abis: &[Arc<MappingABI>],
179198
eth_call_gas: Option<u32>,
@@ -201,12 +220,12 @@ fn ethereum_call(
201220
ctx.metrics.cheap_clone(),
202221
)?;
203222
match result {
204-
Some(tokens) => Ok(asc_new(ctx.heap, tokens.as_slice(), &ctx.gas)?),
223+
Some(tokens) => Ok(asc_new(ctx.heap, tokens.as_slice(), &ctx.gas).await?),
205224
None => Ok(AscPtr::null()),
206225
}
207226
}
208227

209-
fn eth_get_balance(
228+
async fn eth_get_balance(
210229
eth_adapter: &EthereumAdapter,
211230
ctx: HostFnCtx<'_>,
212231
wasm_ptr: u32,
@@ -230,7 +249,7 @@ fn eth_get_balance(
230249
match result {
231250
Ok(v) => {
232251
let bigint = BigInt::from_unsigned_u256(&v);
233-
Ok(asc_new(ctx.heap, &bigint, &ctx.gas)?)
252+
Ok(asc_new(ctx.heap, &bigint, &ctx.gas).await?)
234253
}
235254
// Retry on any kind of error
236255
Err(EthereumRpcError::Web3Error(e)) => Err(HostExportError::PossibleReorg(e.into())),
@@ -240,7 +259,7 @@ fn eth_get_balance(
240259
}
241260
}
242261

243-
fn eth_has_code(
262+
async fn eth_has_code(
244263
eth_adapter: &EthereumAdapter,
245264
ctx: HostFnCtx<'_>,
246265
wasm_ptr: u32,
@@ -263,7 +282,7 @@ fn eth_has_code(
263282
.map(|v| !v.0.is_empty());
264283

265284
match result {
266-
Ok(v) => Ok(asc_new(ctx.heap, &AscWrapped { inner: v }, &ctx.gas)?),
285+
Ok(v) => Ok(asc_new(ctx.heap, &AscWrapped { inner: v }, &ctx.gas).await?),
267286
// Retry on any kind of error
268287
Err(EthereumRpcError::Web3Error(e)) => Err(HostExportError::PossibleReorg(e.into())),
269288
Err(EthereumRpcError::Timeout) => Err(HostExportError::PossibleReorg(

chain/ethereum/src/trigger.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use graph::data::subgraph::API_VERSION_0_0_2;
44
use graph::data::subgraph::API_VERSION_0_0_6;
55
use graph::data::subgraph::API_VERSION_0_0_7;
66
use graph::data_source::common::DeclaredCall;
7+
use graph::prelude::async_trait;
78
use graph::prelude::ethabi::ethereum_types::H160;
89
use graph::prelude::ethabi::ethereum_types::H256;
910
use graph::prelude::ethabi::ethereum_types::U128;
@@ -129,8 +130,9 @@ impl std::fmt::Debug for MappingTrigger {
129130
}
130131
}
131132

133+
#[async_trait]
132134
impl ToAscPtr for MappingTrigger {
133-
fn to_asc_ptr<H: AscHeap>(
135+
async fn to_asc_ptr<H: AscHeap>(
134136
self,
135137
heap: &mut H,
136138
gas: &GasCounter,
@@ -159,28 +161,31 @@ impl ToAscPtr for MappingTrigger {
159161
>,
160162
_,
161163
_,
162-
>(heap, &(ethereum_event_data, receipt.as_deref()), gas)?
164+
>(heap, &(ethereum_event_data, receipt.as_deref()), gas)
165+
.await?
163166
.erase()
164167
} else if api_version >= &API_VERSION_0_0_6 {
165168
asc_new::<
166169
AscEthereumEvent<AscEthereumTransaction_0_0_6, AscEthereumBlock_0_0_6>,
167170
_,
168171
_,
169-
>(heap, &ethereum_event_data, gas)?
172+
>(heap, &ethereum_event_data, gas)
173+
.await?
170174
.erase()
171175
} else if api_version >= &API_VERSION_0_0_2 {
172176
asc_new::<
173177
AscEthereumEvent<AscEthereumTransaction_0_0_2, AscEthereumBlock>,
174178
_,
175179
_,
176-
>(heap, &ethereum_event_data, gas)?
180+
>(heap, &ethereum_event_data, gas)
181+
.await?
177182
.erase()
178183
} else {
179184
asc_new::<
180185
AscEthereumEvent<AscEthereumTransaction_0_0_1, AscEthereumBlock>,
181186
_,
182187
_,
183-
>(heap, &ethereum_event_data, gas)?
188+
>(heap, &ethereum_event_data, gas).await?
184189
.erase()
185190
}
186191
}
@@ -197,25 +202,33 @@ impl ToAscPtr for MappingTrigger {
197202
AscEthereumCall_0_0_3<AscEthereumTransaction_0_0_6, AscEthereumBlock_0_0_6>,
198203
_,
199204
_,
200-
>(heap, &call, gas)?
205+
>(heap, &call, gas)
206+
.await?
201207
.erase()
202208
} else if heap.api_version() >= &Version::new(0, 0, 3) {
203209
asc_new::<
204210
AscEthereumCall_0_0_3<AscEthereumTransaction_0_0_2, AscEthereumBlock>,
205211
_,
206212
_,
207-
>(heap, &call, gas)?
213+
>(heap, &call, gas)
214+
.await?
208215
.erase()
209216
} else {
210-
asc_new::<AscEthereumCall, _, _>(heap, &call, gas)?.erase()
217+
asc_new::<AscEthereumCall, _, _>(heap, &call, gas)
218+
.await?
219+
.erase()
211220
}
212221
}
213222
MappingTrigger::Block { block } => {
214223
let block = EthereumBlockData::from(block.as_ref());
215224
if heap.api_version() >= &Version::new(0, 0, 6) {
216-
asc_new::<AscEthereumBlock_0_0_6, _, _>(heap, &block, gas)?.erase()
225+
asc_new::<AscEthereumBlock_0_0_6, _, _>(heap, &block, gas)
226+
.await?
227+
.erase()
217228
} else {
218-
asc_new::<AscEthereumBlock, _, _>(heap, &block, gas)?.erase()
229+
asc_new::<AscEthereumBlock, _, _>(heap, &block, gas)
230+
.await?
231+
.erase()
219232
}
220233
}
221234
})

0 commit comments

Comments
 (0)