Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ lcov.info

# Node dependencies
node_modules/
.pnpm-store/

# Docker volumes and debug logs
.postgres
Expand Down
56 changes: 53 additions & 3 deletions docs/aggregations.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ accepts the following arguments:
dimension
- A mandatory `interval`
- An optional `current` to indicate whether to include the current,
partially filled bucket in the response. Can be either `ignore` (the
default) or `include` (still **TODO** and not implemented)
partially filled bucket in the response. Can be either `exclude` (the
default) or `include`
- Optional `timestamp_{gte|gt|lt|lte|eq|in}` filters to restrict the range
of timestamps to return. The timestamp to filter by must be a string
containing microseconds since the epoch. The value `"1704164640000000"`
Expand All @@ -189,7 +189,7 @@ accepts the following arguments:

```graphql
token_stats(interval: "hour",
current: ignore,
current: exclude,
where: {
token: "0x1234",
timestamp_gte: 1234567890,
Expand All @@ -201,3 +201,53 @@ token_stats(interval: "hour",
avgVolume
}
```

### Current Bucket

By default, aggregation queries return only completed, rolled-up buckets
(`current: exclude`). These are buckets whose time interval has ended and
whose data has been fully aggregated by `graph-node`'s rollup process.

Setting `current: include` adds an additional, partially filled bucket that
is computed on-the-fly from the unrolled timeseries data that has been
inserted since the last rollup. This current bucket aggregates raw data
points from the source timeseries table that have not yet been rolled up
into the aggregation table. It covers the time period from the end of the
last completed bucket up to the most recent data point.

- `current: exclude` (default) — return only completed, rolled-up buckets
- `current: include` — also return the in-progress bucket computed from
unrolled source data

The current bucket is useful when you need near-real-time aggregation data
without waiting for the next rollup cycle to complete.

#### Nested Aggregation Queries

The `current` argument also works on nested aggregation fields accessed
through a parent entity. For example, if a `Token` entity has a derived
aggregation field `tokenStats`, you can query the current bucket for each
token:

```graphql
{
tokens {
id
name
tokenStats(interval: "hour", current: include) {
timestamp
totalVolume
}
}
}
```

This returns both the completed rolled-up hourly buckets and the current
in-progress bucket for each token's stats.

#### Limitations

Current bucket support for nested aggregation fields is only available when
the field references a single aggregation type. It is not supported when the
aggregation field is accessed through an interface with multiple
implementations.
16 changes: 16 additions & 0 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ pub struct EntityQuery {
pub query_id: Option<String>,

pub trace: bool,

pub aggregation_current: Option<AggregationCurrent>,
}

impl EntityQuery {
Expand All @@ -484,6 +486,7 @@ impl EntityQuery {
logger: None,
query_id: None,
trace: false,
aggregation_current: None,
}
}

Expand Down Expand Up @@ -543,6 +546,19 @@ impl EntityQuery {
}
}

/// Indicates whether the current, partially filled bucket should be included in the response.
///
/// This is only relevant for aggregation entity queries. Mirrors the GraphQL
/// `Aggregation_current` enum exposed through the `current` argument; the
/// default (`Exclude`) matches the documented default of `exclude`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AggregationCurrent {
    /// Exclude the current, partially filled bucket from the response.
    #[default]
    Exclude,

    /// Include the current, partially filled bucket in the response.
    Include,
}

/// Operation types that lead to changes in assignments
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "lowercase")]
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,11 @@ impl FilterOps {
"",
s::Type::NamedType("OrderDirection".to_string()),
),
input_value(
"current",
"",
s::Type::NamedType("Aggregation_current".to_string()),
),
],
};

Expand Down Expand Up @@ -2231,6 +2236,8 @@ type Gravatar @entity {
assert_eq!("Aggregation_interval", interval.value_type.get_base_type());
let filter = field.argument("where").unwrap();
assert_eq!(&filter_type, filter.value_type.get_base_type());
let current = field.argument("current").unwrap();
assert_eq!("Aggregation_current", current.value_type.get_base_type());

let s::TypeDefinition::InputObject(filter) = schema
.get_type_definition_from_type(&filter.value_type)
Expand Down
1 change: 1 addition & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod kw {
pub const INTERVALS: &str = "intervals";
pub const INTERVAL: &str = "interval";
pub const CUMULATIVE: &str = "cumulative";
pub const CURRENT: &str = "current";
}

/// The internal representation of a subgraph schema, i.e., the
Expand Down
9 changes: 9 additions & 0 deletions graph/src/schema/meta.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,12 @@ enum Aggregation_interval {
hour
day
}

# Values accepted by the `current` argument on aggregation fields.
"Indicates whether the current, partially filled bucket should be included in the response. Defaults to `exclude`"
enum Aggregation_current {
  "Exclude the current, partially filled bucket from the response"
  exclude

  "Include the current, partially filled bucket in the response"
  include
}
22 changes: 21 additions & 1 deletion graphql/src/execution/ast.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeSet, HashSet};

use graph::{
components::store::{AttributeNames, ChildMultiplicity, EntityOrder},
components::store::{AggregationCurrent, AttributeNames, ChildMultiplicity, EntityOrder},
data::{graphql::ObjectOrInterface, store::ID},
env::ENV_VARS,
prelude::{anyhow, q, r, s, QueryExecutionError, ValueMap},
Expand Down Expand Up @@ -364,6 +364,26 @@ impl Field {
})
.transpose()
}

/// Reads the optional `current` argument of this field.
///
/// Returns `Ok(None)` when the argument is absent, the matching
/// [`AggregationCurrent`] variant for the enum values `exclude` and
/// `include`, and an `InvalidArgumentError` for anything else.
pub fn aggregation_current(&self) -> Result<Option<AggregationCurrent>, QueryExecutionError> {
    let value = match self.argument_value(kw::CURRENT) {
        Some(value) => value,
        None => return Ok(None),
    };

    let parsed = match value {
        r::Value::Enum(name) if name == "exclude" => Some(AggregationCurrent::Exclude),
        r::Value::Enum(name) if name == "include" => Some(AggregationCurrent::Include),
        _ => None,
    };

    // Anything other than the two known enum values is a user error.
    parsed.map(Some).ok_or_else(|| {
        QueryExecutionError::InvalidArgumentError(
            self.position,
            kw::CURRENT.to_string(),
            q::Value::from(value.clone()),
        )
    })
}
}

impl ValueMap for Field {
Expand Down
19 changes: 12 additions & 7 deletions graphql/src/store/prefetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Run a GraphQL query and fetch all the entities needed to build the
//! final result

use graph::components::store::AggregationCurrent;
use graph::data::graphql::ObjectTypeExt;
use graph::data::query::Trace;
use graph::data::store::Id;
Expand All @@ -9,7 +10,6 @@ use graph::data::store::IdType;
use graph::data::store::QueryObject;
use graph::data::value::{Object, Word};
use graph::prelude::{r, CacheWeight, CheapClone};
use graph::schema::kw;
use graph::schema::AggregationInterval;
use graph::schema::Field;
use graph::slog::warn;
Expand Down Expand Up @@ -626,6 +626,7 @@ impl<'a> Loader<'a> {
let child_type = input_schema
.object_or_interface(field_type.field_type.get_base_type(), child_interval)
.expect("we only collect fields that are objects or interfaces");
let aggregation_current = field.aggregation_current()?;

let join = if at_root {
MaybeJoin::Root { child_type }
Expand All @@ -644,6 +645,7 @@ impl<'a> Loader<'a> {
let field_type = object_type
.field(&field.name)
.expect("field names are valid");

MaybeJoin::Nested(Join::new(
&input_schema,
object_type.cheap_clone(),
Expand All @@ -652,7 +654,10 @@ impl<'a> Loader<'a> {
))
};

match self.fetch(&parents, &join, field).await {
match self
.fetch(&parents, &join, field, aggregation_current)
.await
{
Ok((children, trace)) => {
let exec_fut = Box::pin(self.execute_selection_set(
children,
Expand Down Expand Up @@ -696,6 +701,7 @@ impl<'a> Loader<'a> {
parents: &[&mut Node],
join: &MaybeJoin<'_>,
field: &a::Field,
aggregation_current: Option<AggregationCurrent>,
) -> Result<(Vec<Node>, Trace), QueryExecutionError> {
let input_schema = self.resolver.store.input_schema().await?;
let child_type = join.child_type();
Expand All @@ -715,11 +721,6 @@ impl<'a> Loader<'a> {
// that causes unnecessary work in the database
query.order = EntityOrder::Unordered;
}
// Apply default timestamp ordering for aggregations if no custom order is specified
if child_type.is_aggregation() && matches!(query.order, EntityOrder::Default) {
let ts = child_type.field(kw::TIMESTAMP).unwrap();
query.order = EntityOrder::Descending(ts.name.to_string(), ts.value_type);
}
query.logger = Some(self.ctx.logger.cheap_clone());
if let Some(r::Value::String(id)) = field.argument_value(ARG_ID) {
query.filter = Some(
Expand All @@ -728,6 +729,10 @@ impl<'a> Loader<'a> {
);
}

if child_type.is_aggregation() {
query.aggregation_current = Some(aggregation_current.unwrap_or_default());
}

if let MaybeJoin::Nested(join) = join {
// For anything but the root node, restrict the children we select
// by the parent list
Expand Down
9 changes: 9 additions & 0 deletions graphql/src/store/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use graph::data::value::Object;
use graph::data::value::Value as DataValue;
use graph::prelude::{r, TryFromValue, ENV_VARS};
use graph::schema::ast::{self as sast, FilterOp};
use graph::schema::kw;
use graph::schema::{EntityType, InputSchema, ObjectOrInterface};

use crate::execution::ast as a;
Expand Down Expand Up @@ -552,6 +553,14 @@ fn build_order(
}
}
}
// Apply a default ordering to the aggregations so that the most recent buckets are returned first
(None, _) if entity.is_aggregation() => {
let ts = entity
.field(kw::TIMESTAMP)
.expect("aggregation entities have timestamps");

EntityOrder::Descending(ts.name.to_string(), ts.value_type)
}
(None, _) => EntityOrder::Default,
};
Ok(order)
Expand Down
Loading