Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,78 @@

/**
* gRPC server interceptor that captures request metadata and context information,
* providing equivalent functionality to Python's grpc.aio.ServicerContext.
* providing equivalent functionality to Python's {@code grpc.aio.ServicerContext}.
*
* This interceptor:
* - Extracts A2A extension headers from incoming requests
* - Captures ServerCall and Metadata for rich context access
* - Stores context information in gRPC Context for service method access
* - Provides proper equivalence to Python's ServicerContext
* <p>This interceptor executes before service methods are invoked, extracting A2A protocol
* headers and request metadata from the gRPC call and storing them in the gRPC {@link Context}
* for access by {@link io.a2a.transport.grpc.handler.GrpcHandler} and agent implementations.
*
* <h2>Captured Information</h2>
* <ul>
* <li><b>A2A Protocol Version</b>: {@code X-A2A-Version} header</li>
* <li><b>A2A Extensions</b>: {@code X-A2A-Extensions} header</li>
* <li><b>Complete Metadata</b>: All request headers via {@link io.grpc.Metadata}</li>
* <li><b>Method Name</b>: gRPC method being invoked</li>
* <li><b>Peer Information</b>: Client connection details</li>
* </ul>
*
* <h2>Context Storage</h2>
* <p>All captured information is stored in the gRPC {@link Context} using keys from
* {@link io.a2a.transport.grpc.context.GrpcContextKeys}:
* <ul>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#VERSION_HEADER_KEY VERSION_HEADER_KEY}</li>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#EXTENSIONS_HEADER_KEY EXTENSIONS_HEADER_KEY}</li>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#METADATA_KEY METADATA_KEY}</li>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#GRPC_METHOD_NAME_KEY GRPC_METHOD_NAME_KEY}</li>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#METHOD_NAME_KEY METHOD_NAME_KEY}</li>
* <li>{@link io.a2a.transport.grpc.context.GrpcContextKeys#PEER_INFO_KEY PEER_INFO_KEY}</li>
* </ul>
*
* <h2>CDI Integration</h2>
* <p>This interceptor is registered as an {@code @ApplicationScoped} CDI bean and automatically
* applied to gRPC services through Quarkus gRPC's {@code @RegisterInterceptor} annotation.
*
* <h2>Python Equivalence</h2>
* <p>This interceptor provides functionality equivalent to Python's {@code grpc.aio.ServicerContext},
* enabling Java handlers to access the same rich context information available in Python implementations:
* <ul>
* <li>{@code context.invocation_metadata()} → {@link io.grpc.Metadata}</li>
* <li>{@code context.method()} → Method name via {@code GRPC_METHOD_NAME_KEY}</li>
* <li>{@code context.peer()} → Peer info via {@code PEER_INFO_KEY}</li>
* </ul>
*
* @see io.a2a.transport.grpc.context.GrpcContextKeys
* @see io.a2a.transport.grpc.handler.GrpcHandler
* @see io.grpc.ServerInterceptor
*/
@ApplicationScoped
public class A2AExtensionsInterceptor implements ServerInterceptor {

/**
* Intercepts incoming gRPC calls to capture metadata and context information.
*
* <p>This method extracts A2A protocol headers and request metadata, stores them
* in the gRPC {@link Context}, and proceeds with the call in the enhanced context.
*
* <p><b>Extraction Process:</b>
* <ol>
* <li>Extract {@code X-A2A-Version} header from metadata</li>
* <li>Extract {@code X-A2A-Extensions} header from metadata</li>
* <li>Capture complete {@link Metadata} object</li>
* <li>Capture gRPC method name from {@link ServerCall}</li>
* <li>Map gRPC method to A2A protocol method name</li>
* <li>Extract peer information from server call attributes</li>
* <li>Create enhanced {@link Context} with all captured information</li>
* <li>Proceed with call in enhanced context</li>
* </ol>
*
* @param <ReqT> the request message type
* @param <RespT> the response message type
* @param serverCall the gRPC server call
* @param metadata the request metadata (headers)
* @param serverCallHandler the next handler in the interceptor chain
* @return a listener for the server call
*/
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,55 @@
import io.quarkus.security.Authenticated;
import org.jspecify.annotations.Nullable;

/**
* Quarkus gRPC service implementation for the A2A protocol.
*
* <p>This class provides a production-ready gRPC service built on Quarkus gRPC,
* implementing the A2A protocol with CDI integration, authentication, and
* interceptor support for metadata extraction.
*
* <h2>CDI Integration</h2>
* <p>This class is a Quarkus gRPC service ({@code @GrpcService}) that automatically:
* <ul>
* <li>Injects the public {@link AgentCard} (required)</li>
* <li>Injects the extended {@link AgentCard} (optional)</li>
* <li>Injects the {@link RequestHandler} for protocol operations</li>
* <li>Injects the {@link CallContextFactory} for custom context creation (optional)</li>
* <li>Injects the {@link Executor} for async operations</li>
* </ul>
*
* <h2>Security</h2>
* <p>The service is protected with {@code @Authenticated} annotation, requiring
* authentication for all gRPC method calls. Configure authentication in
* {@code application.properties}:
* <pre>
* quarkus.security.users.embedded.enabled=true
* quarkus.security.users.embedded.plain-text=true
* quarkus.security.users.embedded.users.alice=password
* </pre>
*
* <h2>Interceptor Registration</h2>
* <p>The {@code @RegisterInterceptor} annotation automatically registers
* {@link A2AExtensionsInterceptor} to capture A2A protocol headers and
* metadata before service methods are invoked.
*
* <h2>Extension Points</h2>
* <p>To customize context creation, provide a CDI bean implementing
* {@link CallContextFactory}:
* <pre>{@code
* @ApplicationScoped
* public class CustomCallContextFactory implements CallContextFactory {
* @Override
* public <V> ServerCallContext create(StreamObserver<V> responseObserver) {
* // Custom context creation logic
* }
* }
* }</pre>
*
* @see io.a2a.transport.grpc.handler.GrpcHandler
* @see A2AExtensionsInterceptor
* @see CallContextFactory
*/
@GrpcService
@RegisterInterceptor(A2AExtensionsInterceptor.class)
@Authenticated
Expand All @@ -29,11 +78,30 @@ public class QuarkusGrpcHandler extends GrpcHandler {
private final Executor executor;

/**
* No-args constructor for CDI proxy creation.
* CDI requires a non-private constructor to create proxies for @ApplicationScoped beans.
* All fields are initialized by the @Inject constructor during actual bean creation.
* Constructs a new QuarkusGrpcHandler with CDI-injected dependencies.
*
* <p>This constructor is invoked by CDI to create the gRPC service bean,
* injecting all required and optional dependencies.
*
* <p><b>Required Dependencies:</b>
* <ul>
* <li>{@code agentCard} - Public agent card defining capabilities</li>
* <li>{@code requestHandler} - Request handler for protocol operations</li>
* <li>{@code executor} - Executor for async operations</li>
* </ul>
*
* <p><b>Optional Dependencies:</b>
* <ul>
* <li>{@code extendedAgentCard} - Extended agent card (can be unresolvable)</li>
* <li>{@code callContextFactoryInstance} - Custom context factory (can be unsatisfied)</li>
* </ul>
*
* @param agentCard the public agent card (qualified with {@code @PublicAgentCard})
* @param extendedAgentCard the extended agent card instance (qualified with {@code @ExtendedAgentCard})
* @param requestHandler the request handler for protocol operations
* @param callContextFactoryInstance the call context factory instance (optional)
* @param executor the executor for async operations (qualified with {@code @Internal})
*/

@Inject
public QuarkusGrpcHandler(@PublicAgentCard AgentCard agentCard,
@ExtendedAgentCard Instance<AgentCard> extendedAgentCard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,44 @@
import io.a2a.server.TransportMetadata;
import io.a2a.spec.TransportProtocol;

/**
* Transport metadata provider for the Quarkus gRPC reference implementation.
*
* <p>This class identifies the transport protocol used by the gRPC server implementation.
* It is automatically discovered by the A2A server framework through CDI to provide
* protocol-specific metadata to components that need to distinguish between different
* transport implementations.
*
* <h2>CDI Integration</h2>
* <p>This bean is automatically registered and can be injected where transport
* protocol information is needed:
* <pre>{@code
* @Inject
* TransportMetadata transportMetadata;
*
* public void logProtocol() {
* String protocol = transportMetadata.getTransportProtocol();
* // Returns "grpc" for this implementation
* }
* }</pre>
*
* <h2>Use Cases</h2>
* <ul>
* <li>Identifying the active transport protocol in multi-transport deployments</li>
* <li>Conditional logic based on transport capabilities</li>
* <li>Logging and metrics collection with transport-specific tags</li>
* <li>Protocol-specific error handling or feature detection</li>
* </ul>
*
* @see io.a2a.server.TransportMetadata
* @see io.a2a.spec.TransportProtocol
*/
public class QuarkusGrpcTransportMetadata implements TransportMetadata {
/**
* Returns the transport protocol identifier for gRPC.
*
* @return the string "grpc" identifying this transport implementation
*/
@Override
public String getTransportProtocol() {
return TransportProtocol.GRPC.asString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
* Quarkus gRPC reference implementation for the A2A protocol.
*
* <p>This package provides a production-ready gRPC server implementation built on
* Quarkus gRPC and Protocol Buffers, demonstrating best practices for A2A protocol
* integration with CDI, authentication, and interceptor support.
*
* <h2>Architecture</h2>
* <pre>
* gRPC Request (Protocol Buffers)
* ↓
* A2AExtensionsInterceptor (metadata extraction)
* ↓
* QuarkusGrpcHandler (@GrpcService)
* ├─ Protobuf → Domain conversion
* ├─ Create ServerCallContext
* ├─ Route to GrpcHandler (transport layer)
* └─ Domain → Protobuf conversion
* ↓
* GrpcHandler (transport/grpc)
* ↓
* RequestHandler (server-common)
* ↓
* AgentExecutor (your implementation)
* </pre>
*
* <h2>Core Components</h2>
* <ul>
* <li>{@link io.a2a.server.grpc.quarkus.QuarkusGrpcHandler QuarkusGrpcHandler} - Main gRPC service implementation</li>
* <li>{@link io.a2a.server.grpc.quarkus.A2AExtensionsInterceptor A2AExtensionsInterceptor} - Metadata extraction interceptor</li>
* <li>{@link io.a2a.server.grpc.quarkus.QuarkusGrpcTransportMetadata QuarkusGrpcTransportMetadata} - Transport protocol identification</li>
* </ul>
*
* <h2>gRPC Methods</h2>
*
* <p><b>Unary RPC (blocking):</b>
* <ul>
* <li>{@code SendMessage} - Send message and wait for completion</li>
* <li>{@code GetTask} - Get task by ID</li>
* <li>{@code ListTasks} - List tasks with filtering</li>
* <li>{@code CancelTask} - Cancel task execution</li>
* <li>{@code CreateTaskPushNotificationConfig} - Configure push notifications</li>
* <li>{@code GetTaskPushNotificationConfig} - Get push notification config</li>
* <li>{@code ListTaskPushNotificationConfig} - List push notification configs</li>
* <li>{@code DeleteTaskPushNotificationConfig} - Delete push notification config</li>
* <li>{@code GetExtendedAgentCard} - Get extended agent card</li>
* </ul>
*
* <p><b>Server Streaming RPC:</b>
* <ul>
* <li>{@code SendStreamingMessage} - Send message with streaming response</li>
* <li>{@code SubscribeToTask} - Subscribe to task events</li>
* </ul>
*
* <h2>CDI Integration</h2>
*
* <p><b>Required CDI Beans:</b>
* <ul>
* <li>{@link io.a2a.spec.AgentCard AgentCard} with {@code @PublicAgentCard} qualifier</li>
* <li>{@link io.a2a.server.agentexecution.AgentExecutor AgentExecutor} implementation</li>
* </ul>
*
* <p><b>Optional CDI Beans:</b>
* <ul>
* <li>{@link io.a2a.spec.AgentCard AgentCard} with {@code @ExtendedAgentCard} qualifier</li>
* <li>{@link io.a2a.transport.grpc.handler.CallContextFactory CallContextFactory} for custom context creation</li>
* </ul>
*
* <h2>Usage</h2>
*
* <p><b>Add Dependency:</b>
* <pre>{@code
* <dependency>
* <groupId>io.github.a2asdk</groupId>
* <artifactId>a2a-java-sdk-reference-grpc</artifactId>
* <version>${a2a.version}</version>
* </dependency>
* }</pre>
*
* <p><b>Provide Agent Card:</b>
* <pre>{@code
* @ApplicationScoped
* public class MyAgentCardProducer {
* @Produces @PublicAgentCard
* public AgentCard agentCard() {
* return new AgentCard.Builder()
* .name("My gRPC Agent")
* .description("Agent description")
* .url("http://localhost:9090")
* .capabilities(new AgentCapabilities.Builder()
* .streaming(true)
* .build())
* .build();
* }
* }
* }</pre>
*
* <p><b>Provide Agent Executor:</b>
* <pre>{@code
* @ApplicationScoped
* public class MyAgentExecutorProducer {
* @Produces
* public AgentExecutor agentExecutor() {
* return new MyAgentExecutor();
* }
* }
* }</pre>
*
* <h2>Configuration</h2>
*
* <p><b>gRPC Server:</b>
* <pre>
* quarkus.grpc.server.port=9090
* quarkus.grpc.server.host=0.0.0.0
* </pre>
*
* <p><b>Authentication:</b>
* <pre>
* quarkus.security.users.embedded.enabled=true
* quarkus.security.users.embedded.plain-text=true
* quarkus.security.users.embedded.users.alice=password
* </pre>
*
* <h2>Customization</h2>
*
* <p><b>Custom Context Creation:</b>
* <p>Provide a CDI bean implementing {@link io.a2a.transport.grpc.handler.CallContextFactory CallContextFactory}:
* <pre>{@code
* @ApplicationScoped
* public class CustomCallContextFactory implements CallContextFactory {
* @Override
* public <V> ServerCallContext create(StreamObserver<V> responseObserver) {
* // Extract custom data from gRPC context
* Context grpcContext = Context.current();
* Metadata metadata = GrpcContextKeys.METADATA_KEY.get(grpcContext);
* String orgId = metadata.get(
* Metadata.Key.of("x-organization-id", Metadata.ASCII_STRING_MARSHALLER)
* );
*
* Map<String, Object> state = new HashMap<>();
* state.put("organization", orgId);
* state.put("grpc_response_observer", responseObserver);
*
* return new ServerCallContext(
* extractUser(),
* state,
* extractExtensions(grpcContext),
* extractVersion(grpcContext)
* );
* }
* }
* }</pre>
*
* <h2>Python Equivalence</h2>
* <p>This implementation provides equivalent functionality to Python's {@code grpc.aio} server:
* <ul>
* <li>{@code grpc.aio.ServicerContext} → {@link io.grpc.Context} with {@link A2AExtensionsInterceptor}</li>
* <li>{@code context.invocation_metadata()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#METADATA_KEY}</li>
* <li>{@code context.method()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#GRPC_METHOD_NAME_KEY}</li>
* <li>{@code context.peer()} → {@link io.a2a.transport.grpc.context.GrpcContextKeys#PEER_INFO_KEY}</li>
* </ul>
*
* @see io.a2a.server.grpc.quarkus.QuarkusGrpcHandler
* @see io.a2a.server.grpc.quarkus.A2AExtensionsInterceptor
* @see io.a2a.transport.grpc.handler.GrpcHandler
* @see io.a2a.transport.grpc.context.GrpcContextKeys
*/
package io.a2a.server.grpc.quarkus;
Loading