diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java index 3b0421b44..c024a92f0 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/A2AExtensionsInterceptor.java @@ -13,17 +13,78 @@ /** * gRPC server interceptor that captures request metadata and context information, - * providing equivalent functionality to Python's grpc.aio.ServicerContext. + * providing equivalent functionality to Python's {@code grpc.aio.ServicerContext}. * - * This interceptor: - * - Extracts A2A extension headers from incoming requests - * - Captures ServerCall and Metadata for rich context access - * - Stores context information in gRPC Context for service method access - * - Provides proper equivalence to Python's ServicerContext + *

This interceptor executes before service methods are invoked, extracting A2A protocol + * headers and request metadata from the gRPC call and storing them in the gRPC {@link Context} + * for access by {@link io.a2a.transport.grpc.handler.GrpcHandler} and agent implementations. + * + *

Captured Information

+ * + * + *

Context Storage

+ *

All captured information is stored in the gRPC {@link Context} using keys from + * {@link io.a2a.transport.grpc.context.GrpcContextKeys}: + *

+ * + *

CDI Integration

+ *

This interceptor is registered as an {@code @ApplicationScoped} CDI bean and automatically + * applied to gRPC services through Quarkus gRPC's {@code @RegisterInterceptor} annotation. + * + *

Python Equivalence

+ *

This interceptor provides functionality equivalent to Python's {@code grpc.aio.ServicerContext}, + * enabling Java handlers to access the same rich context information available in Python implementations: + *

+ * + * @see io.a2a.transport.grpc.context.GrpcContextKeys + * @see io.a2a.transport.grpc.handler.GrpcHandler + * @see io.grpc.ServerInterceptor */ @ApplicationScoped public class A2AExtensionsInterceptor implements ServerInterceptor { + /** + * Intercepts incoming gRPC calls to capture metadata and context information. + * + *

This method extracts A2A protocol headers and request metadata, stores them + * in the gRPC {@link Context}, and proceeds with the call in the enhanced context. + * + *

Extraction Process: + *

    + *
  1. Extract {@code X-A2A-Version} header from metadata
  2. + *
  3. Extract {@code X-A2A-Extensions} header from metadata
  4. + *
  5. Capture complete {@link Metadata} object
  6. + *
  7. Capture gRPC method name from {@link ServerCall}
  8. + *
  9. Map gRPC method to A2A protocol method name
  10. + *
  11. Extract peer information from server call attributes
  12. + *
  13. Create enhanced {@link Context} with all captured information
  14. + *
  15. Proceed with call in enhanced context
  16. + *
+ * + * @param the request message type + * @param the response message type + * @param serverCall the gRPC server call + * @param metadata the request metadata (headers) + * @param serverCallHandler the next handler in the interceptor chain + * @return a listener for the server call + */ @Override public ServerCall.Listener interceptCall( ServerCall serverCall, diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java index e5f7a27c2..02d441a17 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcHandler.java @@ -17,6 +17,55 @@ import io.quarkus.security.Authenticated; import org.jspecify.annotations.Nullable; +/** + * Quarkus gRPC service implementation for the A2A protocol. + * + *

This class provides a production-ready gRPC service built on Quarkus gRPC, + * implementing the A2A protocol with CDI integration, authentication, and + * interceptor support for metadata extraction. + * + *

CDI Integration

+ *

This class is a Quarkus gRPC service ({@code @GrpcService}) that automatically: + *

    + *
  • Injects the public {@link AgentCard} (required)
  • + *
  • Injects the extended {@link AgentCard} (optional)
  • + *
  • Injects the {@link RequestHandler} for protocol operations
  • + *
  • Injects the {@link CallContextFactory} for custom context creation (optional)
  • + *
  • Injects the {@link Executor} for async operations
  • + *
+ * + *

Security

+ *

The service is protected with {@code @Authenticated} annotation, requiring + * authentication for all gRPC method calls. Configure authentication in + * {@code application.properties}: + *

+ * quarkus.security.users.embedded.enabled=true
+ * quarkus.security.users.embedded.plain-text=true
+ * quarkus.security.users.embedded.users.alice=password
+ * 
+ * + *

Interceptor Registration

+ *

The {@code @RegisterInterceptor} annotation automatically registers + * {@link A2AExtensionsInterceptor} to capture A2A protocol headers and + * metadata before service methods are invoked. + * + *

Extension Points

+ *

To customize context creation, provide a CDI bean implementing + * {@link CallContextFactory}: + *

{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ *     @Override
+ *     public  ServerCallContext create(StreamObserver responseObserver) {
+ *         // Custom context creation logic
+ *     }
+ * }
+ * }
+ * + * @see io.a2a.transport.grpc.handler.GrpcHandler + * @see A2AExtensionsInterceptor + * @see CallContextFactory + */ @GrpcService @RegisterInterceptor(A2AExtensionsInterceptor.class) @Authenticated @@ -29,11 +78,30 @@ public class QuarkusGrpcHandler extends GrpcHandler { private final Executor executor; /** - * No-args constructor for CDI proxy creation. - * CDI requires a non-private constructor to create proxies for @ApplicationScoped beans. - * All fields are initialized by the @Inject constructor during actual bean creation. + * Constructs a new QuarkusGrpcHandler with CDI-injected dependencies. + * + *

This constructor is invoked by CDI to create the gRPC service bean, + * injecting all required and optional dependencies. + * + *

Required Dependencies: + *

    + *
  • {@code agentCard} - Public agent card defining capabilities
  • + *
  • {@code requestHandler} - Request handler for protocol operations
  • + *
  • {@code executor} - Executor for async operations
  • + *
+ * + *

Optional Dependencies: + *

    + *
  • {@code extendedAgentCard} - Extended agent card (can be unresolvable)
  • + *
  • {@code callContextFactoryInstance} - Custom context factory (can be unsatisfied)
  • + *
+ * + * @param agentCard the public agent card (qualified with {@code @PublicAgentCard}) + * @param extendedAgentCard the extended agent card instance (qualified with {@code @ExtendedAgentCard}) + * @param requestHandler the request handler for protocol operations + * @param callContextFactoryInstance the call context factory instance (optional) + * @param executor the executor for async operations (qualified with {@code @Internal}) */ - @Inject public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard, @ExtendedAgentCard Instance extendedAgentCard, diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcTransportMetadata.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcTransportMetadata.java index 1a5a7a078..b12635523 100644 --- a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcTransportMetadata.java +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/QuarkusGrpcTransportMetadata.java @@ -3,7 +3,44 @@ import io.a2a.server.TransportMetadata; import io.a2a.spec.TransportProtocol; +/** + * Transport metadata provider for the Quarkus gRPC reference implementation. + * + *

This class identifies the transport protocol used by the gRPC server implementation. + * It is automatically discovered by the A2A server framework through CDI to provide + * protocol-specific metadata to components that need to distinguish between different + * transport implementations. + * + *

CDI Integration

+ *

This bean is automatically registered and can be injected where transport + * protocol information is needed: + *

{@code
+ * @Inject
+ * TransportMetadata transportMetadata;
+ *
+ * public void logProtocol() {
+ *     String protocol = transportMetadata.getTransportProtocol();
+ *     // Returns "grpc" for this implementation
+ * }
+ * }
+ * + *

Use Cases

+ *
    + *
  • Identifying the active transport protocol in multi-transport deployments
  • + *
  • Conditional logic based on transport capabilities
  • + *
  • Logging and metrics collection with transport-specific tags
  • + *
  • Protocol-specific error handling or feature detection
  • + *
+ * + * @see io.a2a.server.TransportMetadata + * @see io.a2a.spec.TransportProtocol + */ public class QuarkusGrpcTransportMetadata implements TransportMetadata { + /** + * Returns the transport protocol identifier for gRPC. + * + * @return the string "grpc" identifying this transport implementation + */ @Override public String getTransportProtocol() { return TransportProtocol.GRPC.asString(); diff --git a/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/package-info.java b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/package-info.java new file mode 100644 index 000000000..b61c213f5 --- /dev/null +++ b/reference/grpc/src/main/java/io/a2a/server/grpc/quarkus/package-info.java @@ -0,0 +1,168 @@ +/** + * Quarkus gRPC reference implementation for the A2A protocol. + * + *

This package provides a production-ready gRPC server implementation built on + * Quarkus gRPC and Protocol Buffers, demonstrating best practices for A2A protocol + * integration with CDI, authentication, and interceptor support. + * + *

Architecture

+ *
+ * gRPC Request (Protocol Buffers)
+ *     ↓
+ * A2AExtensionsInterceptor (metadata extraction)
+ *     ↓
+ * QuarkusGrpcHandler (@GrpcService)
+ *     ├─ Protobuf → Domain conversion
+ *     ├─ Create ServerCallContext
+ *     ├─ Route to GrpcHandler (transport layer)
+ *     └─ Domain → Protobuf conversion
+ *         ↓
+ * GrpcHandler (transport/grpc)
+ *     ↓
+ * RequestHandler (server-common)
+ *     ↓
+ * AgentExecutor (your implementation)
+ * 
+ * + *

Core Components

+ *
    + *
  • {@link io.a2a.server.grpc.quarkus.QuarkusGrpcHandler QuarkusGrpcHandler} - Main gRPC service implementation
  • + *
  • {@link io.a2a.server.grpc.quarkus.A2AExtensionsInterceptor A2AExtensionsInterceptor} - Metadata extraction interceptor
  • + *
  • {@link io.a2a.server.grpc.quarkus.QuarkusGrpcTransportMetadata QuarkusGrpcTransportMetadata} - Transport protocol identification
  • + *
+ * + *

gRPC Methods

+ * + *

Unary RPC (blocking): + *

    + *
  • {@code SendMessage} - Send message and wait for completion
  • + *
  • {@code GetTask} - Get task by ID
  • + *
  • {@code ListTasks} - List tasks with filtering
  • + *
  • {@code CancelTask} - Cancel task execution
  • + *
  • {@code CreateTaskPushNotificationConfig} - Configure push notifications
  • + *
  • {@code GetTaskPushNotificationConfig} - Get push notification config
  • + *
  • {@code ListTaskPushNotificationConfig} - List push notification configs
  • + *
  • {@code DeleteTaskPushNotificationConfig} - Delete push notification config
  • + *
  • {@code GetExtendedAgentCard} - Get extended agent card
  • + *
+ * + *

Server Streaming RPC: + *

    + *
  • {@code SendStreamingMessage} - Send message with streaming response
  • + *
  • {@code SubscribeToTask} - Subscribe to task events
  • + *
+ * + *

CDI Integration

+ * + *

Required CDI Beans: + *

    + *
  • {@link io.a2a.spec.AgentCard AgentCard} with {@code @PublicAgentCard} qualifier
  • + *
  • {@link io.a2a.server.agentexecution.AgentExecutor AgentExecutor} implementation
  • + *
+ * + *

Optional CDI Beans: + *

    + *
  • {@link io.a2a.spec.AgentCard AgentCard} with {@code @ExtendedAgentCard} qualifier
  • + *
  • {@link io.a2a.transport.grpc.handler.CallContextFactory CallContextFactory} for custom context creation
  • + *
+ * + *

Usage

+ * + *

Add Dependency: + *

{@code
+ * 
+ *   io.github.a2asdk
+ *   a2a-java-sdk-reference-grpc
+ *   ${a2a.version}
+ * 
+ * }
+ * + *

Provide Agent Card: + *

{@code
+ * @ApplicationScoped
+ * public class MyAgentCardProducer {
+ *     @Produces @PublicAgentCard
+ *     public AgentCard agentCard() {
+ *         return new AgentCard.Builder()
+ *             .name("My gRPC Agent")
+ *             .description("Agent description")
+ *             .url("http://localhost:9090")
+ *             .capabilities(new AgentCapabilities.Builder()
+ *                 .streaming(true)
+ *                 .build())
+ *             .build();
+ *     }
+ * }
+ * }
+ * + *

Provide Agent Executor: + *

{@code
+ * @ApplicationScoped
+ * public class MyAgentExecutorProducer {
+ *     @Produces
+ *     public AgentExecutor agentExecutor() {
+ *         return new MyAgentExecutor();
+ *     }
+ * }
+ * }
+ * + *

Configuration

+ * + *

gRPC Server: + *

+ * quarkus.grpc.server.port=9090
+ * quarkus.grpc.server.host=0.0.0.0
+ * 
+ * + *

Authentication: + *

+ * quarkus.security.users.embedded.enabled=true
+ * quarkus.security.users.embedded.plain-text=true
+ * quarkus.security.users.embedded.users.alice=password
+ * 
+ * + *

Customization

+ * + *

Custom Context Creation: + *

Provide a CDI bean implementing {@link io.a2a.transport.grpc.handler.CallContextFactory CallContextFactory}: + *

{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ *     @Override
+ *     public  ServerCallContext create(StreamObserver responseObserver) {
+ *         // Extract custom data from gRPC context
+ *         Context grpcContext = Context.current();
+ *         Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
+ *         String orgId = metadata.get(
+ *             Metadata.Key.of("x-organization-id", Metadata.ASCII_STRING_MARSHALLER)
+ *         );
+ *
+ *         Map state = new HashMap<>();
+ *         state.put("organization", orgId);
+ *         state.put("grpc_response_observer", responseObserver);
+ *
+ *         return new ServerCallContext(
+ *             extractUser(),
+ *             state,
+ *             extractExtensions(grpcContext),
+ *             extractVersion(grpcContext)
+ *         );
+ *     }
+ * }
+ * }
+ * + *

Python Equivalence

+ *

This implementation provides equivalent functionality to Python's {@code grpc.aio} server: + *

    + *
  • {@code grpc.aio.ServicerContext} → {@link io.grpc.Context} with {@link A2AExtensionsInterceptor}
  • + *
  • {@code context.invocation_metadata()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#METADATA_KEY}
  • + *
  • {@code context.method()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#GRPC_METHOD_NAME_KEY}
  • + *
  • {@code context.peer()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#PEER_INFO_KEY}
  • + *
+ * + * @see io.a2a.server.grpc.quarkus.QuarkusGrpcHandler + * @see io.a2a.server.grpc.quarkus.A2AExtensionsInterceptor + * @see io.a2a.transport.grpc.handler.GrpcHandler + * @see io.a2a.transport.grpc.context.GrpcContextKeys + */ +package io.a2a.server.grpc.quarkus; diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java index a1392f20a..58878c7a3 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/context/GrpcContextKeys.java @@ -9,9 +9,33 @@ /** * Shared gRPC context keys for A2A protocol data. * - * These keys provide access to gRPC context information similar to - * Python's grpc.aio.ServicerContext, enabling rich context access - * in service method implementations. + *

These keys provide access to gRPC context information stored in + * {@link io.grpc.Context}, enabling rich context access in service method + * implementations similar to Python's {@code grpc.aio.ServicerContext}. + * + *

Usage Example

+ *
{@code
+ * public void processRequest(ServerCallContext context) {
+ *     // Access gRPC context information
+ *     Context grpcContext = Context.current();
+ *     String method = GrpcContextKeys.GRPC_METHOD_NAME_KEY.get(grpcContext);
+ *     Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
+ *     String peerInfo = GrpcContextKeys.PEER_INFO_KEY.get(grpcContext);
+ *
+ *     // Access A2A protocol headers
+ *     String version = GrpcContextKeys.VERSION_HEADER_KEY.get(grpcContext);
+ *     String extensions = GrpcContextKeys.EXTENSIONS_HEADER_KEY.get(grpcContext);
+ * }
+ * }
+ * + *

Context Population

+ *

These context keys are populated by server interceptors (typically + * {@code A2AExtensionsInterceptor}) that capture request metadata and store + * it in the gRPC context before service methods are called. + * + * @see io.grpc.Context + * @see io.grpc.Metadata + * @see io.a2a.server.ServerCallContext */ public final class GrpcContextKeys { @@ -54,9 +78,32 @@ public final class GrpcContextKeys { * Context key for storing the peer information. * Provides access to client connection details. */ - public static final Context.Key PEER_INFO_KEY = + public static final Context.Key PEER_INFO_KEY = Context.key("grpc-peer-info"); + /** + * Mapping from gRPC method names to A2A protocol method names. + * + *

This mapping translates gRPC protobuf method names to their corresponding + * A2A protocol method name constants for consistent method identification across + * all transports. + * + *

Method Mappings: + *

    + *
  • SendMessage → SendMessage
  • + *
  • SendStreamingMessage → SendStreamingMessage
  • + *
  • GetTask → GetTask
  • + *
  • ListTask → ListTasks
  • + *
  • CancelTask → CancelTask
  • + *
  • SubscribeToTask → SubscribeToTask
  • + *
  • CreateTaskPushNotification → CreateTaskPushNotificationConfig
  • + *
  • GetTaskPushNotification → GetTaskPushNotificationConfig
  • + *
  • ListTaskPushNotification → ListTaskPushNotificationConfig
  • + *
  • DeleteTaskPushNotification → DeleteTaskPushNotificationConfig
  • + *
+ * + * @see io.a2a.spec.A2AMethods + */ public static final Map METHOD_MAPPING = Map.of( "SendMessage", A2AMethods.SEND_MESSAGE_METHOD, "SendStreamingMessage", A2AMethods.SEND_STREAMING_MESSAGE_METHOD, diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/CallContextFactory.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/CallContextFactory.java index f214a51e5..1a52f41bb 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/CallContextFactory.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/CallContextFactory.java @@ -3,6 +3,72 @@ import io.a2a.server.ServerCallContext; import io.grpc.stub.StreamObserver; +/** + * Factory interface for creating {@link ServerCallContext} from gRPC request context. + * + *

This interface provides an extension point for customizing how {@link ServerCallContext} + * instances are created in gRPC applications. The default implementation in {@link GrpcHandler} + * extracts standard information (user, metadata, headers, peer info, protocol version), but + * applications can provide their own implementation to add custom context data. + * + *

Default Behavior

+ *

When no CDI bean implementing this interface is provided, {@link GrpcHandler} + * creates contexts with: + *

    + *
  • User authentication from security context
  • + *
  • gRPC metadata (headers)
  • + *
  • Method name and peer information
  • + *
  • A2A protocol version from {@code X-A2A-Version} header
  • + *
  • Required extensions from {@code X-A2A-Extensions} header
  • + *
  • Response observer for gRPC streaming
  • + *
+ * + *

Custom Implementation Example

+ *
{@code
+ * @ApplicationScoped
+ * public class CustomCallContextFactory implements CallContextFactory {
+ *     @Override
+ *     public  ServerCallContext create(StreamObserver responseObserver) {
+ *         // Extract custom data from gRPC context
+ *         Context grpcContext = Context.current();
+ *         Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
+ *         String orgId = metadata.get(
+ *             Metadata.Key.of("x-organization-id", Metadata.ASCII_STRING_MARSHALLER)
+ *         );
+ *
+ *         Map state = new HashMap<>();
+ *         state.put("organization", orgId);
+ *         state.put("grpc_response_observer", responseObserver);
+ *
+ *         return new ServerCallContext(
+ *             extractUser(),
+ *             state,
+ *             extractExtensions(grpcContext),
+ *             extractVersion(grpcContext)
+ *         );
+ *     }
+ * }
+ * }
+ * + * @see ServerCallContext + * @see GrpcHandler#createCallContext + * @see io.a2a.transport.grpc.context.GrpcContextKeys + */ public interface CallContextFactory { + /** + * Creates a {@link ServerCallContext} from gRPC request context. + * + *

This method is called for each incoming gRPC request to create the context + * that will be passed to the {@link io.a2a.server.requesthandlers.RequestHandler} + * and eventually to the {@link io.a2a.server.agentexecution.AgentExecutor}. + * + *

Implementations should extract information from the current {@link io.grpc.Context} + * using {@link io.a2a.transport.grpc.context.GrpcContextKeys} to access metadata, + * method name, peer info, and A2A protocol headers. + * + * @param the response type for the gRPC method + * @param responseObserver the gRPC response stream observer + * @return a new ServerCallContext with extracted authentication, metadata, and headers + */ ServerCallContext create(StreamObserver responseObserver); } \ No newline at end of file diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java index 277a763cd..d5c8ce389 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java @@ -64,6 +64,88 @@ import io.grpc.stub.StreamObserver; import org.jspecify.annotations.Nullable; +/** + * gRPC transport handler for processing A2A protocol requests. + * + *

This abstract class implements the gRPC service interface for the A2A protocol, + * handling both unary (blocking) and server streaming RPC calls. It translates gRPC + * requests to A2A protocol operations, coordinates with the request handler and agent + * executor, and manages error handling with appropriate gRPC status codes. + * + *

Request Flow

+ *
+ * gRPC Request → GrpcHandler (this class)
+ *     ↓
+ * Protobuf → Domain conversion
+ *     ↓
+ * RequestHandler → AgentExecutor
+ *     ↓
+ * Domain → Protobuf conversion
+ *     ↓
+ * gRPC Response (unary or streaming)
+ * 
+ * + *

Supported Operations

+ * + *

Unary RPC (blocking): + *

    + *
  • {@link #sendMessage} - Send message and wait for completion
  • + *
  • {@link #getTask} - Retrieve task by ID
  • + *
  • {@link #cancelTask} - Cancel task execution
  • + *
  • {@link #listTasks} - List tasks with filtering
  • + *
  • {@link #createTaskPushNotificationConfig} - Configure push notifications
  • + *
  • {@link #getTaskPushNotificationConfig} - Get push notification config
  • + *
  • {@link #listTaskPushNotificationConfig} - List push notification configs
  • + *
  • {@link #deleteTaskPushNotificationConfig} - Delete push notification config
  • + *
  • {@link #getExtendedAgentCard} - Get extended agent capabilities
  • + *
+ * + *

Server Streaming RPC: + *

    + *
  • {@link #sendStreamingMessage} - Send message with streaming response
  • + *
  • {@link #subscribeToTask} - Subscribe to task events
  • + *
+ * + *

Error Handling

+ *

A2A errors are mapped to gRPC status codes: + *

    + *
  • {@link io.a2a.spec.InvalidRequestError} → {@link Status#INVALID_ARGUMENT}
  • + *
  • {@link io.a2a.spec.MethodNotFoundError} → {@link Status#NOT_FOUND}
  • + *
  • {@link io.a2a.spec.TaskNotFoundError} → {@link Status#NOT_FOUND}
  • + *
  • {@link io.a2a.spec.InternalError} → {@link Status#INTERNAL}
  • + *
  • {@link io.a2a.spec.UnsupportedOperationError} → {@link Status#UNIMPLEMENTED}
  • + *
  • {@link SecurityException} → {@link Status#UNAUTHENTICATED} or {@link Status#PERMISSION_DENIED}
  • + *
+ * + *

Context Access

+ *

The handler provides rich context information equivalent to Python's + * {@code grpc.aio.ServicerContext}: + *

    + *
  • {@link #getCurrentMetadata()} - Request metadata (headers)
  • + *
  • {@link #getCurrentMethodName()} - gRPC method name
  • + *
  • {@link #getCurrentPeerInfo()} - Client connection details
  • + *
+ * + *

Extension Points

+ *

Subclasses must implement: + *

    + *
  • {@link #getRequestHandler()} - Request handler instance
  • + *
  • {@link #getAgentCard()} - Public agent card
  • + *
  • {@link #getExtendedAgentCard()} - Extended agent card (nullable)
  • + *
  • {@link #getCallContextFactory()} - Custom context factory (nullable)
  • + *
  • {@link #getExecutor()} - Executor for async operations
  • + *
+ * + *

CDI Integration

+ *

This class is marked with {@code @Vetoed} to prevent direct CDI management. + * Subclasses should be CDI beans (e.g., {@code @GrpcService} in Quarkus) that + * inject dependencies and provide them through the abstract methods. + * + * @see io.a2a.grpc.A2AServiceGrpc.A2AServiceImplBase + * @see io.a2a.server.requesthandlers.RequestHandler + * @see CallContextFactory + * @see io.a2a.transport.grpc.context.GrpcContextKeys + */ @Vetoed public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase { @@ -75,10 +157,38 @@ public abstract class GrpcHandler extends A2AServiceGrpc.A2AServiceImplBase { private static final Logger LOGGER = Logger.getLogger(GrpcHandler.class.getName()); + /** + * Constructs a new GrpcHandler. + */ public GrpcHandler() { } + /** + * Handles a unary (blocking) message send request. + * + *

This method processes a message send request, waits for the agent to complete + * processing, and returns either a Task or Message in the response. + * + *

Protocol Flow: + *

    + *
  1. Validate A2A protocol version and extensions
  2. + *
  3. Convert protobuf request to domain {@link MessageSendParams}
  4. + *
  5. Invoke {@link io.a2a.server.requesthandlers.RequestHandler#onMessageSend}
  6. + *
  7. Convert domain response to protobuf {@link io.a2a.grpc.SendMessageResponse}
  8. + *
  9. Send response and complete the RPC
  10. + *
+ * + *

Error Handling: + *

    + *
  • {@link A2AError} → mapped to appropriate gRPC status code
  • + *
  • {@link SecurityException} → {@code UNAUTHENTICATED} or {@code PERMISSION_DENIED}
  • + *
  • {@link Throwable} → {@code INTERNAL} error
  • + *
+ * + * @param request the gRPC message send request + * @param responseObserver the gRPC response stream observer + */ @Override public void sendMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { @@ -232,6 +342,43 @@ public void listTaskPushNotificationConfig(io.a2a.grpc.ListTaskPushNotificationC } } + /** + * Handles a server streaming message send request. + * + *

This method processes a message send request with streaming response, where + * the agent can emit multiple events (artifacts, status updates, messages) as the + * task progresses. + * + *

Protocol Flow: + *

    + *
  1. Verify streaming capability is enabled in agent card
  2. + *
  3. Validate A2A protocol version and extensions
  4. + *
  5. Convert protobuf request to domain {@link MessageSendParams}
  6. + *
  7. Invoke {@link io.a2a.server.requesthandlers.RequestHandler#onMessageSendStream}
  8. + *
  9. Subscribe to event publisher and stream responses
  10. + *
  11. Convert each domain event to protobuf {@link io.a2a.grpc.StreamResponse}
  12. + *
  13. Complete RPC when final event received or error occurs
  14. + *
+ * + *

Streaming Characteristics: + *

    + *
  • Server streaming RPC - server sends multiple responses
  • + *
  • Backpressure handled through reactive streams subscription
  • + *
  • Client disconnect detection via gRPC context cancellation
  • + *
  • Automatic cleanup when stream completes or errors
  • + *
+ * + *

Error Handling: + *

    + *
  • Streaming not enabled → {@link io.a2a.spec.InvalidRequestError}
  • + *
  • Other {@link A2AError} → mapped to appropriate gRPC status code
  • + *
  • {@link SecurityException} → {@code UNAUTHENTICATED} or {@code PERMISSION_DENIED}
  • + *
  • {@link Throwable} → {@code INTERNAL} error
  • + *
+ * + * @param request the gRPC message send request + * @param responseObserver the gRPC response stream observer + */ @Override public void sendStreamingMessage(io.a2a.grpc.SendMessageRequest request, StreamObserver responseObserver) { @@ -278,6 +425,43 @@ public void subscribeToTask(io.a2a.grpc.SubscribeToTaskRequest request, } } + /** + * Converts a reactive stream of domain events to gRPC streaming responses. + * + *

This method subscribes to the event publisher and converts each domain event + * to a protobuf {@link StreamResponse}, handling backpressure through the reactive + * streams subscription and detecting client disconnections. + * + *

Backpressure Handling: + *

    + *
  1. Request 1 event from upstream
  2. + *
  3. Send event to gRPC response observer
  4. + *
  5. Wait for send completion
  6. + *
  7. Request next event (backpressure)
  8. + *
+ * + *

Disconnect Detection: + *

When the gRPC client disconnects: + *

    + *
  1. gRPC Context cancellation listener fires
  2. + *
  3. Invokes {@link ServerCallContext#invokeEventConsumerCancelCallback()}
  4. + *
  5. Cancels upstream subscription
  6. + *
  7. Stops event polling
  8. + *
+ * + *

Final Event Detection: + *

The stream completes automatically when a final task status is received: + *

    + *
  • {@code TASK_STATE_COMPLETED}
  • + *
  • {@code TASK_STATE_CANCELED}
  • + *
  • {@code TASK_STATE_FAILED}
  • + *
  • {@code TASK_STATE_REJECTED}
  • + *
+ * + * @param publisher the reactive publisher of streaming events + * @param responseObserver the gRPC response stream observer + * @param context the server call context for disconnect detection + */ private void convertToStreamResponse(Flow.Publisher publisher, StreamObserver responseObserver, ServerCallContext context) { @@ -398,6 +582,42 @@ public void deleteTaskPushNotificationConfig(io.a2a.grpc.DeleteTaskPushNotificat } } + /** + * Creates a {@link ServerCallContext} from the current gRPC request context. + * + *

This method extracts authentication, metadata, and A2A protocol information + * from the gRPC context and packages them into a context object for use by the + * request handler and agent executor. + * + *

Default Context Creation: + *

If no {@link CallContextFactory} is provided, creates a context with: + *

    + *
  • User authentication (defaults to {@link UnauthenticatedUser})
  • + *
  • Transport protocol ({@link TransportProtocol#GRPC})
  • + *
  • gRPC response observer for streaming
  • + *
  • gRPC context and metadata (equivalent to Python's ServicerContext)
  • + *
  • HTTP headers extracted from metadata
  • + *
  • gRPC method name
  • + *
  • Peer information (client connection details)
  • + *
  • A2A protocol version from {@code X-A2A-Version} header (via context)
  • + *
  • Required extensions from {@code X-A2A-Extensions} header (via context)
  • + *
+ * + *

Custom Context Creation: + *

If a {@link CallContextFactory} bean is present, delegates to + * {@link CallContextFactory#create(StreamObserver)} for custom context creation. + * + *

Context Information: + *

The gRPC context information is populated by server interceptors (typically + * {@code A2AExtensionsInterceptor}) that capture request metadata before service + * methods are invoked. + * + * @param the response type for the gRPC method + * @param responseObserver the gRPC response stream observer + * @return the server call context + * @see CallContextFactory + * @see io.a2a.transport.grpc.context.GrpcContextKeys + */ private ServerCallContext createCallContext(StreamObserver responseObserver) { CallContextFactory factory = getCallContextFactory(); if (factory == null) { @@ -463,6 +683,36 @@ private ServerCallContext createCallContext(StreamObserver responseObserv } } + /** + * Handles A2A protocol errors by mapping them to appropriate gRPC status codes. + * + *

This method converts domain-specific A2A errors to gRPC status codes with + * descriptive error messages, allowing clients to understand and handle errors + * appropriately. + * + *

Error Mappings: + *

    + *
  • {@link InvalidRequestError} → {@code INVALID_ARGUMENT}
  • + *
  • {@link MethodNotFoundError} → {@code NOT_FOUND}
  • + *
  • {@link InvalidParamsError} → {@code INVALID_ARGUMENT}
  • + *
  • {@link InternalError} → {@code INTERNAL}
  • + *
  • {@link TaskNotFoundError} → {@code NOT_FOUND}
  • + *
  • {@link TaskNotCancelableError} → {@code FAILED_PRECONDITION}
  • + *
  • {@link PushNotificationNotSupportedError} → {@code UNIMPLEMENTED}
  • + *
  • {@link UnsupportedOperationError} → {@code UNIMPLEMENTED}
  • + *
  • {@link JSONParseError} → {@code INTERNAL}
  • + *
  • {@link ContentTypeNotSupportedError} → {@code INVALID_ARGUMENT}
  • + *
  • {@link InvalidAgentResponseError} → {@code INTERNAL}
  • + *
  • {@link ExtendedAgentCardNotConfiguredError} → {@code FAILED_PRECONDITION}
  • + *
  • {@link ExtensionSupportRequiredError} → {@code FAILED_PRECONDITION}
  • + *
  • {@link VersionNotSupportedError} → {@code UNIMPLEMENTED}
  • + *
  • Unknown errors → {@code UNKNOWN}
  • + *
+ * + * @param the response type for the gRPC method + * @param responseObserver the gRPC response stream observer + * @param error the A2A protocol error + */ private void handleError(StreamObserver responseObserver, A2AError error) { Status status; String description; @@ -515,6 +765,23 @@ private void handleError(StreamObserver responseObserver, A2AError error) responseObserver.onError(status.withDescription(description).asRuntimeException()); } + /** + * Handles security-related exceptions by mapping them to gRPC authentication/authorization errors. + * + *

This method attempts to detect the type of security exception based on the exception + * class name and maps it to the appropriate gRPC status code. + * + *

Error Detection: + *

    + *
  • Unauthorized/Unauthenticated/Authentication exceptions → {@code UNAUTHENTICATED}
  • + *
  • Forbidden/AccessDenied/Authorization exceptions → {@code PERMISSION_DENIED}
  • + *
  • Other SecurityException → {@code PERMISSION_DENIED} (default)
  • + *
+ * + * @param the response type for the gRPC method + * @param responseObserver the gRPC response stream observer + * @param e the security exception + */ private void handleSecurityException(StreamObserver responseObserver, SecurityException e) { Status status; String description; @@ -574,22 +841,62 @@ private void validateTransportConfigurationWithCorrectClassLoader(AgentCard agen } } + /** + * Returns the deployment classloader for this handler. + * + *

Used for transport configuration validation with proper classloader context. + * + * @return the deployment classloader + */ protected ClassLoader getDeploymentClassLoader() { return this.getClass().getClassLoader(); } + /** + * Sets a callback to be invoked when streaming subscription starts. + * + *

This is a testing hook used to synchronize test execution with streaming setup. + * In production, this remains null. + * + * @param runnable the callback to invoke on subscription + */ public static void setStreamingSubscribedRunnable(Runnable runnable) { streamingSubscribedRunnable = runnable; } + /** + * Returns the request handler instance for processing A2A protocol requests. + * + * @return the request handler + */ protected abstract RequestHandler getRequestHandler(); + /** + * Returns the public agent card defining the agent's capabilities and metadata. + * + * @return the agent card + */ protected abstract AgentCard getAgentCard(); + /** + * Returns the extended agent card with additional capabilities, or null if not configured. + * + * @return the extended agent card, or null if not available + */ protected abstract AgentCard getExtendedAgentCard(); + /** + * Returns the custom call context factory, or null to use default context creation. + * + * @return the call context factory, or null for default behavior + */ protected abstract CallContextFactory getCallContextFactory(); + /** + * Returns the executor for running async operations (streaming subscriptions, etc.). + * + * @return the executor + */ protected abstract Executor getExecutor(); /** diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java index 0cc667b2d..b05417c46 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/package-info.java @@ -2,6 +2,51 @@ * Copyright The WildFly Authors * SPDX-License-Identifier: Apache-2.0 */ + +/** + * gRPC transport handler implementations for the A2A protocol. + * + *

This package contains the core gRPC handler that processes gRPC requests + * and translates them to A2A protocol operations. It supports both unary (blocking) + * and streaming responses with proper gRPC error handling and status codes. + * + *

gRPC Protocol

+ *

This implementation uses Protocol Buffers for message serialization and provides: + *

    + *
  • Unary RPC calls for blocking operations
  • + *
  • Server streaming RPC for streaming responses
  • + *
  • Rich error handling with gRPC status codes
  • + *
  • Context-aware metadata extraction
  • + *
+ * + *

Supported Methods

+ *
    + *
  • {@code SendMessage} - Send message (unary/blocking)
  • + *
  • {@code SendStreamingMessage} - Send message (server streaming)
  • + *
  • {@code SubscribeToTask} - Subscribe to task updates (server streaming)
  • + *
  • {@code GetTask} - Get task by ID
  • + *
  • {@code ListTasks} - List tasks with filtering
  • + *
  • {@code CancelTask} - Cancel task execution
  • + *
  • {@code GetTaskPushNotificationConfig} - Get push notification config
  • + *
  • {@code CreateTaskPushNotificationConfig} - Create push notification config
  • + *
  • {@code ListTaskPushNotificationConfig} - List push notification configs
  • + *
  • {@code DeleteTaskPushNotificationConfig} - Delete push notification config
  • + *
  • {@code GetExtendedAgentCard} - Get extended agent card
  • + *
+ * + *

Context Access

+ *

The gRPC handler provides rich context information equivalent to Python's + * {@code grpc.aio.ServicerContext}, including: + *

    + *
  • Request metadata (headers)
  • + *
  • Method name
  • + *
  • Peer information
  • + *
  • A2A protocol version and extensions
  • + *
+ * + * @see io.a2a.transport.grpc.handler.GrpcHandler + * @see io.a2a.transport.grpc.context.GrpcContextKeys + */ @NullMarked package io.a2a.transport.grpc.handler;