Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/google/adk/events/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,19 @@ class Event(LlmResponse):
conversation history.
"""

tool_usage_metadata: Optional[dict[str, types.GenerateContentResponseUsageMetadata]] = None
"""Token usage metadata for tools and sub-agents invoked during this event.

Maps tool/agent names to their respective usage metadata, enabling granular
cost tracking for nested agent architectures and Vertex AI features.

Example:
{
"vertex_ai_search": UsageMetadata(prompt_token_count=100, ...),
"sub_agent_name": UsageMetadata(prompt_token_count=500, ...),
}
"""

# The following are computed fields.
# Do not assign the ID. It will be assigned by the session.
id: str = ''
Expand Down
4 changes: 4 additions & 0 deletions src/google/adk/flows/llm_flows/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,16 @@ def __build_response_event(
parts=[part_function_response],
)

# Collect tool usage metadata from tool context
tool_usage = tool_context.get_all_tool_usage()

function_response_event = Event(
invocation_id=invocation_context.invocation_id,
author=invocation_context.agent.name,
content=content,
actions=tool_context.actions,
branch=invocation_context.branch,
tool_usage_metadata=tool_usage if tool_usage else None,
)

return function_response_event
Expand Down
42 changes: 42 additions & 0 deletions src/google/adk/models/llm_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,48 @@ class LlmResponse(BaseModel):
It can be used to identify and chain interactions for stateful conversations.
"""

@staticmethod
def merge_usage_metadata(
metadata_list: list[Optional[types.GenerateContentResponseUsageMetadata]]
) -> Optional[types.GenerateContentResponseUsageMetadata]:
"""Merges multiple usage metadata objects into a single aggregate.

Args:
metadata_list: List of usage metadata objects to merge.

Returns:
Merged usage metadata with cumulative token counts, or None if all inputs
are None.
"""
if not metadata_list or all(m is None for m in metadata_list):
return None

total_prompt_tokens = 0
total_candidates_tokens = 0
total_tokens = 0
total_cached_tokens = 0

for metadata in metadata_list:
if metadata:
total_prompt_tokens += metadata.prompt_token_count or 0
total_candidates_tokens += metadata.candidates_token_count or 0
total_tokens += metadata.total_token_count or 0
if hasattr(metadata, 'cached_content_token_count'):
total_cached_tokens += metadata.cached_content_token_count or 0

# Create merged metadata
merged = types.GenerateContentResponseUsageMetadata(
prompt_token_count=total_prompt_tokens,
candidates_token_count=total_candidates_tokens,
total_token_count=total_tokens,
)

# Add cached tokens if any were present
if total_cached_tokens > 0:
merged.cached_content_token_count = total_cached_tokens

return merged

@staticmethod
def create(
generate_content_response: types.GenerateContentResponse,
Expand Down
18 changes: 18 additions & 0 deletions src/google/adk/plugins/bigquery_agent_analytics_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2364,6 +2364,24 @@ async def after_model_callback(
if usage_dict:
content_dict["usage"] = usage_dict

# Add tool-level usage metadata from nested agents and Vertex AI features
if hasattr(llm_response, "tool_usage_metadata") and llm_response.tool_usage_metadata:
tool_usage_dict = {}
for tool_name, tool_usage in llm_response.tool_usage_metadata.items():
if tool_usage:
tool_usage_entry = {}
if hasattr(tool_usage, "prompt_token_count"):
tool_usage_entry["prompt"] = tool_usage.prompt_token_count
if hasattr(tool_usage, "candidates_token_count"):
tool_usage_entry["completion"] = tool_usage.candidates_token_count
if hasattr(tool_usage, "total_token_count"):
tool_usage_entry["total"] = tool_usage.total_token_count
if tool_usage_entry:
tool_usage_dict[tool_name] = tool_usage_entry

if tool_usage_dict:
content_dict["tool_usage"] = tool_usage_dict

if content_dict:
content_str = content_dict
else:
Expand Down
33 changes: 33 additions & 0 deletions src/google/adk/telemetry/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,39 @@ def trace_tool_call(
else:
span.set_attribute('gcp.vertex.agent.tool_response', '{}')

# Add tool-level usage metadata if available
if (
function_response_event is not None
and function_response_event.tool_usage_metadata
):
total_prompt_tokens = 0
total_completion_tokens = 0
total_tokens = 0

for tool_name, usage_metadata in function_response_event.tool_usage_metadata.items():
if usage_metadata:
total_prompt_tokens += getattr(usage_metadata, 'prompt_token_count', 0) or 0
total_completion_tokens += getattr(usage_metadata, 'candidates_token_count', 0) or 0
total_tokens += getattr(usage_metadata, 'total_token_count', 0) or 0

if total_tokens > 0:
span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, total_prompt_tokens)
span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, total_completion_tokens)
span.set_attribute('gcp.vertex.agent.tool_usage_total_tokens', total_tokens)

# Add detailed breakdown as custom attribute
span.set_attribute(
'gcp.vertex.agent.tool_usage_breakdown',
_safe_json_serialize({
name: {
'prompt_tokens': getattr(usage, 'prompt_token_count', 0) or 0,
'completion_tokens': getattr(usage, 'candidates_token_count', 0) or 0,
'total_tokens': getattr(usage, 'total_token_count', 0) or 0,
}
for name, usage in function_response_event.tool_usage_metadata.items()
})
)
Comment on lines +222 to +248
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation iterates over function_response_event.tool_usage_metadata twice: once to calculate the total token counts and again to create the breakdown dictionary. This can be optimized by performing both operations in a single loop. This refactoring will improve efficiency and make the code more concise.

    total_prompt_tokens = 0
    total_completion_tokens = 0
    total_tokens = 0
    breakdown = {}

    for name, usage in function_response_event.tool_usage_metadata.items():
      if not usage:
        continue

      prompt_tokens = getattr(usage, 'prompt_token_count', 0) or 0
      completion_tokens = getattr(usage, 'candidates_token_count', 0) or 0
      usage_total_tokens = getattr(usage, 'total_token_count', 0) or 0

      total_prompt_tokens += prompt_tokens
      total_completion_tokens += completion_tokens
      total_tokens += usage_total_tokens

      breakdown[name] = {
          'prompt_tokens': prompt_tokens,
          'completion_tokens': completion_tokens,
          'total_tokens': usage_total_tokens,
      }

    if total_tokens > 0:
      span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, total_prompt_tokens)
      span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, total_completion_tokens)
      span.set_attribute('gcp.vertex.agent.tool_usage_total_tokens', total_tokens)

      # Add detailed breakdown as custom attribute
      span.set_attribute(
          'gcp.vertex.agent.tool_usage_breakdown',
          _safe_json_serialize(breakdown),
      )



def trace_merged_tool_calls(
response_event_id: str,
Expand Down
18 changes: 18 additions & 0 deletions src/google/adk/tools/agent_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,24 @@ async def run_async(
state=state_dict,
)

# Aggregate usage metadata from sub-agent execution
usage_metadata_list = []
last_content = None
async with Aclosing(
runner.run_async(
user_id=session.user_id, session_id=session.id, new_message=content
)
) as agen:
async for event in agen:
# Collect usage metadata from each event
if event.usage_metadata:
usage_metadata_list.append(event.usage_metadata)

# Also collect tool-level usage from nested events
if event.tool_usage_metadata:
for tool_name, tool_usage in event.tool_usage_metadata.items():
usage_metadata_list.append(tool_usage)

# Forward state delta to parent session.
if event.actions.state_delta:
tool_context.state.update(event.actions.state_delta)
Expand All @@ -261,6 +272,13 @@ async def run_async(
# to avoid "Attempted to exit cancel scope in a different task" errors
await runner.close()

# Aggregate and record usage for this sub-agent
if usage_metadata_list:
from ..models.llm_response import LlmResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This local import should be moved to the top of the file (e.g., with other from .. imports) to adhere to PEP 8 style guidelines and improve code readability. Top-level imports make dependencies explicit and easier to manage.

aggregated_usage = LlmResponse.merge_usage_metadata(usage_metadata_list)
if aggregated_usage:
tool_context.set_tool_usage(self.agent.name, aggregated_usage)

if last_content is None or last_content.parts is None:
return ''
merged_text = '\n'.join(
Expand Down
34 changes: 34 additions & 0 deletions src/google/adk/tools/tool_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,45 @@ def __init__(
super().__init__(invocation_context, event_actions=event_actions)
self.function_call_id = function_call_id
self.tool_confirmation = tool_confirmation
self._tool_usage: dict[str, Any] = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better type safety, consider using a more specific type hint instead of Any. You can use a forward reference string (e.g., 'types.GenerateContentResponseUsageMetadata') to avoid circular import issues. This would also require adding from google.genai import types to the TYPE_CHECKING block at the top of the file.

Suggested change
self._tool_usage: dict[str, Any] = {}
self._tool_usage: dict[str, "types.GenerateContentResponseUsageMetadata"] = {}


@property
def actions(self) -> EventActions:
return self._event_actions

def set_tool_usage(
self,
tool_name: str,
usage_metadata: Any,
) -> None:
"""Records usage metadata for a tool or sub-agent invocation.

Args:
tool_name: Name of the tool or agent that generated usage.
usage_metadata: Usage metadata object (GenerateContentResponseUsageMetadata
or dict with token counts).
"""
self._tool_usage[tool_name] = usage_metadata

def get_tool_usage(self, tool_name: str) -> Optional[Any]:
"""Retrieves usage metadata for a specific tool.

Args:
tool_name: Name of the tool to retrieve usage for.

Returns:
Usage metadata if recorded, None otherwise.
"""
return self._tool_usage.get(tool_name)

def get_all_tool_usage(self) -> dict[str, Any]:
"""Returns all tool usage metadata recorded in this context.

Returns:
Dictionary mapping tool names to their usage metadata.
"""
return self._tool_usage.copy()
Comment on lines +68 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To improve type safety and code clarity, the type hints for usage_metadata and the return types of these new methods should be made more specific than Any. Using 'types.GenerateContentResponseUsageMetadata' with a forward reference string is recommended. This change should be applied consistently with the update to _tool_usage.

  def set_tool_usage(
      self,
      tool_name: str,
      usage_metadata: "types.GenerateContentResponseUsageMetadata",
  ) -> None:
    """Records usage metadata for a tool or sub-agent invocation.

    Args:
      tool_name: Name of the tool or agent that generated usage.
      usage_metadata: Usage metadata object (GenerateContentResponseUsageMetadata
        or dict with token counts).
    """
    self._tool_usage[tool_name] = usage_metadata

  def get_tool_usage(self, tool_name: str) -> Optional["types.GenerateContentResponseUsageMetadata"]:
    """Retrieves usage metadata for a specific tool.

    Args:
      tool_name: Name of the tool to retrieve usage for.

    Returns:
      Usage metadata if recorded, None otherwise.
    """
    return self._tool_usage.get(tool_name)

  def get_all_tool_usage(self) -> dict[str, "types.GenerateContentResponseUsageMetadata"]:
    """Returns all tool usage metadata recorded in this context.

    Returns:
      Dictionary mapping tool names to their usage metadata.
    """
    return self._tool_usage.copy()


def request_credential(self, auth_config: AuthConfig) -> None:
if not self.function_call_id:
raise ValueError('function_call_id is not set.')
Expand Down