diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java index 3b0421b44..c024a92f0 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java @@ -13,17 +13,78 @@ /** * gRPC server interceptor that captures request metadata and context information, - * providing equivalent functionality to Python's grpc.aio.ServicerContext. + * providing equivalent functionality to Python's {@code grpc.aio.ServicerContext}. * - * This interceptor: - * - Extracts A2A extension headers from incoming requests - * - Captures ServerCall and Metadata for rich context access - * - Stores context information in gRPC Context for service method access - * - Provides proper equivalence to Python's ServicerContext + *
This interceptor executes before service methods are invoked, extracting A2A protocol + * headers and request metadata from the gRPC call and storing them in the gRPC {@link Context} + * for access by {@link io.a2a.transport.grpc.handler.GrpcHandler} and agent implementations. + * + *
All captured information is stored in the gRPC {@link Context} using keys from + * {@link io.a2a.transport.grpc.context.GrpcContextKeys}: + *
This interceptor is registered as an {@code @ApplicationScoped} CDI bean and automatically + * applied to gRPC services through Quarkus gRPC's {@code @RegisterInterceptor} annotation. + * + *
This interceptor provides functionality equivalent to Python's {@code grpc.aio.ServicerContext}, + * enabling Java handlers to access the same rich context information available in Python implementations: + *
This method extracts A2A protocol headers and request metadata, stores them + * in the gRPC {@link Context}, and proceeds with the call in the enhanced context. + * + *
Extraction Process: + *
This class provides a production-ready gRPC service built on Quarkus gRPC, + * implementing the A2A protocol with CDI integration, authentication, and + * interceptor support for metadata extraction. + * + *
This class is a Quarkus gRPC service ({@code @GrpcService}) that automatically: + *
The service is protected with {@code @Authenticated} annotation, requiring + * authentication for all gRPC method calls. Configure authentication in + * {@code application.properties}: + *
+ * quarkus.security.users.embedded.enabled=true + * quarkus.security.users.embedded.plain-text=true + * quarkus.security.users.embedded.users.alice=password + *+ * + *
The {@code @RegisterInterceptor} annotation automatically registers + * {@link A2AExtensionsInterceptor} to capture A2A protocol headers and + * metadata before service methods are invoked. + * + *
To customize context creation, provide a CDI bean implementing + * {@link CallContextFactory}: + *
{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ * @Override
+ * public ServerCallContext create(StreamObserver responseObserver) {
+ * // Custom context creation logic
+ * }
+ * }
+ * }
+ *
+ * @see io.a2a.transport.grpc.handler.GrpcHandler
+ * @see A2AExtensionsInterceptor
+ * @see CallContextFactory
+ */
@GrpcService
@RegisterInterceptor(A2AExtensionsInterceptor.class)
@Authenticated
@@ -29,11 +78,30 @@ public class QuarkusGrpcHandler extends GrpcHandler {
private final Executor executor;
/**
- * No-args constructor for CDI proxy creation.
- * CDI requires a non-private constructor to create proxies for @ApplicationScoped beans.
- * All fields are initialized by the @Inject constructor during actual bean creation.
+ * Constructs a new QuarkusGrpcHandler with CDI-injected dependencies.
+ *
+ * This constructor is invoked by CDI to create the gRPC service bean, + * injecting all required and optional dependencies. + * + *
Required Dependencies: + *
Optional Dependencies: + *
This class identifies the transport protocol used by the gRPC server implementation. + * It is automatically discovered by the A2A server framework through CDI to provide + * protocol-specific metadata to components that need to distinguish between different + * transport implementations. + * + *
This bean is automatically registered and can be injected where transport + * protocol information is needed: + *
{@code
+ * @Inject
+ * TransportMetadata transportMetadata;
+ *
+ * public void logProtocol() {
+ * String protocol = transportMetadata.getTransportProtocol();
+ * // Returns "grpc" for this implementation
+ * }
+ * }
+ *
+ * This package provides a production-ready gRPC server implementation built on + * Quarkus gRPC and Protocol Buffers, demonstrating best practices for A2A protocol + * integration with CDI, authentication, and interceptor support. + * + *
+ * gRPC Request (Protocol Buffers) + * ↓ + * A2AExtensionsInterceptor (metadata extraction) + * ↓ + * QuarkusGrpcHandler (@GrpcService) + * ├─ Protobuf → Domain conversion + * ├─ Create ServerCallContext + * ├─ Route to GrpcHandler (transport layer) + * └─ Domain → Protobuf conversion + * ↓ + * GrpcHandler (transport/grpc) + * ↓ + * RequestHandler (server-common) + * ↓ + * AgentExecutor (your implementation) + *+ * + *
Unary RPC (blocking): + *
Server Streaming RPC: + *
Required CDI Beans: + *
Optional CDI Beans: + *
Add Dependency: + *
{@code
+ *
+ * io.github.a2asdk
+ * a2a-java-sdk-reference-grpc
+ * ${a2a.version}
+ *
+ * }
+ *
+ * Provide Agent Card: + *
{@code
+ * @ApplicationScoped
+ * public class MyAgentCardProducer {
+ * @Produces @PublicAgentCard
+ * public AgentCard agentCard() {
+ * return new AgentCard.Builder()
+ * .name("My gRPC Agent")
+ * .description("Agent description")
+ * .url("http://localhost:9090")
+ * .capabilities(new AgentCapabilities.Builder()
+ * .streaming(true)
+ * .build())
+ * .build();
+ * }
+ * }
+ * }
+ *
+ * Provide Agent Executor: + *
{@code
+ * @ApplicationScoped
+ * public class MyAgentExecutorProducer {
+ * @Produces
+ * public AgentExecutor agentExecutor() {
+ * return new MyAgentExecutor();
+ * }
+ * }
+ * }
+ *
+ * gRPC Server: + *
+ * quarkus.grpc.server.port=9090 + * quarkus.grpc.server.host=0.0.0.0 + *+ * + *
Authentication: + *
+ * quarkus.security.users.embedded.enabled=true + * quarkus.security.users.embedded.plain-text=true + * quarkus.security.users.embedded.users.alice=password + *+ * + *
Custom Context Creation: + *
Provide a CDI bean implementing {@link io.a2a.transport.grpc.handler.CallContextFactory CallContextFactory}: + *
{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ * @Override
+ * public ServerCallContext create(StreamObserver responseObserver) {
+ * // Extract custom data from gRPC context
+ * Context grpcContext = Context.current();
+ * Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
+ * String orgId = metadata.get(
+ * Metadata.Key.of("x-organization-id", Metadata.ASCII_STRING_MARSHALLER)
+ * );
+ *
+ * Map state = new HashMap<>();
+ * state.put("organization", orgId);
+ * state.put("grpc_response_observer", responseObserver);
+ *
+ * return new ServerCallContext(
+ * extractUser(),
+ * state,
+ * extractExtensions(grpcContext),
+ * extractVersion(grpcContext)
+ * );
+ * }
+ * }
+ * }
+ *
+ * This implementation provides equivalent functionality to Python's {@code grpc.aio} server: + *
These keys provide access to gRPC context information stored in + * {@link io.grpc.Context}, enabling rich context access in service method + * implementations similar to Python's {@code grpc.aio.ServicerContext}. + * + *
{@code
+ * public void processRequest(ServerCallContext context) {
+ * // Access gRPC context information
+ * Context grpcContext = Context.current();
+ * String method = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(grpcContext);
+ * Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
+ * String peerInfo = GrpcContextKeys.PEER_INFO_KEY.get(grpcContext);
+ *
+ * // Access A2A protocol headers
+ * String version = GrpcContextKeys.VERSION_HEADER_KEY.get(grpcContext);
+ * String extensions = GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(grpcContext);
+ * }
+ * }
+ *
+ * These context keys are populated by server interceptors (typically
+ * {@code A2AExtensionsInterceptor}) that capture request metadata and store
+ * it in the gRPC context before service methods are called.
+ *
+ * @see io.grpc.Context
+ * @see io.grpc.Metadata
+ * @see io.a2a.server.ServerCallContext
*/
public final class GrpcContextKeys {
@@ -54,9 +78,32 @@ public final class GrpcContextKeys {
* Context key for storing the peer information.
* Provides access to client connection details.
*/
- public static final Context.Key This mapping translates gRPC protobuf method names to their corresponding
+ * A2A protocol method name constants for consistent method identification across
+ * all transports.
+ *
+ * Method Mappings:
+ * This interface provides an extension point for customizing how {@link ServerCallContext}
+ * instances are created in gRPC applications. The default implementation in {@link GrpcHandler}
+ * extracts standard information (user, metadata, headers, peer info, protocol version), but
+ * applications can provide their own implementation to add custom context data.
+ *
+ * When no CDI bean implementing this interface is provided, {@link GrpcHandler}
+ * creates contexts with:
+ * This method is called for each incoming gRPC request to create the context
+ * that will be passed to the {@link io.a2a.server.requesthandlers.RequestHandler}
+ * and eventually to the {@link io.a2a.server.agentexecution.AgentExecutor}.
+ *
+ * Implementations should extract information from the current {@link io.grpc.Context}
+ * using {@link io.a2a.transport.grpc.context.GrpcContextKeys} to access metadata,
+ * method name, peer info, and A2A protocol headers.
+ *
+ * @param This abstract class implements the gRPC service interface for the A2A protocol,
+ * handling both unary (blocking) and server streaming RPC calls. It translates gRPC
+ * requests to A2A protocol operations, coordinates with the request handler and agent
+ * executor, and manages error handling with appropriate gRPC status codes.
+ *
+ * Unary RPC (blocking):
+ * Server Streaming RPC:
+ * A2A errors are mapped to gRPC status codes:
+ * The handler provides rich context information equivalent to Python's
+ * {@code grpc.aio.ServicerContext}:
+ * Subclasses must implement:
+ * This class is marked with {@code @Vetoed} to prevent direct CDI management.
+ * Subclasses should be CDI beans (e.g., {@code @GrpcService} in Quarkus) that
+ * inject dependencies and provide them through the abstract methods.
+ *
+ * @see io.a2a.grpc.A2AServiceGrpc.A2AServiceImplBase
+ * @see io.a2a.server.requesthandlers.RequestHandler
+ * @see CallContextFactory
+ * @see io.a2a.transport.grpc.context.GrpcContextKeys
+ */
@Vetoed
public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase {
@@ -75,10 +157,38 @@ public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase {
private static final Logger LOGGER = Logger.getLogger(GrpcHandler.class.getName());
+ /**
+ * Constructs a new GrpcHandler.
+ */
public GrpcHandler() {
}
+ /**
+ * Handles a unary (blocking) message send request.
+ *
+ * This method processes a message send request, waits for the agent to complete
+ * processing, and returns either a Task or Message in the response.
+ *
+ * Protocol Flow:
+ * Error Handling:
+ * This method processes a message send request with streaming response, where
+ * the agent can emit multiple events (artifacts, status updates, messages) as the
+ * task progresses.
+ *
+ * Protocol Flow:
+ * Streaming Characteristics:
+ * Error Handling:
+ * This method subscribes to the event publisher and converts each domain event
+ * to a protobuf {@link StreamResponse}, handling backpressure through the reactive
+ * streams subscription and detecting client disconnections.
+ *
+ * Backpressure Handling:
+ * Disconnect Detection:
+ * When the gRPC client disconnects:
+ * Final Event Detection:
+ * The stream completes automatically when a final task status is received:
+ * This method extracts authentication, metadata, and A2A protocol information
+ * from the gRPC context and packages them into a context object for use by the
+ * request handler and agent executor.
+ *
+ * Default Context Creation:
+ * If no {@link CallContextFactory} is provided, creates a context with:
+ * Custom Context Creation:
+ * If a {@link CallContextFactory} bean is present, delegates to
+ * {@link CallContextFactory#create(StreamObserver)} for custom context creation.
+ *
+ * Context Information:
+ * The gRPC context information is populated by server interceptors (typically
+ * {@code A2AExtensionsInterceptor}) that capture request metadata before service
+ * methods are invoked.
+ *
+ * @param This method converts domain-specific A2A errors to gRPC status codes with
+ * descriptive error messages, allowing clients to understand and handle errors
+ * appropriately.
+ *
+ * Error Mappings:
+ * This method attempts to detect the type of security exception based on the exception
+ * class name and maps it to the appropriate gRPC status code.
+ *
+ * Error Detection:
+ * Used for transport configuration validation with proper classloader context.
+ *
+ * @return the deployment classloader
+ */
protected ClassLoader getDeploymentClassLoader() {
return this.getClass().getClassLoader();
}
+ /**
+ * Sets a callback to be invoked when streaming subscription starts.
+ *
+ * This is a testing hook used to synchronize test execution with streaming setup.
+ * In production, this remains null.
+ *
+ * @param runnable the callback to invoke on subscription
+ */
public static void setStreamingSubscribedRunnable(Runnable runnable) {
streamingSubscribedRunnable = runnable;
}
+ /**
+ * Returns the request handler instance for processing A2A protocol requests.
+ *
+ * @return the request handler
+ */
protected abstract RequestHandler getRequestHandler();
+ /**
+ * Returns the public agent card defining the agent's capabilities and metadata.
+ *
+ * @return the agent card
+ */
protected abstract AgentCard getAgentCard();
+ /**
+ * Returns the extended agent card with additional capabilities, or null if not configured.
+ *
+ * @return the extended agent card, or null if not available
+ */
protected abstract AgentCard getExtendedAgentCard();
+ /**
+ * Returns the custom call context factory, or null to use default context creation.
+ *
+ * @return the call context factory, or null for default behavior
+ */
protected abstract CallContextFactory getCallContextFactory();
+ /**
+ * Returns the executor for running async operations (streaming subscriptions, etc.).
+ *
+ * @return the executor
+ */
protected abstract Executor getExecutor();
/**
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java
index 0cc667b2d..b05417c46 100644
--- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java
@@ -2,6 +2,51 @@
* Copyright The WildFly Authors
* SPDX-License-Identifier: Apache-2.0
*/
+
+/**
+ * gRPC transport handler implementations for the A2A protocol.
+ *
+ * This package contains the core gRPC handler that processes gRPC requests
+ * and translates them to A2A protocol operations. It supports both unary (blocking)
+ * and streaming responses with proper gRPC error handling and status codes.
+ *
+ * This implementation uses Protocol Buffers for message serialization and provides:
+ * The gRPC handler provides rich context information equivalent to Python's
+ * {@code grpc.aio.ServicerContext}, including:
+ *
+ *
+ *
+ * @see io.a2a.spec.A2AMethods
+ */
public static final MapDefault Behavior
+ *
+ *
+ *
+ * Custom Implementation Example
+ * {@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ * @Override
+ * public
+ *
+ * @see ServerCallContext
+ * @see GrpcHandler#createCallContext
+ * @see io.a2a.transport.grpc.context.GrpcContextKeys
+ */
public interface CallContextFactory {
+ /**
+ * Creates a {@link ServerCallContext} from gRPC request context.
+ *
+ * Request Flow
+ *
+ * gRPC Request → GrpcHandler (this class)
+ * ↓
+ * Protobuf → Domain conversion
+ * ↓
+ * RequestHandler → AgentExecutor
+ * ↓
+ * Domain → Protobuf conversion
+ * ↓
+ * gRPC Response (unary or streaming)
+ *
+ *
+ * Supported Operations
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * Error Handling
+ *
+ *
+ *
+ * Context Access
+ *
+ *
+ *
+ * Extension Points
+ *
+ *
+ *
+ * CDI Integration
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param request the gRPC message send request
+ * @param responseObserver the gRPC response stream observer
+ */
@Override
public void sendMessage(io.a2a.grpc.SendMessageRequest request,
StreamObserver
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param request the gRPC message send request
+ * @param responseObserver the gRPC response stream observer
+ */
@Override
public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request,
StreamObserver
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param publisher the reactive publisher of streaming events
+ * @param responseObserver the gRPC response stream observer
+ * @param context the server call context for disconnect detection
+ */
private void convertToStreamResponse(Flow.Publisher
+ *
+ *
+ *
+ *
+ *
+ * @param
+ *
+ *
+ * @param gRPC Protocol
+ *
+ *
+ *
+ * Supported Methods
+ *
+ *
+ *
+ * Context Access
+ *
+ *
+ *
+ * @see io.a2a.transport.grpc.handler.GrpcHandler
+ * @see io.a2a.transport.grpc.context.GrpcContextKeys
+ */
@NullMarked
package io.a2a.transport.grpc.handler;