diff --git a/core/src/amp_subgraph/manager.rs b/core/src/amp_subgraph/manager.rs index ae272830880..121de23f735 100644 --- a/core/src/amp_subgraph/manager.rs +++ b/core/src/amp_subgraph/manager.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use alloy::primitives::BlockNumber; @@ -8,9 +9,11 @@ use graph::{ components::{ link_resolver::{LinkResolver, LinkResolverContext}, metrics::MetricsRegistry, + network_provider::{AmpChainConfig, AmpClients, ChainName}, store::{DeploymentLocator, SubgraphStore}, subgraph::SubgraphInstanceManager, }, + data::subgraph::network_name_from_raw_manifest, env::EnvVars, log::factory::LoggerFactory, prelude::CheapClone, @@ -31,7 +34,8 @@ pub struct Manager { monitor: Monitor, subgraph_store: Arc, link_resolver: Arc, - amp_client: Arc, + amp_clients: AmpClients, + amp_chain_configs: HashMap, } impl Manager @@ -47,7 +51,8 @@ where cancel_token: &CancellationToken, subgraph_store: Arc, link_resolver: Arc, - amp_client: Arc, + amp_clients: AmpClients, + amp_chain_configs: HashMap, ) -> Self { let logger = logger_factory.component_logger("AmpSubgraphManager", None); let logger_factory = logger_factory.with_parent(logger); @@ -61,7 +66,8 @@ where monitor, subgraph_store, link_resolver, - amp_client, + amp_clients, + amp_chain_configs, } } } @@ -112,16 +118,48 @@ where .await .context("failed to load subgraph manifest")?; - let raw_manifest = serde_yaml::from_slice(&manifest_bytes) + let raw_manifest: serde_yaml::Mapping = serde_yaml::from_slice(&manifest_bytes) .context("failed to parse subgraph manifest")?; + // Extract the network name from the raw manifest to look + // up the per-chain Amp client. + let network_name = network_name_from_raw_manifest(&raw_manifest); + + let amp_client = match &network_name { + Some(network) => match manager.amp_clients.get(network) { + Some(client) => client, + None => { + anyhow::bail!( + "Amp is not configured for chain '{}'; \ + cannot start Amp subgraph '{}'", + network, + deployment.hash + ); + } + }, + None => { + anyhow::bail!( + "no network name found in manifest for Amp subgraph '{}'", + deployment.hash + ); + } + }; + + let amp_context = network_name.as_ref().and_then(|chain| { + manager + .amp_chain_configs + .get(chain) + .map(|cfg| cfg.context()) + }); + let mut manifest = amp::Manifest::resolve::( &logger, manager.link_resolver.cheap_clone(), - manager.amp_client.cheap_clone(), + amp_client.cheap_clone(), manager.env_vars.max_spec_version.cheap_clone(), deployment.hash.cheap_clone(), raw_manifest, + amp_context, ) .await?; @@ -139,7 +177,7 @@ where let runner_context = runner::Context::new( &logger, &manager.env_vars.amp, - manager.amp_client.cheap_clone(), + amp_client, store, deployment.hash.cheap_clone(), manifest, diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 5d0c89ae171..cae8c2141da 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -15,9 +15,12 @@ use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; use graph::components::metrics::gas::GasMetrics; use graph::components::metrics::subgraph::DeploymentStatusMetric; +use graph::components::network_provider::AmpClients; use graph::components::store::SourceableStore; use graph::components::subgraph::ProofOfIndexingVersion; -use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; +use graph::data::subgraph::{ + 
network_name_from_raw_manifest, UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6, +}; use graph::data::value::Word; use graph::data_source::causality_region::CausalityRegionSeq; use graph::env::EnvVars; @@ -41,7 +44,7 @@ pub struct SubgraphInstanceManager { link_resolver: Arc, ipfs_service: IpfsService, arweave_service: ArweaveService, - amp_client: Option>, + amp_clients: AmpClients, static_filters: bool, env_vars: Arc, @@ -175,7 +178,7 @@ impl SubgraphInstanceManager { link_resolver: Arc, ipfs_service: IpfsService, arweave_service: ArweaveService, - amp_client: Option>, + amp_clients: AmpClients, static_filters: bool, ) -> Self { let logger = logger_factory.component_logger("SubgraphInstanceManager", None); @@ -189,7 +192,7 @@ impl SubgraphInstanceManager { instances: SubgraphKeepAlive::new(sg_metrics), link_resolver, ipfs_service, - amp_client, + amp_clients, static_filters, env_vars, arweave_service, @@ -267,6 +270,12 @@ impl SubgraphInstanceManager { let subgraph_store = self.subgraph_store.cheap_clone(); let registry = self.metrics_registry.cheap_clone(); + // Look up the per-chain Amp client based on the network from the + // raw manifest (before the manifest is moved into parse). + let amp_client = network_name_from_raw_manifest(&raw_manifest) + .as_ref() + .and_then(|network| self.amp_clients.get(network)); + let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), raw_manifest)?; @@ -300,7 +309,8 @@ impl SubgraphInstanceManager { .resolve( &deployment.hash, &link_resolver, - self.amp_client.cheap_clone(), + amp_client, + None, &logger, ENV_VARS.max_spec_version.clone(), ) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 9b0efe21abc..8227f1912c5 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -1,19 +1,20 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; +use graph::components::network_provider::ChainName; use graph::components::{ link_resolver::LinkResolverContext, - network_provider::AmpChainNames, + network_provider::{AmpChainConfig, AmpChainNames, AmpClients}, store::{DeploymentId, DeploymentLocator, SubscriptionManager}, subgraph::Settings, }; use graph::data::{ - subgraph::{schema::DeploymentCreate, Graft}, + subgraph::{network_name_from_raw_manifest, schema::DeploymentCreate, Graft}, value::Word, }; -use graph::futures03::{self, future::TryFutureExt, Stream, StreamExt}; +use graph::futures03::{self, future::TryFutureExt, Stream, StreamExt, TryStreamExt}; use graph::prelude::{CreateSubgraphResult, SubgraphRegistrar as SubgraphRegistrarTrait, *}; use graph::util::futures::{retry_strategy, RETRY_DEFAULT_LIMIT}; use tokio_retry::Retry; @@ -25,7 +26,8 @@ pub struct SubgraphRegistrar { provider: Arc
<P>
, store: Arc, subscription_manager: Arc, - amp_client: Option>, + amp_clients: AmpClients, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -47,7 +49,8 @@ where provider: Arc
<P>
, store: Arc, subscription_manager: Arc, - amp_client: Option>, + amp_clients: AmpClients, + amp_chain_configs: HashMap, chains: Arc, node_id: NodeId, version_switching_mode: SubgraphVersionSwitchingMode, @@ -66,7 +69,8 @@ where provider, store, subscription_manager, - amp_client, + amp_clients, + amp_chain_configs, chains, node_id, version_switching_mode, @@ -297,6 +301,18 @@ where SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) })?; + // Extract the network name from the raw manifest and resolve the + // per-chain Amp client (if any). + let resolved_amp_chain = network_name_from_raw_manifest(&raw) + .map(|network| self.amp_chain_names.resolve(&network)); + let amp_client = resolved_amp_chain + .as_ref() + .and_then(|chain| self.amp_clients.get(chain)); + let amp_context = resolved_amp_chain + .as_ref() + .and_then(|chain| self.amp_chain_configs.get(chain)) + .map(|cfg| cfg.context()); + // Give priority to deployment specific history_blocks value. let history_blocks = history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks)); @@ -316,7 +332,8 @@ where debug_fork, self.version_switching_mode, &resolver, - self.amp_client.cheap_clone(), + amp_client.cheap_clone(), + amp_context.clone(), history_blocks, &self.amp_chain_names, ) @@ -336,7 +353,8 @@ where debug_fork, self.version_switching_mode, &resolver, - self.amp_client.cheap_clone(), + amp_client, + amp_context, history_blocks, &self.amp_chain_names, ) @@ -401,6 +419,54 @@ where } } +/// Resolves a block pointer for an Amp subgraph by querying the Amp Flight +/// service for the block hash at the given `block_number`. +async fn resolve_amp_start_block( + amp_client: &AC, + logger: &Logger, + context_dataset: &str, + context_table: &str, + block_number: BlockNumber, +) -> Result { + let sql = format!( + "SELECT * FROM {}.{} WHERE _block_num = {} LIMIT 1", + context_dataset, context_table, block_number + ); + + let mut stream = amp_client.query(logger, &sql, None); + + // Find the first Batch response, skipping any Reorg variants. + let data = loop { + match stream.try_next().await? { + Some(amp::client::ResponseBatch::Batch { data }) => break data, + Some(amp::client::ResponseBatch::Reorg(_)) => continue, + None => { + return Err(anyhow!( + "Amp query returned no batches for block {}", + block_number + )); + } + } + }; + + if data.num_rows() == 0 { + return Err(anyhow!( + "Amp query returned empty batch for block {}", + block_number + )); + } + + let (_col_name, decoder) = graph::amp::codec::utils::auto_block_hash_decoder(&data)?; + let hash = decoder.decode(0)?.ok_or_else(|| { + anyhow!( + "Amp query returned null block hash for block {}", + block_number + ) + })?; + + Ok(BlockPtr::new(hash.into(), block_number)) +} + /// Resolves the subgraph's earliest block async fn resolve_start_block( manifest: &SubgraphManifest, @@ -465,16 +531,22 @@ async fn create_subgraph_version, amp_client: Option>, + amp_context: Option<(String, String)>, history_blocks_override: Option, amp_chain_names: &AmpChainNames, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); + // Keep copies for Amp start block resolution after the manifest is resolved. 
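+ // (The originals are consumed by the UnvalidatedSubgraphManifest::resolve call below.)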
+ let amp_client_for_start_block = amp_client.clone(); + let amp_context_for_start_block = amp_context.clone(); + let unvalidated = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, resolver, amp_client, + amp_context, logger, ENV_VARS.max_spec_version.clone(), ) @@ -496,6 +568,7 @@ async fn create_subgraph_version(resolved_name.clone()) @@ -517,7 +590,40 @@ async fn create_subgraph_version Some(block), - None => resolve_start_block(&manifest, &*chain, &logger).await?, + None => { + let min_start_block = + manifest.start_blocks().into_iter().min().expect( + "cannot identify minimum start block because there are no data sources", + ); + + match ( + min_start_block, + &_client_for_start_block, + &_context_for_start_block, + ) { + // Genesis block — no resolution needed. + (0, _, _) => None, + // Amp subgraph with start_block > 0 — try Amp-based resolution. + (min, Some(client), Some((dataset, table))) if is_amp_subgraph => { + match resolve_amp_start_block(client.as_ref(), &logger, dataset, table, min - 1) + .await + { + Ok(ptr) => Some(ptr), + Err(e) => { + warn!( + logger, + "Amp block pointer resolution failed, falling back to RPC"; + "error" => e.to_string(), + "block_number" => min - 1 + ); + resolve_start_block(&manifest, &*chain, &logger).await? + } + } + } + // Non-Amp subgraph — use RPC. + _ => resolve_start_block(&manifest, &*chain, &logger).await?, + } + } }; let base_block = match &manifest.graft { @@ -584,3 +690,244 @@ async fn create_subgraph_version) -> std::fmt::Result { + write!(f, "{}", self.0) + } + } + + impl std::error::Error for MockAmpError {} + + impl IsDeterministic for MockAmpError { + fn is_deterministic(&self) -> bool { + false + } + } + + #[derive(Clone)] + struct MockAmpClient { + /// Recorded queries (for assertion). + recorded_queries: Arc>>, + /// Batches to return from `query()`. If `None`, the stream returns an error. + response: Arc, String>>, + } + + impl MockAmpClient { + fn new_ok(batches: Vec) -> Self { + Self { + recorded_queries: Arc::new(Mutex::new(Vec::new())), + response: Arc::new(Ok(batches)), + } + } + + fn new_err(msg: &str) -> Self { + Self { + recorded_queries: Arc::new(Mutex::new(Vec::new())), + response: Arc::new(Err(msg.to_string())), + } + } + + fn recorded_queries(&self) -> Vec { + self.recorded_queries.lock().unwrap().clone() + } + } + + impl amp::Client for MockAmpClient { + type Error = MockAmpError; + + fn schema( + &self, + _logger: &Logger, + _query: impl ToString, + ) -> BoxFuture<'static, Result> { + unimplemented!("schema not needed in tests") + } + + fn query( + &self, + _logger: &Logger, + query: impl ToString, + _request_metadata: Option, + ) -> BoxStream<'static, Result> { + let query_str = query.to_string(); + self.recorded_queries.lock().unwrap().push(query_str); + + let response = self.response.clone(); + Box::pin(graph::futures03::stream::iter(match response.as_ref() { + Ok(batches) => batches.iter().cloned().map(Ok).collect::>(), + Err(msg) => vec![Err(MockAmpError(msg.clone()))], + })) + } + } + + // -- Test helpers ----------------------------------------------------- + + /// Creates a RecordBatch with a single "block_hash" column containing one + /// 32-byte FixedSizeBinary value. 
+ fn make_block_hash_batch(hash: AllocBlockHash) -> RecordBatch { + let schema = Schema::new(vec![Field::new( + "block_hash", + DataType::FixedSizeBinary(32), + false, + )]); + let values: Vec<&[u8]> = vec![hash.as_slice()]; + RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new( + FixedSizeBinaryArray::try_from_iter(values.into_iter()).unwrap(), + )], + ) + .unwrap() + } + + // -- Tests ------------------------------------------------------------ + + #[tokio::test] + async fn resolve_amp_start_block() { + let alloy_hash = AllocBlockHash::from([0xABu8; 32]); + let expected_hash: BlockHash = alloy_hash.into(); + let batch = make_block_hash_batch(alloy_hash); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: batch }]); + let logger = Logger::root(slog::Discard, o!()); + + let result = + super::resolve_amp_start_block(&client, &logger, "my_dataset", "blocks", 99).await; + + let ptr = result.expect("should succeed"); + assert_eq!(ptr.hash, expected_hash); + assert_eq!(ptr.number, 99); + + // Verify the SQL query. + let queries = client.recorded_queries(); + assert_eq!(queries.len(), 1); + assert_eq!( + queries[0], + "SELECT * FROM my_dataset.blocks WHERE _block_num = 99 LIMIT 1" + ); + } + + #[tokio::test] + async fn amp_subgraph_start_block_uses_amp_resolution() { + // When an Amp client + context are available and min_start_block > 0, + // the Amp path should be used and produce the correct BlockPtr. + let alloy_hash = AllocBlockHash::from([0xCDu8; 32]); + let expected_hash: BlockHash = alloy_hash.into(); + let batch = make_block_hash_batch(alloy_hash); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: batch }]); + let logger = Logger::root(slog::Discard, o!()); + + // Simulate min_start_block = 100 → query block 99 + let block_number = 100 - 1; + let result = + super::resolve_amp_start_block(&client, &logger, "eth_mainnet", "blocks", block_number) + .await; + + let ptr = result.expect("should succeed"); + assert_eq!(ptr.hash, expected_hash); + assert_eq!(ptr.number, block_number); + + let queries = client.recorded_queries(); + assert_eq!( + queries[0], + "SELECT * FROM eth_mainnet.blocks WHERE _block_num = 99 LIMIT 1" + ); + } + + #[tokio::test] + async fn amp_start_block_falls_back_to_rpc() { + // When the Amp query fails, resolve_amp_start_block returns an error. + let client = MockAmpClient::new_err("network error"); + let logger = Logger::root(slog::Discard, o!()); + + let result = + super::resolve_amp_start_block(&client, &logger, "my_dataset", "blocks", 99).await; + + assert!( + result.is_err(), + "should return an error so caller falls back to RPC" + ); + assert!( + result.unwrap_err().to_string().contains("network error"), + "error should contain the original cause" + ); + } + + #[tokio::test] + async fn amp_start_block_zero_returns_none() { + // start_block = 0 means genesis — the caller should not invoke + // resolve_amp_start_block at all. We verify the matching logic inline: + // when min_start_block == 0, the result is None. 
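+ // This mirrors the (0, _, _) => None arm of the start-block match in create_subgraph_version.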
+ let min_start_block: i32 = 0; + let mock_client = Arc::new(MockAmpClient::new_ok(vec![])); + let amp_client: Option> = Some(mock_client.clone()); + let amp_context: Option<(String, String)> = Some(("ds".to_string(), "blocks".to_string())); + + let result: Option = match (min_start_block, &_client, &_context) { + (0, _, _) => None, + _ => panic!("should not reach non-zero path"), + }; + + assert!( + result.is_none(), + "start_block=0 should produce None (genesis)" + ); + + // Verify the client was never called. + let queries = mock_client.recorded_queries(); + assert!( + queries.is_empty(), + "Amp client should not be queried for genesis" + ); + } + + #[tokio::test] + async fn resolve_amp_start_block_no_batches() { + // If the Amp query returns no batches at all, it should error. + let client = MockAmpClient::new_ok(vec![]); + let logger = Logger::root(slog::Discard, o!()); + + let result = super::resolve_amp_start_block(&client, &logger, "ds", "tbl", 50).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("no batches")); + } + + #[tokio::test] + async fn resolve_amp_start_block_empty_batch() { + // If the Amp query returns an empty batch (0 rows), it should error. + let schema = Schema::new(vec![Field::new( + "block_hash", + DataType::FixedSizeBinary(32), + false, + )]); + let empty_batch = RecordBatch::new_empty(Arc::new(schema)); + let client = MockAmpClient::new_ok(vec![ResponseBatch::Batch { data: empty_batch }]); + let logger = Logger::root(slog::Discard, o!()); + + let result = super::resolve_amp_start_block(&client, &logger, "ds", "tbl", 50).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("empty batch")); + } +} diff --git a/docs/config.md b/docs/config.md index bcf15fc2c56..8245529c26a 100644 --- a/docs/config.md +++ b/docs/config.md @@ -114,9 +114,8 @@ The configuration for a chain `name` is specified in the section - `protocol`: the protocol type being indexed, default `ethereum` (alternatively `near`, `cosmos`,`arweave`,`starknet`) - `polling_interval`: the polling interval for the block ingestor (default 500ms) -- `amp`: the network name used by AMP for this chain; defaults to the chain name. - Set this when AMP uses a different name than graph-node (e.g., `amp = "ethereum-mainnet"` - on a chain named `mainnet`). +- `amp`: a TOML table configuring Amp Flight service for this chain. See + [Amp Configuration](#amp-configuration) below for details. - `provider`: a list of providers for that chain A `provider` is an object with the following characteristics: @@ -167,11 +166,17 @@ optimisations. ingestor = "block_ingestor_node" [chains.mainnet] shard = "vip" -amp = "ethereum-mainnet" provider = [ { label = "mainnet1", url = "http://..", features = [], headers = { Authorization = "Bearer foo" } }, { label = "mainnet2", url = "http://..", features = [ "archive", "traces" ] } ] +[chains.mainnet.amp] +address = "http://amp-flight.example.com:50051" +token = "my-secret-token" +context_dataset = "ethereum" +context_table = "blocks" +network = "ethereum-mainnet" + [chains.sepolia] shard = "primary" provider = [ { label = "sepolia", url = "http://..", features = [] } ] @@ -182,6 +187,51 @@ protocol = "near" provider = [ { label = "near", details = { type = "firehose", url = "https://..", key = "", features = ["compression", "filters"] } } ] ``` +### Amp Configuration + +Amp data sources use a Flight service to query the blockchain data. +Amp is configured per-chain using a TOML table under `[chains..amp]`. 
+When the `amp` section is absent for a chain, Amp is disabled for that chain. + +The `[chains..amp]` table supports the following fields: + +- `address` (String, **required**): The Amp Flight service endpoint + (e.g., `"http://amp-flight.example.com:50051"`). +- `token` (String, optional): Authentication token for the Amp Flight service. +- `context_dataset` (String, **required**): The dataset in the Amp Flight + service that contains the context table. This identifies the logical + grouping (dataset) within the Flight service where the block-level context + table resides. +- `context_table` (String, **required**): The table within the context dataset + that provides block-level context (block hash, block number, and timestamp). + This should typically point to the blocks table (not transactions or logs), + since it reliably contains one row per block with the block hash. +- `network` (String, optional): The Amp network name when it differs from + the graph-node chain name (e.g., `"ethereum-mainnet"` on a chain named + `mainnet`). Defaults to the chain name if omitted. + +Example: + +```toml +[chains.mainnet.amp] +address = "http://amp-flight.example.com:50051" +token = "my-secret-token" +context_dataset = "ethereum" +context_table = "blocks" +network = "ethereum-mainnet" +``` + +#### Removed ENV/CLI flags + +The following environment variables and CLI flags have been removed in favor +of per-chain TOML configuration: + +- `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` environment variable / + `--amp-flight-service-address` CLI flag — use `address` in + `[chains..amp]` instead. +- `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` environment variable — use `token` in + `[chains..amp]` instead. + ### Controlling the number of subgraphs using a provider **This feature is experimental and might be removed in a future release** diff --git a/graph/src/amp/manifest/data_source/raw.rs b/graph/src/amp/manifest/data_source/raw.rs index 3369eebc54d..57f3a0c7d08 100644 --- a/graph/src/amp/manifest/data_source/raw.rs +++ b/graph/src/amp/manifest/data_source/raw.rs @@ -8,10 +8,9 @@ use anyhow::anyhow; use arrow::{array::RecordBatch, datatypes::Schema}; use futures03::future::try_join_all; use itertools::Itertools; -use lazy_regex::regex_is_match; use semver::Version; use serde::Deserialize; -use slog::{debug, error, Logger}; +use slog::{debug, Logger}; use thiserror::Error; use super::{Abi, DataSource, Source, Table, Transformer}; @@ -22,7 +21,9 @@ use crate::{ auto_block_hash_decoder, auto_block_number_decoder, auto_block_timestamp_decoder, }, error::IsDeterministic, - sql::{BlockRangeQueryBuilder, ContextQuery, ValidQuery}, + sql::{ + normalize_sql_ident, validate_ident, BlockRangeQueryBuilder, ContextQuery, ValidQuery, + }, }, components::link_resolver::{LinkResolver, LinkResolverContext}, data::subgraph::DeploymentHash, @@ -67,6 +68,7 @@ impl RawDataSource { link_resolver: &dyn LinkResolver, amp_client: &impl amp::Client, input_schema: Option<&InputSchema>, + amp_context: Option<(String, String)>, ) -> Result { let Self { name, @@ -79,7 +81,8 @@ impl RawDataSource { let logger = logger.new(slog::o!("data_source" => name.clone())); debug!(logger, "Resolving data source"); - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; Self::validate_kind(kind)?; let source = source @@ -87,7 +90,14 @@ impl RawDataSource { .map_err(|e| e.source_context("invalid `source`"))?; let transformer = transformer - 
.resolve(&logger, link_resolver, amp_client, input_schema, &source) + .resolve( + &logger, + link_resolver, + amp_client, + input_schema, + &source, + amp_context, + ) .await .map_err(|e| e.source_context("invalid `transformer`"))?; @@ -233,6 +243,7 @@ impl RawTransformer { amp_client: &impl amp::Client, input_schema: Option<&InputSchema>, source: &Source, + amp_context: Option<(String, String)>, ) -> Result { let Self { api_version, @@ -250,6 +261,7 @@ impl RawTransformer { tables, source, &abis, + amp_context, ) .await?; @@ -307,6 +319,7 @@ impl RawTransformer { tables: Vec, source: &Source, abis: &[Abi], + amp_context: Option<(String, String)>, ) -> Result, Error> { const MAX_TABLES: usize = 100; @@ -320,23 +333,27 @@ impl RawTransformer { ))); } - let table_futs = tables.into_iter().enumerate().map(|(i, table)| async move { - let logger = logger.new(slog::o!("table_name" => table.name.clone())); - debug!(logger, "Resolving table"; - "file" => ?&table.file - ); - - table - .resolve( - &logger, - link_resolver, - amp_client, - input_schema, - source, - abis, - ) - .await - .map_err(|e| e.source_context(format!("invalid `tables` at index {i}"))) + let table_futs = tables.into_iter().enumerate().map(|(i, table)| { + let amp_context = amp_context.clone(); + async move { + let logger = logger.new(slog::o!("table_name" => table.name.clone())); + debug!(logger, "Resolving table"; + "file" => ?&table.file + ); + + table + .resolve( + &logger, + link_resolver, + amp_client, + input_schema, + source, + abis, + amp_context, + ) + .await + .map_err(|e| e.source_context(format!("invalid `tables` at index {i}"))) + } }); try_join_all(table_futs).await @@ -365,7 +382,8 @@ impl RawAbi { ) -> Result { let Self { name, file } = self; - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; let contract = Self::resolve_contract(logger, link_resolver, file).await?; Ok(Abi { name, contract }) @@ -429,10 +447,12 @@ impl RawTable { input_schema: Option<&InputSchema>, source: &Source, abis: &[Abi], + amp_context: Option<(String, String)>, ) -> Result { let Self { name, query, file } = self; - validate_ident(&name).map_err(|e| e.source_context("invalid `name`"))?; + validate_ident(&name) + .map_err(|e| Error::InvalidValue(e).source_context("invalid `name`"))?; let query = match Self::resolve_query(query, source, abis)? { Some(query) => query, None => Self::resolve_file(logger, link_resolver, file, source, abis).await?, @@ -443,7 +463,7 @@ impl RawTable { for field in schema.fields() { validate_ident(field.name()).map_err(|e| { - e.source_context(format!( + Error::InvalidValue(e).source_context(format!( "invalid query output schema: invalid column '{}'", field.name() )) @@ -454,9 +474,9 @@ impl RawTable { logger, amp_client, input_schema, - source, query, schema.clone(), + amp_context, ) .await?; @@ -551,9 +571,9 @@ impl RawTable { logger: &Logger, amp_client: &impl amp::Client, input_schema: Option<&InputSchema>, - source: &Source, query: ValidQuery, schema: Schema, + amp_context: Option<(String, String)>, ) -> Result { debug!(logger, "Resolving block range query builder"); @@ -571,79 +591,52 @@ impl RawTable { return Ok(BlockRangeQueryBuilder::new(query, block_number_column)); } - debug!(logger, "Resolving context query"); - let mut context_query: Option = None; - - // TODO: Context is embedded in the original query using INNER JOIN to ensure availability for every output row. 
- // This requires all source tables to match or exceed the expected query output size. - let context_sources_iter = source - .tables - .iter() - .map(|table| (source.dataset.as_str(), table.as_str())); - - for (dataset, table) in context_sources_iter { - let context_logger = logger.new(slog::o!( - "context_dataset" => dataset.to_string(), - "context_table" => table.to_string() - )); - debug!(context_logger, "Loading context schema"); - let schema_query = format!("SELECT * FROM {dataset}.{table}"); - let schema = match Self::resolve_schema(logger, amp_client, schema_query).await { - Ok(schema) => schema, - Err(e) => { - error!(context_logger, "Failed to load context schema"; - "e" => ?e - ); - continue; - } - }; - - let record_batch = RecordBatch::new_empty(schema.clone().into()); - let mut columns = Vec::new(); - - if need_block_hash_column { - let Ok((block_hash_column, _)) = auto_block_hash_decoder(&record_batch) else { - debug!( - context_logger, - "Context schema does not contain block hash column, skipping" - ); - continue; - }; - - columns.push(block_hash_column); - } - - if need_block_timestamp_column { - let Ok((block_timestamp_column, _)) = auto_block_timestamp_decoder(&record_batch) - else { - debug!( - context_logger, - "Context schema does not contain block timestamp column, skipping" - ); - continue; - }; - - columns.push(block_timestamp_column); - } - - debug!(context_logger, "Creating context query"); - context_query = Some(ContextQuery::new( - query, - block_number_column, - dataset, - table, - columns, - )); - break; + let (context_dataset, context_table) = amp_context.ok_or_else(|| { + Error::InvalidQuery(anyhow!( + "query requires context columns (block hash/timestamp) but no Amp context config is available" + )) + })?; + + debug!(logger, "Resolving context query"; + "context_dataset" => &context_dataset, + "context_table" => &context_table + ); + + let schema_query = format!("SELECT * FROM {context_dataset}.{context_table}"); + let context_schema = Self::resolve_schema(logger, amp_client, schema_query).await?; + + let context_record_batch = RecordBatch::new_empty(context_schema.into()); + let mut columns = Vec::new(); + + if need_block_hash_column { + let (block_hash_column, _) = + auto_block_hash_decoder(&context_record_batch).map_err(|_| { + Error::InvalidQuery(anyhow!( + "context table '{context_dataset}.{context_table}' does not contain a block hash column" + )) + })?; + columns.push(block_hash_column); } - if let Some(context_query) = context_query { - return Ok(BlockRangeQueryBuilder::new_with_context(context_query)); + if need_block_timestamp_column { + let (block_timestamp_column, _) = + auto_block_timestamp_decoder(&context_record_batch).map_err(|_| { + Error::InvalidQuery(anyhow!( + "context table '{context_dataset}.{context_table}' does not contain a block timestamp column" + )) + })?; + columns.push(block_timestamp_column); } - Err(Error::InvalidQuery(anyhow!( - "query is required to output block numbers, block hashes and block timestamps" - ))) + let context_query = ContextQuery::new( + query, + block_number_column, + &context_dataset, + &context_table, + columns, + ); + + Ok(BlockRangeQueryBuilder::new_with_context(context_query)) } } @@ -696,18 +689,259 @@ impl IsDeterministic for Error { } } -fn validate_ident(s: &str) -> Result<(), Error> { - if !regex_is_match!("^[a-zA-Z_][a-zA-Z0-9_-]{0,100}$", s) { - return Err(Error::InvalidValue( - anyhow!("invalid identifier '{s}': must start with a letter or an underscore, and contain only letters, numbers, 
hyphens, and underscores") - )); +#[cfg(test)] +mod tests { + use super::*; + use crate::amp::error::IsDeterministic; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; + use futures03::future::BoxFuture; + use std::collections::HashMap; + use std::sync::Mutex; + + #[derive(Debug, thiserror::Error)] + #[error("mock error: schema not found for query")] + struct MockError; + + impl IsDeterministic for MockError { + fn is_deterministic(&self) -> bool { + true + } } - Ok(()) -} -fn normalize_sql_ident(s: &str) -> String { - match validate_ident(s) { - Ok(()) => s.to_lowercase(), - Err(_e) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(), + /// A mock Amp client that returns pre-configured schemas keyed by query string. + struct MockAmpClient { + schemas: Mutex>, + } + + impl MockAmpClient { + fn new() -> Self { + Self { + schemas: Mutex::new(HashMap::new()), + } + } + + fn add_schema(&self, query: &str, schema: Schema) { + self.schemas + .lock() + .unwrap() + .insert(query.to_string(), schema); + } + } + + impl amp::Client for MockAmpClient { + type Error = MockError; + + fn schema( + &self, + _logger: &slog::Logger, + query: impl ToString, + ) -> BoxFuture<'static, Result> { + let query_str = query.to_string(); + let schema = self.schemas.lock().unwrap().get(&query_str).cloned(); + Box::pin(async move { schema.ok_or(MockError) }) + } + + fn query( + &self, + _logger: &slog::Logger, + _query: impl ToString, + _request_metadata: Option, + ) -> futures03::stream::BoxStream<'static, Result> + { + Box::pin(futures03::stream::empty()) + } + } + + fn test_logger() -> slog::Logger { + slog::Logger::root(slog::Discard, slog::o!()) + } + + fn schema_without_hash() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("value", DataType::Utf8, true), + ]) + } + + fn schema_with_all_columns() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("block_hash", DataType::FixedSizeBinary(32), false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("value", DataType::Utf8, true), + ]) + } + + fn context_schema_with_hash() -> Schema { + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("block_hash", DataType::FixedSizeBinary(32), false), + ]) + } + + fn make_valid_query(sql: &str) -> ValidQuery { + ValidQuery::new( + sql, + "my_dataset", + ["my_table"].iter().copied(), + &alloy::primitives::Address::ZERO, + std::iter::empty::<(&str, &alloy::json_abi::JsonAbi)>(), + ) + .unwrap() + } + + /// When a query lacks block hash/timestamp columns, the context CTE uses + /// context_dataset and context_table from config. 
+ #[tokio::test] + async fn context_query_uses_config() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + client.add_schema( + "SELECT * FROM ctx_dataset.ctx_blocks", + context_schema_with_hash(), + ); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_without_hash(), + Some(("ctx_dataset".to_string(), "ctx_blocks".to_string())), + ) + .await; + + assert!( + result.is_ok(), + "Expected success when config provides context; got: {:?}", + result.err() + ); + } + + /// The old source.tables iteration is replaced — config values are the sole + /// source of context dataset and table. When no config is provided and context + /// columns are needed, resolution fails. + #[tokio::test] + async fn context_query_always_has_config() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_without_hash(), + None, + ) + .await; + + assert!( + result.is_err(), + "Expected error when amp_context is None and context columns are needed" + ); + let err_msg = format!("{:#}", result.unwrap_err()); + assert!( + err_msg.contains("no Amp context config"), + "Error should mention missing config; got: {err_msg}" + ); + } + + /// When the query already includes block hash and timestamp columns, + /// context config is not needed and resolution succeeds. + #[tokio::test] + async fn context_query_not_needed_when_columns_present() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, block_hash, timestamp, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_with_all_columns()); + + let valid_query = make_valid_query(main_query); + + let result = RawTable::resolve_block_range_query_builder( + &logger, + &client, + None, + valid_query, + schema_with_all_columns(), + None, + ) + .await; + + assert!( + result.is_ok(), + "Expected success when all columns present; got: {:?}", + result.err() + ); + } + + /// AmpChainConfig context fields are threaded through the full resolution + /// chain (RawDataSource → RawTransformer → RawTable). 
+ #[tokio::test] + async fn context_config_threaded_through_resolution() { + let logger = test_logger(); + let client = MockAmpClient::new(); + + let main_query = "SELECT _block_num, value FROM my_dataset.my_table"; + client.add_schema(main_query, schema_without_hash()); + client.add_schema( + "SELECT * FROM ctx_dataset.ctx_blocks", + context_schema_with_hash(), + ); + + let link_resolver = crate::components::link_resolver::FileLinkResolver::default(); + + let raw_data_source = RawDataSource { + name: "test_ds".to_string(), + kind: "amp".to_string(), + network: "mainnet".to_string(), + source: RawSource { + dataset: "my_dataset".to_string(), + tables: vec!["my_table".to_string()], + address: None, + start_block: None, + end_block: None, + }, + transformer: RawTransformer { + api_version: semver::Version::new(0, 0, 1), + abis: None, + tables: vec![RawTable { + name: "TestEntity".to_string(), + query: Some(main_query.to_string()), + file: None, + }], + }, + }; + + let result = raw_data_source + .resolve( + &logger, + &link_resolver, + &client, + None, + Some(("ctx_dataset".to_string(), "ctx_blocks".to_string())), + ) + .await; + + assert!( + result.is_ok(), + "Expected successful resolution with threaded context config; got: {:?}", + result.err() + ); } } diff --git a/graph/src/amp/manifest/mod.rs b/graph/src/amp/manifest/mod.rs index 028d567332c..87f39bb1d0d 100644 --- a/graph/src/amp/manifest/mod.rs +++ b/graph/src/amp/manifest/mod.rs @@ -44,6 +44,7 @@ impl Manifest { max_spec_version: Version, deployment: DeploymentHash, raw_manifest: serde_yaml::Mapping, + amp_context: Option<(String, String)>, ) -> Result { let unresolved_manifest = UnresolvedSubgraphManifest::::parse(deployment.cheap_clone(), raw_manifest) @@ -54,6 +55,7 @@ impl Manifest { &deployment, &link_resolver, Some(amp_client), + amp_context, logger, max_spec_version, ) diff --git a/graph/src/amp/sql/mod.rs b/graph/src/amp/sql/mod.rs index 02355895afa..b6cc50036ed 100644 --- a/graph/src/amp/sql/mod.rs +++ b/graph/src/amp/sql/mod.rs @@ -1,3 +1,30 @@ pub mod query_builder; pub use self::query_builder::{BlockRangeQueryBuilder, ContextQuery, ValidQuery}; + +use anyhow::anyhow; +use lazy_regex::regex_is_match; + +/// Validates that `s` is a simple SQL identifier: starts with a letter or +/// underscore and contains only `[a-zA-Z0-9_-]`, up to 101 characters. +pub fn validate_ident(s: &str) -> Result<(), anyhow::Error> { + if !regex_is_match!("^[a-zA-Z_][a-zA-Z0-9_-]{0,100}$", s) { + return Err(anyhow!( + "invalid identifier '{s}': must start with a letter or an underscore, \ + and contain only letters, numbers, hyphens, and underscores" + )); + } + Ok(()) +} + +/// Normalizes a SQL identifier for safe interpolation into SQL `FROM` clauses. +/// +/// Simple identifiers (matching `validate_ident`) are lowercased and returned +/// unquoted. Identifiers with special characters are double-quoted using +/// `sqlparser_latest::ast::Ident::with_quote`. 
+pub fn normalize_sql_ident(s: &str) -> String { + match validate_ident(s) { + Ok(()) => s.to_lowercase(), + Err(_) => sqlparser_latest::ast::Ident::with_quote('"', s).to_string(), + } +} diff --git a/graph/src/components/network_provider/mod.rs b/graph/src/components/network_provider/mod.rs index 3677ed6447f..d0ba4595875 100644 --- a/graph/src/components/network_provider/mod.rs +++ b/graph/src/components/network_provider/mod.rs @@ -16,7 +16,9 @@ pub use self::provider_check::ProviderCheckStatus; pub use self::provider_manager::ProviderCheckStrategy; pub use self::provider_manager::ProviderManager; +use crate::http::Uri; use std::collections::HashMap; +use std::sync::Arc; // Used to increase memory efficiency. // Currently, there is no need to create a separate type for this. @@ -26,10 +28,83 @@ pub type ChainName = crate::data::value::Word; // Currently, there is no need to create a separate type for this. pub type ProviderName = crate::data::value::Word; -/// Maps AMP network names to internal graph-node chain names. +/// Resolved per-chain Amp configuration, with the address parsed as a `Uri`. /// -/// AMP-powered subgraphs may use different network names than graph-node -/// (e.g., AMP uses `"ethereum-mainnet"` while graph-node uses `"mainnet"`). +/// This struct is the *runtime* counterpart of the TOML-level `AmpConfig` +/// (which stores the address as a plain `String`). The `Config::amp_chain_configs()` +/// method bridges the two by parsing each address string into a `Uri`. +#[derive(Clone, Debug)] +pub struct AmpChainConfig { + pub address: Uri, + pub token: Option, + pub context_dataset: String, + pub context_table: String, + pub network: Option, +} + +impl AmpChainConfig { + /// Returns the context dataset and table as a tuple. + pub fn context(&self) -> (String, String) { + (self.context_dataset.clone(), self.context_table.clone()) + } +} + +/// Holds per-chain Amp Flight clients, keyed by chain name. +/// +/// This wrapper is used to pass per-chain Amp clients through the system +/// instead of a single global `Option>`. Use `get(chain_name)` to +/// retrieve the client for a specific chain. +pub struct AmpClients { + clients: HashMap>, +} + +impl AmpClients { + /// Creates a new `AmpClients` from a map of chain names to clients. + pub fn new(clients: HashMap>) -> Self { + Self { clients } + } + + /// Returns the Amp client for the given chain, or `None` if no client + /// is configured for that chain. + pub fn get(&self, chain_name: &ChainName) -> Option> { + self.clients.get(chain_name).cloned() + } + + /// Returns `true` if no Amp clients are configured. + pub fn is_empty(&self) -> bool { + self.clients.is_empty() + } +} + +// Manual Clone impl: only requires `Arc: Clone` (always true), not `AC: Clone`. +impl Clone for AmpClients { + fn clone(&self) -> Self { + Self { + clients: self.clients.clone(), + } + } +} + +impl std::fmt::Debug for AmpClients { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AmpClients") + .field("chains", &self.clients.keys().collect::>()) + .finish() + } +} + +impl Default for AmpClients { + fn default() -> Self { + Self { + clients: HashMap::new(), + } + } +} + +/// Maps Amp network names to internal graph-node chain names. +/// +/// Amp-powered subgraphs may use different network names than graph-node +/// (e.g., Amp uses `"ethereum-mainnet"` while graph-node uses `"mainnet"`). /// This type provides a config-driven translation layer. 
#[derive(Clone, Debug, Default)] pub struct AmpChainNames(HashMap); @@ -39,7 +114,7 @@ impl AmpChainNames { AmpChainNames(mapping) } - /// Returns the internal chain name for an AMP alias, or the input + /// Returns the internal chain name for an Amp alias, or the input /// unchanged if no alias matches. pub fn resolve(&self, name: &ChainName) -> ChainName { self.0.get(name).cloned().unwrap_or_else(|| name.clone()) @@ -50,6 +125,21 @@ impl AmpChainNames { mod tests { use super::*; + #[test] + fn amp_chain_config_context_returns_tuple() { + let cfg = AmpChainConfig { + address: "http://localhost:50051".parse().unwrap(), + token: None, + context_dataset: "eth_mainnet".to_string(), + context_table: "blocks".to_string(), + network: None, + }; + assert_eq!( + cfg.context(), + ("eth_mainnet".to_string(), "blocks".to_string()) + ); + } + #[test] fn amp_chain_names_resolve_known_alias() { let mut map = HashMap::new(); @@ -72,4 +162,65 @@ mod tests { ChainName::from("mainnet") ); } + + #[test] + fn amp_clients_returns_client_for_configured_chain() { + let mut map = HashMap::new(); + map.insert(ChainName::from("mainnet"), Arc::new(42u32)); + let clients = AmpClients::new(map); + let client = clients.get(&ChainName::from("mainnet")); + assert!(client.is_some()); + assert_eq!(*client.unwrap(), 42); + } + + #[test] + fn amp_clients_returns_none_for_unconfigured_chain() { + let map: HashMap> = HashMap::new(); + let clients = AmpClients::new(map); + assert!(clients.get(&ChainName::from("mainnet")).is_none()); + } + + /// Verifies the condition that causes Amp manager registration: + /// `!amp_clients.is_empty()` is true when at least one chain has config. + #[test] + fn amp_manager_registered_when_chain_has_config() { + let mut map = HashMap::new(); + map.insert(ChainName::from("mainnet"), Arc::new(42u32)); + let clients = AmpClients::new(map); + assert!( + !clients.is_empty(), + "Amp manager should be registered when at least one chain has config" + ); + } + + /// Verifies the condition that skips Amp manager registration: + /// `amp_clients.is_empty()` is true when no chains have config. + #[test] + fn amp_manager_not_registered_without_config() { + let clients: AmpClients = AmpClients::new(HashMap::new()); + assert!( + clients.is_empty(), + "Amp manager should not be registered when no chains have config" + ); + } + + /// Simulates the error path in downstream consumers: when a subgraph + /// references a chain with no Amp client, the consumer should treat + /// `get()` returning `None` as an error. + #[test] + fn amp_clients_error_for_unconfigured_amp_chain() { + let mut map = HashMap::new(); + map.insert(ChainName::from("mainnet"), Arc::new(1u32)); + let clients = AmpClients::new(map); + + // "matic" is not configured. 
+ let result = clients + .get(&ChainName::from("matic")) + .ok_or_else(|| "Amp is not configured for chain 'matic'".to_string()); + assert!(result.is_err()); + assert_eq!( + result.unwrap_err(), + "Amp is not configured for chain 'matic'" + ); + } } diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 46154155319..d0ca5e65a93 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -14,7 +14,11 @@ pub mod status; pub use features::{SubgraphFeature, SubgraphFeatureValidationError}; -use crate::{cheap_clone::CheapClone, components::store::BLOCK_NUMBER_MAX, object}; +use crate::{ + cheap_clone::CheapClone, + components::{network_provider::ChainName, store::BLOCK_NUMBER_MAX}, + object, +}; use anyhow::{anyhow, Context, Error}; use futures03::future::try_join_all; use itertools::Itertools; @@ -822,6 +826,7 @@ impl UnvalidatedSubgraphManifest { raw: serde_yaml::Mapping, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result { @@ -831,6 +836,7 @@ impl UnvalidatedSubgraphManifest { raw, resolver, amp_client, + amp_context, logger, max_spec_version, ) @@ -971,16 +977,30 @@ impl SubgraphManifest { raw: serde_yaml::Mapping, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result { let unresolved = UnresolvedSubgraphManifest::parse(id.cheap_clone(), raw)?; let resolved = unresolved - .resolve(&id, resolver, amp_client, logger, max_spec_version) + .resolve( + &id, + resolver, + amp_client, + amp_context, + logger, + max_spec_version, + ) .await?; Ok(resolved) } + pub fn is_amp_subgraph(&self) -> bool { + self.data_sources + .iter() + .all(|ds| matches!(ds, DataSource::Amp(_))) + } + pub fn network_name(&self) -> String { // Assume the manifest has been validated, ensuring network names are homogenous self.data_sources @@ -1114,6 +1134,7 @@ impl UnresolvedSubgraphManifest { deployment_hash: &DeploymentHash, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, max_spec_version: semver::Version, ) -> Result, SubgraphManifestResolveError> { @@ -1166,10 +1187,12 @@ impl UnresolvedSubgraphManifest { }; let data_sources = try_join_all(data_sources.into_iter().enumerate().map(|(idx, ds)| { + let amp_context = amp_context.clone(); ds.resolve( deployment_hash, resolver, amp_client.cheap_clone(), + amp_context, logger, idx as u32, &spec_version, @@ -1370,6 +1393,21 @@ fn display_vector(input: &[impl std::fmt::Display]) -> impl std::fmt::Display { format!("[{}]", formatted_errors) } +/// Extracts the network name from the first data source in a raw manifest YAML mapping. +/// +/// Navigates `dataSources[0].network` and returns the network name as an owned string, +/// or `None` if any step in the path is missing. 
+pub fn network_name_from_raw_manifest(raw: &serde_yaml::Mapping) -> Option { + use serde_yaml::Value; + raw.get(Value::String("dataSources".to_owned())) + .and_then(|ds| ds.as_sequence()) + .and_then(|ds| ds.first()) + .and_then(|ds| ds.as_mapping()) + .and_then(|ds| ds.get(Value::String("network".to_owned()))) + .and_then(|n| n.as_str()) + .map(|s| s.into()) +} + #[test] fn test_subgraph_name_validation() { assert!(SubgraphName::new("a").is_ok()); @@ -1416,3 +1454,47 @@ fn test_display_vector() { format!("{}", manifest_validation_error) ) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn network_name_from_raw_manifest_extracts_network() { + use serde_yaml::{Mapping, Value}; + + let mut ds = Mapping::new(); + ds.insert( + Value::String("network".to_owned()), + Value::String("mainnet".to_owned()), + ); + + let mut raw = Mapping::new(); + raw.insert( + Value::String("dataSources".to_owned()), + Value::Sequence(vec![Value::Mapping(ds)]), + ); + + assert_eq!( + network_name_from_raw_manifest(&raw), + Some(ChainName::from("mainnet")) + ); + } + + #[test] + fn network_name_from_raw_manifest_returns_none_when_missing() { + use serde_yaml::{Mapping, Value}; + + // Empty mapping — no dataSources key at all + let empty = Mapping::new(); + assert_eq!(network_name_from_raw_manifest(&empty), None); + + // dataSources is an empty sequence + let mut raw = Mapping::new(); + raw.insert( + Value::String("dataSources".to_owned()), + Value::Sequence(vec![]), + ); + assert_eq!(network_name_from_raw_manifest(&raw), None); + } +} diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index cd34ca62857..f488e882313 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -360,6 +360,7 @@ impl UnresolvedDataSource { deployment_hash: &DeploymentHash, resolver: &Arc, amp_client: Option>, + amp_context: Option<(String, String)>, logger: &Logger, manifest_idx: u32, spec_version: &semver::Version, @@ -395,7 +396,13 @@ impl UnresolvedDataSource { } Self::Amp(raw_data_source) => match amp_client { Some(amp_client) => raw_data_source - .resolve(logger, resolver.as_ref(), amp_client.as_ref(), input_schema) + .resolve( + logger, + resolver.as_ref(), + amp_client.as_ref(), + input_schema, + amp_context, + ) .await .map(DataSource::Amp) .map_err(Error::from), diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 0207aee4df3..38766d7d977 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -331,6 +331,7 @@ impl UnresolvedDataSource { &deployment_hash, &resolver, amp_client, + None, logger, LATEST_VERSION.clone(), ) @@ -383,6 +384,7 @@ impl UnresolvedDataSource { &manifest.id, resolver, amp_client.cheap_clone(), + None, logger, LATEST_VERSION.clone(), ) diff --git a/graph/src/env/amp.rs b/graph/src/env/amp.rs index a6a02b194c3..e8cdd59ba7c 100644 --- a/graph/src/env/amp.rs +++ b/graph/src/env/amp.rs @@ -24,11 +24,6 @@ pub struct AmpEnv { /// /// Defaults to `600` seconds. pub query_retry_max_delay: Duration, - - /// Token used to authenticate Amp Flight gRPC service requests. - /// - /// Defaults to `None`. 
- pub flight_service_token: Option, } impl AmpEnv { @@ -65,12 +60,31 @@ impl AmpEnv { .amp_query_retry_max_delay_seconds .map(Duration::from_secs) .unwrap_or(Self::DEFAULT_QUERY_RETRY_MAX_DELAY), - flight_service_token: raw_env.amp_flight_service_token.as_ref().and_then(|value| { - if value.is_empty() { - return None; - } - Some(value.to_string()) - }), } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::env::ENV_VARS; + + #[test] + fn amp_env_constructs_without_flight_fields() { + // Verify that AmpEnv constructs correctly with only its remaining fields + // (the Flight service token field has been removed). The ENV_VARS static + // is constructed at process start; if AmpEnv still had that field, this + // access would fail to compile. + let amp = &ENV_VARS.amp; + assert_eq!(amp.max_buffer_size, AmpEnv::DEFAULT_MAX_BUFFER_SIZE); + assert_eq!(amp.max_block_range, AmpEnv::DEFAULT_MAX_BLOCK_RANGE); + assert_eq!( + amp.query_retry_min_delay, + AmpEnv::DEFAULT_QUERY_RETRY_MIN_DELAY + ); + assert_eq!( + amp.query_retry_max_delay, + AmpEnv::DEFAULT_QUERY_RETRY_MAX_DELAY + ); + } +} diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 23a6eaff579..cc7425764f6 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -616,8 +616,6 @@ struct Inner { amp_query_retry_min_delay_seconds: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS")] amp_query_retry_max_delay_seconds: Option, - #[envconfig(from = "GRAPH_AMP_FLIGHT_SERVICE_TOKEN")] - amp_flight_service_token: Option, } #[derive(Clone, Debug)] diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 88ba990c1e6..9c5c5bdb4d4 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -45,7 +45,6 @@ ingestor = "index_0" [chains.mainnet] shard = "primary" -amp = "ethereum-mainnet" provider = [ { label = "mainnet-0", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }, { label = "mainnet-1", details = { type = "web3call", url = "http://rpc.mainnet.io", features = ["archive", "traces"] }}, @@ -53,6 +52,13 @@ provider = [ { label = "firehose", details = { type = "firehose", url = "http://localhost:9000", features = [] }}, ] +[chains.mainnet.amp] +address = "http://localhost:50051" +token = "secret-token" +context_dataset = "eth" +context_table = "blocks" +network = "ethereum-mainnet" + [chains.ropsten] shard = "primary" provider = [ diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 792df8853c9..731e234b6a9 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -106,14 +106,6 @@ pub struct Opt { #[clap(long, help = "version label, used for prometheus metrics")] pub version_label: Option, - #[clap( - long, - value_name = "{HOST:PORT|URL}", - env = "GRAPH_AMP_FLIGHT_SERVICE_ADDRESS", - help = "The address of the Amp Flight gRPC service" - )] - pub amp_flight_service_address: Option, - #[clap(subcommand)] pub cmd: Command, } @@ -1348,7 +1340,6 @@ async fn main() -> anyhow::Result<()> { network_name, ipfs_url, arweave_url, - opt.amp_flight_service_address.clone(), config, metrics_ctx, node_id, diff --git a/node/src/config.rs b/node/src/config.rs index 69c174e5a76..285ffff92a3 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -1,7 +1,8 @@ use graph::{ + amp::sql::normalize_sql_ident, anyhow::Error, blockchain::BlockchainKind, - components::network_provider::{AmpChainNames, ChainName}, + components::network_provider::{AmpChainConfig, AmpChainNames, ChainName}, env::ENV_VARS, 
firehose::{SubgraphLimit, SUBGRAPHS_PER_CONN}, itertools::Itertools, @@ -121,8 +122,12 @@ impl Config { .chains .iter() .map(|(chain_name, chain)| { - let amp_name: ChainName = - chain.amp.as_deref().unwrap_or(chain_name.as_str()).into(); + let amp_name: ChainName = chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .unwrap_or(chain_name.as_str()) + .into(); let internal_name: ChainName = chain_name.as_str().into(); (amp_name, internal_name) }) @@ -130,6 +135,37 @@ AmpChainNames::new(mapping) } + /// Build a map from chain name to [`AmpChainConfig`] for every chain + /// that has an `[amp]` section. The `AmpConfig.address` string is parsed + /// into a `Uri`; this is expected to always succeed because + /// `ChainSection::validate` already rejects invalid URIs. + pub fn amp_chain_configs(&self) -> Result<HashMap<ChainName, AmpChainConfig>> { + let mut map = HashMap::new(); + for (chain_name, chain) in &self.chains.chains { + if let Some(amp) = &chain.amp { + let uri = amp.address.parse::<Uri>().map_err(|e| { + anyhow!( + "invalid Amp address URI '{}' for chain '{}': {}", + amp.address, + chain_name, + e + ) + })?; + map.insert( + chain_name.as_str().into(), + AmpChainConfig { + address: uri, + token: amp.token.clone(), + context_dataset: normalize_sql_ident(&amp.context_dataset), + context_table: normalize_sql_ident(&amp.context_table), + network: amp.network.clone(), + }, + ); + } + } + Ok(map) + } + /// Check that the config is valid. fn validate(&mut self) -> Result<()> { if !self.stores.contains_key(PRIMARY_SHARD.as_str()) { @@ -449,29 +485,52 @@ impl ChainSection { chain.validate()? } - // Validate that effective AMP names are unique and don't collide + // Validate Amp address URIs. + for (chain_name, chain) in &self.chains { + if let Some(amp_config) = &chain.amp { + amp_config.address.parse::<Uri>().map_err(|e| { + anyhow!( + "invalid Amp address URI '{}' for chain '{}': {}", + amp_config.address, + chain_name, + e + ) + })?; + } + } + + // Validate that effective Amp names are unique and don't collide // with other chain names. let mut amp_names: BTreeMap = BTreeMap::new(); for (chain_name, chain) in &self.chains { - let effective = chain.amp.as_deref().unwrap_or(chain_name.as_str()); + let effective = chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .unwrap_or(chain_name.as_str()); if let Some(prev_chain) = amp_names.get(effective) { return Err(anyhow!( - "duplicate AMP name `{}`: used by chains `{}` and `{}`", + "duplicate Amp name `{}`: used by chains `{}` and `{}`", effective, prev_chain, chain_name )); } - // Check that an explicit amp alias doesn't collide with + // Check that an explicit amp network alias doesn't collide with // another chain's own name (which would be ambiguous). - if chain.amp.is_some() { + if chain + .amp + .as_ref() + .and_then(|c| c.network.as_deref()) + .is_some() + { if let Some(other) = self.chains.get(effective) { // Only a collision if the other chain doesn't also // set the same amp alias (which is covered by the // duplicate check above). - if other.amp.as_deref() != Some(effective) { + if other.amp.as_ref().and_then(|c| c.network.as_deref()) != Some(effective) { return Err(anyhow!( - "AMP alias `{}` on chain `{}` collides with chain `{}`", + "Amp alias `{}` on chain `{}` collides with chain `{}`", effective, chain_name, effective, @@ -584,6 +643,32 @@ impl ChainSection { } } +/// Per-chain Amp Flight service configuration. +/// +/// Parsed from a `[chains.<name>.amp]` TOML table. When the `amp` key is +/// absent, Amp is disabled for that chain. 
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct AmpConfig { + /// Amp Flight service address (e.g. `http://localhost:50051`). + pub address: String, + /// Optional authentication token for the Amp Flight service. + pub token: Option, + /// The dataset in the Amp Flight service that contains the context table. + /// + /// This identifies the logical grouping (dataset) within the Flight + /// service where the block-level context table resides. + pub context_dataset: String, + /// The table providing block-level context: block hash, block number, + /// and timestamp. + /// + /// This should typically point to the blocks table (not transactions or + /// logs), since it reliably contains one row per block with the block hash. + pub context_table: String, + /// Optional Amp network name, used when the Amp network name differs + /// from the graph-node chain name. + pub network: Option, +} + #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct Chain { pub shard: String, @@ -596,10 +681,11 @@ pub struct Chain { pub polling_interval: Duration, #[serde(rename = "provider")] pub providers: Vec, - /// AMP network name alias. When set, AMP manifests using this name will - /// resolve to this chain. Defaults to the chain name. + /// Amp configuration table. When present, Amp is enabled for this chain + /// using the specified Flight service address, context dataset/table, + /// and optional auth token and network name override. #[serde(default)] - pub amp: Option, + pub amp: Option, } fn default_blockchain_kind() -> BlockchainKind { @@ -1289,7 +1375,8 @@ mod tests { use crate::config::{default_polling_interval, ChainSection, Web3Rule}; use super::{ - Chain, Config, FirehoseProvider, Provider, ProviderDetails, Shard, Transport, Web3Provider, + AmpConfig, Chain, ChainName, Config, FirehoseProvider, Provider, ProviderDetails, Shard, + Transport, Web3Provider, }; use graph::blockchain::BlockchainKind; use graph::firehose::SubgraphLimit; @@ -1962,18 +2049,117 @@ fdw_pool_size = [ assert_eq!(shard.fdw_pool_size.size_for(&other, "ashard").unwrap(), 5); } + #[test] + fn amp_config_toml_parses_full_table() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + [amp] + address = "http://localhost:50051" + token = "my-secret-token" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" + "#, + ) + .unwrap(); + + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: Some("my-secret-token".to_string()), + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }) + ); + } + + #[test] + fn amp_config_toml_parses_minimal_table() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + [amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + "#, + ) + .unwrap(); + + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }) + ); + } + + #[test] + fn amp_config_toml_parses_without_amp() { + let actual: Chain = toml::from_str( + r#" + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + assert_eq!(actual.amp, None); + } + + #[test] + fn amp_config_toml_rejects_missing_required_field() { + // The `address` field is required; omitting it should cause a deserialization error. 
+ let result = toml::from_str::( + r#" + shard = "primary" + provider = [] + [amp] + context_dataset = "eth" + context_table = "blocks" + "#, + ); + + assert!( + result.is_err(), + "expected deserialization error when address is missing" + ); + } + #[test] fn amp_chain_names_parsed_from_toml() { let actual: Chain = toml::from_str( r#" shard = "primary" provider = [] - amp = "ethereum-mainnet" + [amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" "#, ) .unwrap(); - assert_eq!(actual.amp, Some("ethereum-mainnet".to_string())); + assert_eq!( + actual.amp, + Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }) + ); } #[test] @@ -1997,11 +2183,19 @@ fdw_pool_size = [ [mainnet] shard = "primary" provider = [] - amp = "eth" + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "eth" [sepolia] shard = "primary" provider = [] - amp = "eth" + [sepolia.amp] + address = "http://localhost:50052" + context_dataset = "eth" + context_table = "blocks" + network = "eth" "#, ) .unwrap(); @@ -2009,8 +2203,8 @@ fdw_pool_size = [ let err = section.validate(); assert!(err.is_err()); assert!( - err.unwrap_err().to_string().contains("duplicate AMP name"), - "expected duplicate AMP name error" + err.unwrap_err().to_string().contains("duplicate Amp name"), + "expected duplicate Amp name error" ); } @@ -2025,7 +2219,11 @@ fdw_pool_size = [ [sepolia] shard = "primary" provider = [] - amp = "mainnet" + [sepolia.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "mainnet" "#, ) .unwrap(); @@ -2034,9 +2232,9 @@ fdw_pool_size = [ assert!(err.is_err()); let msg = err.unwrap_err().to_string(); // The alias "mainnet" on sepolia collides with the chain named - // "mainnet" whose effective AMP name is also "mainnet". + // "mainnet" whose effective Amp name is also "mainnet". 
assert!( - msg.contains("duplicate AMP name") || msg.contains("collides with chain"), + msg.contains("duplicate Amp name") || msg.contains("collides with chain"), "expected collision/duplicate error, got: {msg}" ); } @@ -2049,7 +2247,11 @@ fdw_pool_size = [ [mainnet] shard = "primary" provider = [] - amp = "ethereum-mainnet" + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" [sepolia] shard = "primary" provider = [] @@ -2090,4 +2292,327 @@ fdw_pool_size = [ graph::components::network_provider::ChainName::from("unknown") ); } + + #[test] + fn amp_config_validation_rejects_invalid_address() { + let mut section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "not a valid uri!@#".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + let err = section.validate(); + assert!(err.is_err(), "expected validation error for invalid URI"); + let msg = err.unwrap_err().to_string(); + assert!( + msg.contains("invalid Amp address URI"), + "expected 'invalid Amp address URI' in error, got: {msg}" + ); + assert!( + msg.contains("mainnet"), + "expected chain name 'mainnet' in error, got: {msg}" + ); + } + + #[test] + fn amp_config_validation_accepts_valid_address() { + let mut section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "http://localhost:50051".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + section + .validate() + .expect("validation should pass for a valid URI"); + } + + #[test] + fn amp_chain_config_constructable() { + use graph::components::network_provider::AmpChainConfig; + use graph::http::Uri; + + let uri: Uri = "http://localhost:50051".parse().unwrap(); + let cfg = AmpChainConfig { + address: uri, + token: Some("secret".to_string()), + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: Some("ethereum-mainnet".to_string()), + }; + + assert_eq!(cfg.address.to_string(), "http://localhost:50051/"); + assert_eq!(cfg.token.as_deref(), Some("secret")); + assert_eq!(cfg.context_dataset, "eth"); + assert_eq!(cfg.context_table, "blocks"); + assert_eq!(cfg.network.as_deref(), Some("ethereum-mainnet")); + } + + #[test] + fn amp_chain_configs_from_mixed() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + token = "my-token" + context_dataset = "eth" + context_table = "blocks" + network = "ethereum-mainnet" + [sepolia] + shard = "primary" + provider = [] + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + 
toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + + // Only mainnet (with amp) should be in the map + assert_eq!(map.len(), 1); + assert!(!map.contains_key(&ChainName::from("sepolia"))); + + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); + assert_eq!(mainnet.address.to_string(), "http://localhost:50051/"); + assert_eq!(mainnet.token.as_deref(), Some("my-token")); + assert_eq!(mainnet.context_dataset, "eth"); + assert_eq!(mainnet.context_table, "blocks"); + assert_eq!(mainnet.network.as_deref(), Some("ethereum-mainnet")); + } + + #[test] + fn amp_chain_config_invalid_address_returns_error() { + // Build a Config with an invalid address that bypasses validation + // (constructed directly, not via from_str which calls validate). + let section = ChainSection { + ingestor: "default".to_string(), + chains: { + let mut chains = std::collections::BTreeMap::new(); + chains.insert( + "mainnet".to_string(), + Chain { + shard: "primary".to_string(), + protocol: BlockchainKind::Ethereum, + polling_interval: default_polling_interval(), + providers: vec![], + amp: Some(AmpConfig { + address: "not a valid uri!@#".to_string(), + token: None, + context_dataset: "eth".to_string(), + context_table: "blocks".to_string(), + network: None, + }), + }, + ); + chains + }, + }; + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let result = config.amp_chain_configs(); + assert!(result.is_err(), "expected error for invalid URI"); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("invalid Amp address URI"), + "expected 'invalid Amp address URI' in error, got: {msg}" + ); + assert!( + msg.contains("mainnet"), + "expected chain name in error, got: {msg}" + ); + } + + #[test] + fn parse_full_config() { + let content = read_resource_as_string("full_config.toml"); + let actual: Config = toml::from_str(&content).unwrap(); + + let mainnet = actual + .chains + .chains + .get("mainnet") + .expect("mainnet chain should exist"); + assert!( + mainnet.amp.is_some(), + "mainnet should have a non-None amp field" + ); + } + + #[test] + fn parse_full_config_amp_values() { + let content = read_resource_as_string("full_config.toml"); + let actual: Config = toml::from_str(&content).unwrap(); + + let mainnet = actual + .chains + .chains + .get("mainnet") + .expect("mainnet chain should exist"); + let amp = mainnet + .amp + .as_ref() + .expect("mainnet should have amp config"); + + assert_eq!(amp.address, "http://localhost:50051"); + assert_eq!(amp.token, Some("secret-token".to_string())); + assert_eq!(amp.context_dataset, "eth"); + assert_eq!(amp.context_table, "blocks"); + assert_eq!(amp.network, Some("ethereum-mainnet".to_string())); + } + + #[test] + fn amp_chain_config_normalizes_context_idents() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "ns/data@v1" + context_table = 
"my/table@2" + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); + + // Identifiers with special characters should be double-quoted + assert_eq!(mainnet.context_dataset, "\"ns/data@v1\""); + assert_eq!(mainnet.context_table, "\"my/table@2\""); + } + + #[test] + fn amp_chain_config_lowercases_simple_context_idents() { + let section = toml::from_str::( + r#" + ingestor = "block_ingestor_node" + [mainnet] + shard = "primary" + provider = [] + [mainnet.amp] + address = "http://localhost:50051" + context_dataset = "Eth" + context_table = "Blocks" + "#, + ) + .unwrap(); + + let config = Config { + node: NodeId::new("test").unwrap(), + general: None, + stores: { + let mut s = std::collections::BTreeMap::new(); + s.insert( + "primary".to_string(), + toml::from_str::(r#"connection = "postgresql://u:p@h/db""#).unwrap(), + ); + s + }, + chains: section, + deployment: toml::from_str("[[rule]]\nshards = [\"primary\"]\nindexers = [\"test\"]") + .unwrap(), + }; + + let map = config.amp_chain_configs().unwrap(); + let mainnet = map + .get(&ChainName::from("mainnet")) + .expect("mainnet should be in map"); + + // Simple identifiers should be lowercased and unquoted + assert_eq!(mainnet.context_dataset, "eth"); + assert_eq!(mainnet.context_table, "blocks"); + } } diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 06f9b8cf652..018946a68f0 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{BufRead, BufReader}, path::Path, time::Duration, @@ -6,8 +7,11 @@ use std::{ use anyhow::Result; use git_testament::{git_testament, render_testament}; -use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; use graph::components::subgraph::Settings; +use graph::components::{ + link_resolver::{ArweaveClient, FileSizeLimit}, + network_provider::ChainName, +}; use graph::data::graphql::load_manager::LoadManager; use graph::endpoint::EndpointMetrics; use graph::env::EnvVars; @@ -18,7 +22,7 @@ use graph::url::Url; use graph::{ amp, blockchain::{Blockchain, BlockchainKind, BlockchainMap}, - components::network_provider::AmpChainNames, + components::network_provider::{AmpChainNames, AmpClients}, }; use graph_core::polling_monitor::{arweave_service, ArweaveService, IpfsService}; use graph_graphql::prelude::GraphQlRunner; @@ -275,7 +279,8 @@ fn build_subgraph_registrar( subscription_manager: Arc, arweave_service: ArweaveService, ipfs_service: IpfsService, - amp_client: Option>, + amp_clients: AmpClients, + amp_chain_configs: HashMap, cancel_token: CancellationToken, amp_chain_names: Arc, ) -> Arc< @@ -295,7 +300,7 @@ where let mut subgraph_instance_managers = graph_core::subgraph_provider::SubgraphInstanceManagers::new(); - if let Some(amp_client) = amp_client.cheap_clone() { + if !amp_clients.is_empty() { let amp_instance_manager = graph_core::amp_subgraph::Manager::new( logger_factory, metrics_registry.cheap_clone(), @@ -303,7 +308,8 @@ where &cancel_token, network_store.subgraph_store(), link_resolver.cheap_clone(), - amp_client, + 
amp_clients.clone(), + amp_chain_configs.clone(), ); subgraph_instance_managers.add(
@@ -322,7 +328,7 @@ where link_resolver.clone(), ipfs_service, arweave_service, - amp_client.cheap_clone(), + amp_clients.clone(), static_filters, );
@@ -351,7 +357,8 @@ where Arc::new(subgraph_provider), network_store.subgraph_store(), subscription_manager, - amp_client, + amp_clients, + amp_chain_configs, blockchain_map, node_id.clone(), version_switching_mode,
@@ -505,34 +512,54 @@ pub async fn run( &logger_factory, ); - let amp_client = match opt.amp_flight_service_address.as_deref() { - Some(amp_flight_service_address) => { - let addr: graph::http::Uri = amp_flight_service_address - .parse() - .expect("Invalid Amp Flight service address"); + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); - debug!(logger, "Connecting to Amp Flight service"; - "host" => ?addr.host(), - "port" => ?addr.port() + let amp_clients = { + if amp_chain_configs.is_empty() { + info!( + logger, + "Amp support disabled — no chains have [amp] configuration" ); - - let mut amp_client = amp::FlightClient::new(addr.clone()) - .await - .expect("Failed to connect to Amp Flight service"); - - if let Some(auth_token) = &env_vars.amp.flight_service_token { - amp_client.set_auth_token(auth_token); + AmpClients::new(std::collections::HashMap::new()) + } else { + let mut clients = std::collections::HashMap::new(); + for (chain_name, amp_chain_config) in &amp_chain_configs { + debug!(logger, "Connecting to Amp Flight service"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() + ); + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &amp_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name.clone(), Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => chain_name.as_str(), + "error" => e.to_string() + ); + } + } } - - info!(logger, "Amp-powered subgraphs enabled"; - "amp_flight_service_host" => ?addr.host() - ); - - Some(Arc::new(amp_client)) - } - None => { - warn!(logger, "Amp-powered subgraphs disabled"); - None + if clients.is_empty() { + warn!( + logger, + "Amp-powered subgraphs disabled — all configured chains failed to connect" + ); + } else { + let chain_names: Vec<&str> = clients.keys().map(|s| s.as_str()).collect(); + info!(logger, "Amp enabled for chains: {}", chain_names.join(", ")); + } + AmpClients::new(clients) } };
@@ -570,7 +597,7 @@ pub async fn run( blockchain_map.clone(), network_store.clone(), link_resolver.clone(), - amp_client.cheap_clone(), + amp_clients.clone(), ); if !opt.disable_block_ingestor {
@@ -597,7 +624,8 @@ pub async fn run( subscription_manager, arweave_service, ipfs_service, - amp_client, + amp_clients, + amp_chain_configs, cancel_token, amp_chain_names, );
diff --git a/node/src/manager/commands/run.rs index 2c23eb5151d..08b884ff476 100644 --- a/node/src/manager/commands/run.rs +++ b/node/src/manager/commands/run.rs
@@ -11,7 +11,7 @@ use graph::amp; use graph::anyhow::bail; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, FileSizeLimit}; -use graph::components::network_provider::chain_id_validator; +use
graph::components::network_provider::{chain_id_validator, AmpClients}; use graph::components::store::DeploymentLocator; use graph::components::subgraph::{Settings, SubgraphInstanceManager as _}; use graph::endpoint::EndpointMetrics;
@@ -21,7 +21,7 @@ use graph::prelude::{ SubgraphCountMetric, SubgraphName, SubgraphRegistrar, SubgraphStore, SubgraphVersionSwitchingMode, ENV_VARS, }; -use graph::slog::{debug, info, Logger}; +use graph::slog::{debug, error, info, warn, Logger}; use graph_core::polling_monitor::{arweave_service, ipfs_service}; use tokio_util::sync::CancellationToken;
@@ -40,7 +40,6 @@ pub async fn run( _network_name: String, ipfs_url: Vec, arweave_url: String, - amp_flight_service_address: Option, config: Config, metrics_ctx: MetricsContext, node_id: NodeId,
@@ -144,41 +143,75 @@ pub async fn run( let mut subgraph_instance_managers = graph_core::subgraph_provider::SubgraphInstanceManagers::new(); - let amp_client = match amp_flight_service_address { - Some(amp_flight_service_address) => { - let addr = amp_flight_service_address - .parse() - .expect("Invalid Amp Flight service address"); + let amp_chain_configs = config + .amp_chain_configs() + .expect("Failed to load Amp chain configs"); - let mut amp_client = amp::FlightClient::new(addr) - .await - .expect("Failed to connect to Amp Flight service"); - - if let Some(auth_token) = &env_vars.amp.flight_service_token { - amp_client.set_auth_token(auth_token); - } - - let amp_client = Arc::new(amp_client); - let amp_instance_manager = graph_core::amp_subgraph::Manager::new( - &logger_factory, - metrics_registry.cheap_clone(), - env_vars.cheap_clone(), - &cancel_token, - network_store.subgraph_store(), - link_resolver.cheap_clone(), - amp_client.cheap_clone(), - ); - - subgraph_instance_managers.add( - graph_core::subgraph_provider::SubgraphProcessingKind::Amp, - Arc::new(amp_instance_manager), + let amp_clients = { + if amp_chain_configs.is_empty() { + info!( + logger, + "Amp support disabled — no chains have [amp] configuration" ); - - Some(amp_client) + AmpClients::new(std::collections::HashMap::new()) + } else { + let mut clients = std::collections::HashMap::new(); + for (chain_name, amp_chain_config) in &amp_chain_configs { + debug!(logger, "Connecting to Amp Flight service"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host(), + "port" => ?amp_chain_config.address.port() + ); + match amp::FlightClient::new(amp_chain_config.address.clone()).await { + Ok(mut client) => { + if let Some(token) = &amp_chain_config.token { + client.set_auth_token(token); + } + info!(logger, "Amp Flight client connected"; + "chain" => chain_name.as_str(), + "host" => ?amp_chain_config.address.host() + ); + clients.insert(chain_name.clone(), Arc::new(client)); + } + Err(e) => { + error!(logger, "Failed to connect Amp Flight client"; + "chain" => chain_name.as_str(), + "error" => e.to_string() + ); + } + } + } + if clients.is_empty() { + warn!( + logger, + "Amp-powered subgraphs disabled — all configured chains failed to connect" + ); + } else { + let chain_names: Vec<&str> = clients.keys().map(|s| s.as_str()).collect(); + info!(logger, "Amp enabled for chains: {}", chain_names.join(", ")); + } + AmpClients::new(clients) } - None => None, }; + if !amp_clients.is_empty() { + let amp_instance_manager = graph_core::amp_subgraph::Manager::new( + &logger_factory, + metrics_registry.cheap_clone(), + env_vars.cheap_clone(), + &cancel_token, + network_store.subgraph_store(), + link_resolver.cheap_clone(), + amp_clients.clone(), +
amp_chain_configs.clone(), + ); + + subgraph_instance_managers.add( + graph_core::subgraph_provider::SubgraphProcessingKind::Amp, + Arc::new(amp_instance_manager), + ); + } + let subgraph_instance_manager = graph_core::subgraph::SubgraphInstanceManager::new( &logger_factory, env_vars.cheap_clone(), @@ -189,7 +222,7 @@ pub async fn run( link_resolver.cheap_clone(), ipfs_service, arweave_service, - amp_client.cheap_clone(), + amp_clients.clone(), static_filters, ); @@ -216,7 +249,8 @@ pub async fn run( subgraph_provider.cheap_clone(), subgraph_store.clone(), panicking_subscription_manager, - amp_client, + amp_clients, + amp_chain_configs, blockchain_map, node_id.clone(), SubgraphVersionSwitchingMode::Instant, diff --git a/node/src/opt.rs b/node/src/opt.rs index 9372d4f1472..9441921ceae 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -230,14 +230,6 @@ pub struct Opt { help = "Port for the graphman GraphQL server" )] pub graphman_port: u16, - - #[clap( - long, - value_name = "{HOST:PORT|URL}", - env = "GRAPH_AMP_FLIGHT_SERVICE_ADDRESS", - help = "The address of the Amp Flight gRPC service" - )] - pub amp_flight_service_address: Option, } impl From for config::Opt { diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..bee5fe3c1df 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -10,10 +10,11 @@ use git_testament::{git_testament, CommitKind}; use graph::amp; use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap}; use graph::components::link_resolver::LinkResolverContext; +use graph::components::network_provider::AmpClients; use graph::components::store::{BlockPtrForNumber, BlockStore, QueryPermit, Store}; use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; -use graph::data::subgraph::{status, DeploymentFeatures}; +use graph::data::subgraph::{network_name_from_raw_manifest, status, DeploymentFeatures}; use graph::data::value::Object; use graph::futures03::TryFutureExt; use graph::prelude::*; @@ -101,7 +102,7 @@ pub struct IndexNodeResolver { blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, bearer_token: Option, } @@ -114,7 +115,7 @@ where logger: &Logger, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, bearer_token: Option, blockchain_map: Arc, ) -> Self { @@ -125,7 +126,7 @@ where blockchain_map, store, link_resolver, - amp_client, + amp_clients, bearer_token, } } @@ -520,6 +521,12 @@ where let kind = BlockchainKind::from_manifest(&raw_yaml) .map_err(SubgraphManifestResolveError::ResolveError)?; + // Extract the network name from the raw yaml to look up the + // per-chain Amp client. 
+ let amp_client = network_name_from_raw_manifest(&raw_yaml) + .as_ref() + .and_then(|network| self.amp_clients.get(network)); + let max_spec_version = ENV_VARS.max_spec_version.clone(); let result = match kind { @@ -529,7 +536,8 @@ where deployment_hash.clone(), raw_yaml, &self.link_resolver, - self.amp_client.cheap_clone(), + amp_client.cheap_clone(), + None, &self.logger, max_spec_version, ) @@ -547,7 +555,8 @@ where deployment_hash.clone(), raw_yaml, &self.link_resolver, - self.amp_client.cheap_clone(), + amp_client, + None, &self.logger, max_spec_version, ) diff --git a/server/index-node/src/server.rs b/server/index-node/src/server.rs index 00b62c09ca2..58cc5b5dfbd 100644 --- a/server/index-node/src/server.rs +++ b/server/index-node/src/server.rs @@ -1,8 +1,8 @@ use graph::{ amp, blockchain::BlockchainMap, - cheap_clone::CheapClone, components::{ + network_provider::AmpClients, server::server::{start, ServerHandle}, store::Store, }, @@ -17,7 +17,7 @@ pub struct IndexNodeServer { blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, } impl IndexNodeServer @@ -31,7 +31,7 @@ where blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, ) -> Self { let logger = logger_factory.component_logger( "IndexNodeServer", @@ -47,7 +47,7 @@ where blockchain_map, store, link_resolver, - amp_client, + amp_clients, } } @@ -68,7 +68,7 @@ where self.blockchain_map.clone(), store, self.link_resolver.clone(), - self.amp_client.cheap_clone(), + self.amp_clients.clone(), )); start(logger_for_service.clone(), port, move |req| { diff --git a/server/index-node/src/service.rs b/server/index-node/src/service.rs index 09ddfd29038..68c89ef301c 100644 --- a/server/index-node/src/service.rs +++ b/server/index-node/src/service.rs @@ -16,7 +16,7 @@ use graph::hyper::header::{ use graph::hyper::{body::Body, Method, Request, Response, StatusCode}; use graph::amp; -use graph::components::{server::query::ServerError, store::Store}; +use graph::components::{network_provider::AmpClients, server::query::ServerError, store::Store}; use graph::data::query::{Query, QueryError, QueryResult, QueryResults}; use graph::prelude::{q, serde_json}; use graph::slog::{debug, error, Logger}; @@ -46,7 +46,7 @@ pub struct IndexNodeService { store: Arc, explorer: Arc>, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, } impl IndexNodeService @@ -59,7 +59,7 @@ where blockchain_map: Arc, store: Arc, link_resolver: Arc, - amp_client: Option>, + amp_clients: AmpClients, ) -> Self { let explorer = Arc::new(Explorer::new(store.clone())); @@ -69,7 +69,7 @@ where store, explorer, link_resolver, - amp_client, + amp_clients, } } @@ -143,7 +143,7 @@ where &logger, store, self.link_resolver.clone(), - self.amp_client.cheap_clone(), + self.amp_clients.clone(), validated.bearer_token, self.blockchain_map.clone(), ); diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index a201a9c233e..63b9a36726c 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -145,6 +145,7 @@ async fn try_resolve_manifest( raw, &resolver, Option::>::None, + None, &LOGGER, max_spec_version, ) @@ -175,6 +176,7 @@ async fn resolve_unvalidated(text: &str) -> UnvalidatedSubgraphManifest { raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1330,6 +1332,7 @@ schema: raw, &resolver, Option::>::None, + None, 
&LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1373,6 +1376,7 @@ schema: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1450,6 +1454,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1529,6 +1534,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_0_0_4.clone(), ) @@ -1619,6 +1625,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_2_0.clone(), ) @@ -1693,6 +1700,7 @@ dataSources: raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_3_0.clone(), ) @@ -1844,6 +1852,7 @@ specVersion: 1.3.0 raw, &resolver, Option::>::None, + None, &LOGGER, SPEC_VERSION_1_3_0.clone(), ) diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 234890730e5..cdf09772365 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -585,7 +585,7 @@ pub async fn setup_inner( link_resolver.cheap_clone(), ipfs_service, arweave_service, - None, + graph::components::network_provider::AmpClients::::default(), static_filters, )); @@ -620,7 +620,7 @@ pub async fn setup_inner( blockchain_map.cheap_clone(), stores.network_store.cheap_clone(), link_resolver.cheap_clone(), - None, + graph::components::network_provider::AmpClients::::default(), )); let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {}); @@ -631,7 +631,8 @@ pub async fn setup_inner( subgraph_provider.cheap_clone(), subgraph_store.clone(), panicking_subscription_manager, - Option::>::None, + graph::components::network_provider::AmpClients::::default(), + std::collections::HashMap::new(), blockchain_map.clone(), node_id.clone(), SubgraphVersionSwitchingMode::Instant,
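For reference, a minimal sketch of the per-chain Amp configuration this patch introduces; it is illustrative only and not part of the diff, and the chain name `mainnet` and the values below simply mirror node/resources/tests/full_config.toml:

[chains.mainnet]
shard = "primary"
provider = []

# Enables Amp for this chain; omit the table to leave Amp disabled.
[chains.mainnet.amp]
address = "http://localhost:50051"   # Amp Flight gRPC service
token = "secret-token"               # optional auth token
context_dataset = "eth"              # dataset holding the block context table
context_table = "blocks"             # table with block hash, number, and timestamp
network = "ethereum-mainnet"         # optional Amp network name override

At startup, Config::amp_chain_configs() turns each such table into an AmpChainConfig (normalizing context_dataset and context_table as SQL identifiers), the launcher connects one FlightClient per configured chain, and Amp subgraphs are served by the client registered for the network named in their manifest. This replaces the removed GRAPH_AMP_FLIGHT_SERVICE_ADDRESS and GRAPH_AMP_FLIGHT_SERVICE_TOKEN settings.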